java - identify retried message with spring kafka producer - Stack Overflow

admin2025-04-20  0

What I would like to achieve:

I would like to identify the retried message using Spring Kafka when producing the retried message.

Background:

I am a Spring Kafka producer.

I produce messages that I put in a kafka broker.

Unfortunately, the kafka cluster is flaky, but not to a point where it is completely down. Usually, couple of retries will send the message successfully to Kafka.

The different consumers of the topic have the requirement to identify a message that has been retried.

Meaning, when they get a message I managed to produce without problems, no need for anything else.

However, if the message has been produced, but went thought the retry mechanism, I would like to identify / tag those a such.

Here is what I tried:

retries (defaults to Integer.MAX_VALUE): the maximum number of attempts to publish the message
delivery.timeout.ms (defaults to 120,000): the maximum time to wait for a message to be acknowledged before considering it failed
retry.backoff.ms (defaults to 100): the time to wait before retrying
retry.backoff.max.ms (defaults to 1,000): the maximum delay between consecutive retries

With this from the official doc, I went to use this construct:

 Properties props = new Properties();
    // other properties
    props.put(RETRIES_CONFIG, 20);
    props.put(RETRY_BACKOFF_MS_CONFIG, "500");
    props.put(DELIVERY_TIMEOUT_MS_CONFIG, "5000");

[...]
props.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

While this would configure the retry, it does not distinguish a retried message.

Question:

How to distinguish, identify, tag that a message has been retried?

What I would like to achieve:

I would like to identify the retried message using Spring Kafka when producing the retried message.

Background:

I am a Spring Kafka producer.

I produce messages that I put in a kafka broker.

Unfortunately, the kafka cluster is flaky, but not to a point where it is completely down. Usually, couple of retries will send the message successfully to Kafka.

The different consumers of the topic have the requirement to identify a message that has been retried.

Meaning, when they get a message I managed to produce without problems, no need for anything else.

However, if the message has been produced, but went thought the retry mechanism, I would like to identify / tag those a such.

Here is what I tried:

retries (defaults to Integer.MAX_VALUE): the maximum number of attempts to publish the message
delivery.timeout.ms (defaults to 120,000): the maximum time to wait for a message to be acknowledged before considering it failed
retry.backoff.ms (defaults to 100): the time to wait before retrying
retry.backoff.max.ms (defaults to 1,000): the maximum delay between consecutive retries

With this from the official doc, I went to use this construct:

 Properties props = new Properties();
    // other properties
    props.put(RETRIES_CONFIG, 20);
    props.put(RETRY_BACKOFF_MS_CONFIG, "500");
    props.put(DELIVERY_TIMEOUT_MS_CONFIG, "5000");

[...]
props.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

While this would configure the retry, it does not distinguish a retried message.

Question:

How to distinguish, identify, tag that a message has been retried?

Share Improve this question asked Mar 2 at 14:00 PatPandaPatPanda 5,17229 gold badges120 silver badges258 bronze badges 4
  • 1 You can set metadata in Apache Kafka indicating that a message is sent due to a retry mechanism, you can add custom headers and based on the headers you will be able to distinguish the message that has been sent by retry mechanism. – Andrei Lisa Commented Mar 3 at 8:09
  • This seems to be good! Thanks and upvoted. Just wondering if you have some snippet code with examples @AndreiLisa – PatPanda Commented Mar 3 at 9:58
  • I can work on that, I will write an pseudo code answer with all the steps you have to do. – Andrei Lisa Commented Mar 3 at 10:08
  • As I said I wrote that answer. It is not a ready to use solution but can be as a starting point. – Andrei Lisa Commented Mar 3 at 18:30
Add a comment  | 

1 Answer 1

Reset to default 0

To achieve your needs(identify a event that is send by retrier mechanism)

Create a DefaultErrorHandler on the Producer Side that will handle that

    @Configuration
    public class KafkaProducerErrorHandler {
        private final Logger logger = LoggerFactory.getLogger(KafkaProducerErrorHandler.class);
    
        @Bean
        public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
            return new DefaultErrorHandler((record, exception) -> {
                Headers headers = record.headers();
                boolean isRetried = isMessageRetried(headers);
    
                if (!isRetried) {
                    headers.add("x-retried", "true".getBytes(StandardCharsets.UTF_8));
    
                    kafkaTemplate.send(record.topic(), (String) record.key(), record.value());
                } else {
                    logger.error("Dropping message (retried already): " + record.value());
// or a custom logic 
                }
            }, new FixedBackOff(500, 20));
        }
    
        private boolean isMessageRetried(Headers headers) {
            return headers.lastHeader("x-retried") != null;
        }
    }

On consumer side you will have the information whether the events were send by retry mechanism or not(using headers from the event) but from my point of view while this is working solution I would like to suggest you to think about DLQ(Dead Letter Queue) approach because it fits well in your case

"The different consumers of the topic have the requirement to identify a message that has been retried." for instance you could just keep a consumer that will read only DQL

转载请注明原文地址:http://conceptsofalgorithm.com/Algorithm/1745121122a286150.html

最新回复(0)