I've been trying to parse the Cassandra commit logs using CommitLogReader.java, but so far I have only been able to see changes for the system and system_schema keyspaces. Changes to the table in my own keyspace (demo) do not show up in the logs.
Here is what I have done to enable CDC:
cdc_enabled: true
cdc_block_writes: true
cdc_on_repair_enabled: true
cdc_raw_directory: /var/lib/cassandra/cdc_raw
commitlog_directory: /var/lib/cassandra/commitlog
CREATE KEYSPACE IF NOT EXISTS demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
create table if not exists demo.test_table( uuid UUID PRIMARY KEY, name text ) with cdc=true;
// Add a few rows
insert into demo.test_table ("uuid", "name") values (f26381e4-6c99-4f82-b60c-b90773418a9d, 'test');
insert into demo.test_table ("uuid", "name") values (1e1b0e16-706e-421c-977a-81dd43162ef5, 'test2');
insert into demo.test_table ("uuid", "name") values (5d7cc223-a22f-491d-856e-1639a5b35b52, 'test3');
private fun readCommitLog(commitLogFile: java.io.File) {
    println("Reading CDC log: " + commitLogFile.name)
    val reader = CommitLogReader()
    val cdcMutationHandler: CDCMutationHandler = CDCMutationHandler()
    val file = File(commitLogFile.absolutePath)
    reader.readCommitLogSegment(cdcMutationHandler, file, CommitLogReader.ALL_MUTATIONS, false)
}
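class CDCMutationHandler : CommitLogReadHandler {
    // Returning false tells the reader not to skip the rest of the segment on a read error.
    // (A TODO() here would throw NotImplementedError the first time an error is reported.)
    override fun shouldSkipSegmentOnError(exception: CommitLogReadException?): Boolean {
        System.err.println("Recoverable error reading CDC log: " + exception?.message)
        return false
    }

    override fun handleUnrecoverableError(exception: CommitLogReadException?) {
        System.err.println("Error reading CDC log: " + exception?.message)
    }

    // Called once for every mutation read from the segment.
    override fun handleMutation(mutation: Mutation, size: Int, entryLocation: Int, desc: CommitLogDescriptor?) {
        println("mutation keyspace: ${mutation.keyspaceName}")
    }
}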
The only mutations I get in the output are from the system and system_schema keyspaces. I do not understand what I am doing wrong, since mutations for the "demo" keyspace do not show up. I would also expect to see the data I inserted into the table, but when I print the mutation it does not seem to be there.
I also tried to flush the data on the node manually with this command:
nodetool flush demo
but that does not seem to help.
What am I doing wrong?
Any help is kindly appreciated.
When inserting only 3 records, you are right to use nodetool flush to get the data written from the memtable down to disk, but the commit log works in segments, with a default size of 32 MB. You can find this in cassandra.yaml as commitlog_segment_size (named commitlog_segment_size_in_mb before 4.1).
I believe the timing of when a segment is released changed between Cassandra 3.x and 4.x.
In 3.x, I believe a segment is only released once the full 32 MB has been written and all of the data within that segment has been flushed. Only then is the log safe to move out to the cdc folder, i.e. it would no longer be needed for replay after a crash.
From 4.x onwards, I believe the segment only needs to be completed, and its mutations do not all need to be flushed before it is released. Because the file is hard-linked into the cdc folder, the mutations are available sooner than before, but still not instantly.
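One way to check how far a hard-linked segment can be read: the 4.x CDC documentation describes a `_cdc.idx` file written next to the segment, holding the byte offset that has been synced to disk and, once the segment is finished, a second line reading COMPLETED. Below is a minimal sketch of parsing that content, assuming that layout holds for your version. The names `CdcIndex` and `parseCdcIndex` are hypothetical and not part of Cassandra.

```kotlin
/** Hypothetical holder: bytes of the segment safe to read, and whether the segment is finished. */
data class CdcIndex(val durableOffset: Long, val completed: Boolean)

/**
 * Parses the text of a _cdc.idx file.
 * Assumed layout: first line is an integer offset, optional second line is "COMPLETED".
 * Returns null when the content does not start with a valid offset.
 */
fun parseCdcIndex(content: String): CdcIndex? {
    val lines = content.lines().map { it.trim() }.filter { it.isNotEmpty() }
    val offset = lines.firstOrNull()?.toLongOrNull() ?: return null
    val completed = lines.getOrNull(1) == "COMPLETED"
    return CdcIndex(offset, completed)
}
```

If the index file exists but never reports COMPLETED, the segment is most likely still being written, which would fit with a handful of inserts not yet showing up as a finished segment.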
If you inserted just 3 rows and flushed, you would not have reached the segment size. To get around this when testing, I would create a dummy side table and fill it with enough garbage data per test run to reach the segment size.
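To make the filler idea concrete, here is a rough sketch that estimates how many garbage rows are needed to reach a segment and builds the INSERT statements for them. It assumes the default 32 MB segment, commit log compression turned off, and a hypothetical side table created as `CREATE TABLE demo.filler (id bigint PRIMARY KEY, payload text)`. The table and function names are mine, not anything Cassandra provides.

```kotlin
// Default commit log segment size (32 MB); adjust if your cassandra.yaml differs.
const val SEGMENT_SIZE_BYTES: Long = 32L * 1024 * 1024

/** Rows of payloadBytes each needed to write at least segmentBytes of payload (ceiling division). */
fun fillerRowsNeeded(segmentBytes: Long, payloadBytes: Int): Long {
    require(segmentBytes > 0 && payloadBytes > 0) { "sizes must be positive" }
    return (segmentBytes + payloadBytes - 1) / payloadBytes
}

/** Builds one CQL INSERT for the hypothetical demo.filler table with an ASCII payload. */
fun fillerInsert(id: Long, payloadBytes: Int): String {
    val payload = "x".repeat(payloadBytes)
    return "INSERT INTO demo.filler (id, payload) VALUES ($id, '$payload');"
}

fun main() {
    val payloadBytes = 64 * 1024 // 64 KiB of garbage per row
    val rows = fillerRowsNeeded(SEGMENT_SIZE_BYTES, payloadBytes)
    println("Rows needed: $rows") // 32 MiB / 64 KiB = 512
    println(fillerInsert(0, 16))  // short payload, just to show the statement shape
}
```

The estimate counts payload bytes only, so the real number of rows needed will be somewhat lower once per-mutation overhead is included. Writing a few extra rows past the estimate is the simplest way to be sure the segment rolls over.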