What I would like to achieve:
I would like to identify, at produce time with Spring Kafka, that a message is being retried.
Background:
I am a Spring Kafka producer.
I produce messages and publish them to a Kafka broker.
Unfortunately, the Kafka cluster is flaky, but not to the point of being completely down. Usually, a couple of retries will deliver the message successfully to Kafka.
The different consumers of the topic have the requirement to identify a message that has been retried.
Meaning, when they get a message I managed to produce without problems, nothing else is needed.
However, if the message was produced but went through the retry mechanism, I would like to identify / tag it as such.
Here is what I tried:
retries (defaults to Integer.MAX_VALUE): the maximum number of attempts to publish the message
delivery.timeout.ms (defaults to 120,000): the maximum time to wait for a message to be acknowledged before considering it failed
retry.backoff.ms (defaults to 100): the time to wait before retrying
retry.backoff.max.ms (defaults to 1,000): the maximum delay between consecutive retries
Based on the official docs, I used this construct:
Properties props = new Properties();
// other properties
// keys below are statically imported from org.apache.kafka.clients.producer.ProducerConfig
props.put(RETRIES_CONFIG, 20);
props.put(RETRY_BACKOFF_MS_CONFIG, "500");
props.put(DELIVERY_TIMEOUT_MS_CONFIG, "5000");
[...]
props.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
While this configures the retries, it does not let me distinguish a retried message.
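For context: the client's internal retries happen inside the producer's I/O thread, so application code never observes the individual attempts; the only per-send hook is the send callback, which fires once with the final outcome. A sketch of tagging at the application level instead, by turning internal retries off and re-sending once from the callback (the x-retried header name and sendTagged helper are illustrative, not part of any API):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TaggedSender {

    // assumes the producer was configured with retries=0,
    // so every failure surfaces here instead of being retried internally
    static void sendTagged(KafkaProducer<String, String> producer,
                           ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // mark the re-send so consumers can identify it via the header
                record.headers().add("x-retried",
                        "true".getBytes(StandardCharsets.UTF_8));
                producer.send(record); // single application-level retry
            }
        });
    }
}
```

This trades the client's built-in backoff for visibility: each attempt is now observable, at the cost of re-implementing the retry loop yourself.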
Question:
How to distinguish, identify, tag that a message has been retried?
To achieve your need (identifying an event that was sent by the retry mechanism), you can create a DefaultErrorHandler that re-publishes the failed record with a marker header:
@Configuration
public class KafkaProducerErrorHandler {

    private final Logger logger = LoggerFactory.getLogger(KafkaProducerErrorHandler.class);

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        return new DefaultErrorHandler((record, exception) -> {
            Headers headers = record.headers();
            if (!isMessageRetried(headers)) {
                headers.add("x-retried", "true".getBytes(StandardCharsets.UTF_8));
                // re-publish with the original headers so the marker is propagated
                kafkaTemplate.send(new ProducerRecord<>(record.topic(), null,
                        (String) record.key(), record.value(), headers));
            } else {
                logger.error("Dropping message (retried already): {}", record.value());
                // or custom logic
            }
        }, new FixedBackOff(500, 20));
    }

    private boolean isMessageRetried(Headers headers) {
        return headers.lastHeader("x-retried") != null;
    }
}
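Note that a DefaultErrorHandler only takes effect once it is registered on the listener container. A minimal wiring sketch, assuming the errorHandler bean above (bean and factory names are illustrative):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;

public class ListenerFactoryConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // attach the error handler so failed records go through the recoverer above
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```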
On the consumer side you will then have the information (via the event headers) about whether an event was sent by the retry mechanism or not. While this is a working solution, I would suggest also considering a DLQ (Dead Letter Queue) approach, because it fits your case well:
"The different consumers of the topic have the requirement to identify a message that has been retried."
For instance, you could keep a dedicated consumer that reads only the DLQ.
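The DLQ variant can be sketched with Spring Kafka's built-in DeadLetterPublishingRecoverer, which by default publishes exhausted records to a topic named after the original with a .DLT suffix (the bean name and backoff values here are illustrative):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class DlqConfig {

    @Bean
    public DefaultErrorHandler dlqErrorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        // after the in-memory retries are exhausted, the record is
        // published to <originalTopic>.DLT instead of being dropped
        DeadLetterPublishingRecoverer recoverer =
                new DeadLetterPublishingRecoverer(kafkaTemplate);
        return new DefaultErrorHandler(recoverer, new FixedBackOff(500L, 2L));
    }
}
```

A consumer subscribed only to the .DLT topic then sees exactly the messages that failed normal processing, which satisfies the "identify retried messages" requirement without custom headers.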