UNKNOWN_TOPIC_OR_PARTITION handling in Spring Kafka Producer - Stack Overflow


I have a Kafka producer and consumer configured in a Spring Boot application (Spring Boot version 3.4.2).

application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=100
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
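
With these properties, Spring Boot auto-configures the producer and the KafkaTemplate, which I inject as usual (shown only for context; EventSender and the String key/value types are placeholders for my actual classes):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventSender {

    // auto-configured by Spring Boot from the properties above
    private final KafkaTemplate<String, String> kafkaTemplate;

    public EventSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
}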

In the application logic, a situation may occur where the producer tries to send an event to a topic that does not exist, and I want to handle this situation in my code after the first unsuccessful attempt. I did not find a way to do this correctly with exceptions. The producer just outputs a bunch of logs:

2025-03-25T15:31:52.363+05:00  WARN 49600 --- [app] [tter-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=app-producer-1] Error while fetching metadata with correlation id 5 : {new_topic=UNKNOWN_TOPIC_OR_PARTITION}
2025-03-25T15:31:52.601+05:00  WARN 49600 --- [app] [tter-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=app-producer-1] Error while fetching metadata with correlation id 6 : {new_topic=UNKNOWN_TOPIC_OR_PARTITION}
2025-03-25T15:31:52.989+05:00  WARN 49600 --- [app] [tter-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=app-producer-1] Error while fetching metadata with correlation id 7 : {new_topic=UNKNOWN_TOPIC_OR_PARTITION}
2025-03-25T15:31:53.973+05:00  WARN 49600 --- [app] [tter-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=app-producer-1] Error while fetching metadata with correlation id 8 : {new_topic=UNKNOWN_TOPIC_OR_PARTITION}

And it continues trying until a minute passes:

Exception thrown when sending a message with key='null' and payload='...' to topic new_topic and partition 0:

org.apache.kafka.common.errors.TimeoutException: Topic new_topic not present in metadata after 60000 ms.

I tried to set up exception handling:

try {
    // send() is asynchronous, but the metadata timeout for a missing
    // topic is thrown synchronously from the calling thread, so it can
    // be caught here
    kafkaTemplate.send(new ProducerRecord<>(topic, partition,
            System.currentTimeMillis(), null, event, headers));
} catch (KafkaException ex) {
    // handle
}
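
As far as I understand, broker-side failures are reported only through the CompletableFuture that send() returns, so observing those would need a callback roughly like this (a sketch; whenComplete is the standard CompletableFuture API):

kafkaTemplate.send(new ProducerRecord<>(topic, partition,
        System.currentTimeMillis(), null, event, headers))
    .whenComplete((result, ex) -> {
        if (ex != null) {
            // asynchronous failures (errors reported after the record
            // was handed to the producer) end up here
        }
    });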

I also added the setting spring.kafka.producer.properties.max.block.ms=10. The logs were still output, and the exception was caught as expected, but after that the attempts to fetch metadata continued. I want to get rid of this. How can I do that?
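
One workaround I am considering is to check whether the topic exists before sending, so the producer never enters the metadata retry loop for a missing topic. A minimal sketch with the plain Kafka AdminClient (the AdminClient setup here is my own addition, reusing the same bootstrap servers; it is not part of the configuration above):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

try (AdminClient admin = AdminClient.create(
        Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
    Set<String> existing = admin.listTopics().names().get();
    if (!existing.contains(topic)) {
        // topic is missing: handle it here instead of letting the
        // producer block on metadata and retry until max.block.ms
        return;
    }
    kafkaTemplate.send(new ProducerRecord<>(topic, partition,
            System.currentTimeMillis(), null, event, headers));
} catch (InterruptedException | ExecutionException e) {
    // handle the admin lookup failure
}

Creating an AdminClient per send is heavy, though, so I would still prefer a producer-side way to fail fast after the first metadata attempt.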

