Parse Cassandra 5 CDC commit logs - Stack Overflow

admin, 2025-04-18

I’ve been trying to parse the Cassandra commit logs using CommitLogReader.java, but so far I have only been able to see the changes for the system and system_schema keyspaces. The changes to the table in my own keyspace (demo) do not show up in the logs.

Here is what I have done to enable CDC:

  1. Set these variables in /etc/cassandra/cassandra.yaml

cdc_enabled: true
cdc_block_writes: true
cdc_on_repair_enabled: true
cdc_raw_directory: /var/lib/cassandra/cdc_raw
commitlog_directory: /var/lib/cassandra/commitlog

  2. Create a keyspace and a table

CREATE KEYSPACE IF NOT EXISTS demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
create table if not exists demo.test_table( uuid UUID PRIMARY KEY, name text ) with cdc=true;

// Add a few rows
insert into demo.test_table ("uuid", "name") values (f26381e4-6c99-4f82-b60c-b90773418a9d, 'test');
insert into demo.test_table ("uuid", "name") values (1e1b0e16-706e-421c-977a-81dd43162ef5, 'test2');
insert into demo.test_table ("uuid", "name") values (5d7cc223-a22f-491d-856e-1639a5b35b52, 'test3');

  3. Parse the commit logs in Kotlin
    Added the dependency: "org.apache.cassandra:cassandra-all:5.0.3"
    Attempt to parse the logs:
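import org.apache.cassandra.db.Mutation
import org.apache.cassandra.db.commitlog.CommitLogDescriptor
import org.apache.cassandra.db.commitlog.CommitLogReadHandler
import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException
import org.apache.cassandra.db.commitlog.CommitLogReader
import org.apache.cassandra.io.util.File

private fun readCommitLog(commitLogFile: java.io.File) {
    println("Reading CDC log: " + commitLogFile.name)

    val reader = CommitLogReader()
    val cdcMutationHandler = CDCMutationHandler()
    // File here is org.apache.cassandra.io.util.File (imported above), not java.io.File
    val file = File(commitLogFile.absolutePath)
    reader.readCommitLogSegment(cdcMutationHandler, file, CommitLogReader.ALL_MUTATIONS, false)
}

class CDCMutationHandler : CommitLogReadHandler {
    // Do not skip the segment on a read error. The original TODO() would
    // throw NotImplementedError if this callback were ever invoked.
    override fun shouldSkipSegmentOnError(exception: CommitLogReadException?): Boolean = false

    override fun handleUnrecoverableError(exception: CommitLogReadException?) {
        System.err.println("Error reading CDC log: " + exception?.message)
    }

    // Called once per mutation found in the segment; only the keyspace name is printed
    override fun handleMutation(mutation: Mutation, size: Int, entryLocation: Int, desc: CommitLogDescriptor?) {
        println("mutation keyspace: ${mutation.keyspaceName}")
    }
}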

The only mutations I get in the output are those from the system and system_schema keyspaces. I do not understand what I am doing wrong, since the mutations in the keyspace "demo" do not show up. I would also expect to see the actual data I inserted into the table, but when I print the mutations it does not seem to be there.

I also tried to manually flush the data on the node with this command, which does not seem to help:

nodetool flush demo

What am I doing wrong?

Any help is kindly appreciated.

asked Mar 7 at 11:51 by zorzmol17

1 Answer

When inserting only 3 records, you are right to use nodetool flush to get the data written from the memtable down to disk, but the CDC log works in segments, the default size being 32 MB. You can find this in cassandra.yaml as commitlog_segment_size.

I believe the timing of the segment being released in Cassandra changed between 3.x and 4.x onwards.

In 3.x, I believe that for a segment to be released you need at least the 32 MB written and all of the data within that segment flushed, so that the log is safe to move out to the cdc folder, i.e. it would no longer be needed after a crash.

From 4.x onwards I believe it just needs the segment to be completed, and doesn't need all the mutations flushed before being released, since it hard-links the file into the cdc folder. The mutations are therefore available faster than before, but still not instantly.
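
One way a consumer could tell whether a hard-linked segment is finished: as far as I recall, the Cassandra CDC documentation describes a companion `<segment_name>_cdc.idx` file in the cdc folder, holding the offset that has been synced to disk and, once the segment is done, a second line reading `COMPLETED`. The sketch below parses that content. The file layout is an assumption to verify against your Cassandra version, and `CdcIndex` and `parseCdcIndex` are hypothetical names, not part of any Cassandra API.

```kotlin
/** Parsed state of a `_cdc.idx` file (hypothetical helper type, not a Cassandra class). */
data class CdcIndex(val syncedOffset: Long, val completed: Boolean)

/**
 * Parses the text of an idx file under the ASSUMED layout:
 *   line 1: number of bytes of the segment that are safely on disk
 *   line 2: the word COMPLETED, present only once the segment is finished
 * Returns null when the first line is missing or not a number.
 */
fun parseCdcIndex(content: String): CdcIndex? {
    val lines = content.lines().map { it.trim() }.filter { it.isNotEmpty() }
    val offset = lines.firstOrNull()?.toLongOrNull() ?: return null
    val completed = lines.getOrNull(1) == "COMPLETED"
    return CdcIndex(offset, completed)
}
```

A reader would load the file with something like `java.io.File(path).readText()`, pass the text to `parseCdcIndex`, and only process the whole segment once `completed` is true.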

If you inserted just 3 rows and flushed, you would not have hit the segment size. To get around this when testing, I would create a side dummy table and fill it with enough garbage data per test run to hit the segment size.
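
A minimal sketch of that workaround, assuming a hypothetical filler table `demo.filler (id bigint PRIMARY KEY, payload text)` that you create yourself. It estimates how many rows of a given payload size are needed to reach the 32 MB segment size and generates the INSERT statements. The estimate ignores per-mutation overhead in the commit log and any commit log compression, so treat it as a rough lower bound on the bytes actually written.

```kotlin
import kotlin.random.Random

// Default commit log segment size mentioned above (32 MB).
const val SEGMENT_BYTES: Long = 32L * 1024 * 1024

/** Rows needed so that rows * payloadBytes >= targetBytes (ceiling division). */
fun rowsToFill(targetBytes: Long, payloadBytes: Int): Long {
    require(targetBytes > 0 && payloadBytes > 0) { "sizes must be positive" }
    return (targetBytes + payloadBytes - 1) / payloadBytes
}

/** Random lowercase ASCII payload of the requested length. */
fun randomPayload(length: Int, rng: Random): String =
    buildString(length) { repeat(length) { append('a' + rng.nextInt(26)) } }

/** Lazily yields CQL INSERT statements for the hypothetical demo.filler table. */
fun fillerInserts(rows: Long, payloadBytes: Int, rng: Random = Random.Default): Sequence<String> =
    generateSequence(0L) { it + 1 }
        .takeWhile { it < rows }
        .map { id ->
            "INSERT INTO demo.filler (id, payload) VALUES ($id, '${randomPayload(payloadBytes, rng)}');"
        }
```

With 1 KB payloads, `rowsToFill(SEGMENT_BYTES, 1024)` gives 32768 rows. The statements from `fillerInserts` can be written to a file and run through cqlsh, or executed with whatever driver the test already uses.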
