In a Quarkus app I have the following handler, which retries deserializing a Kafka message every few seconds until it succeeds or the application is manually stopped:
    @ApplicationScoped
    @Identifier("event-deserialization-failure-handler")
    public class EventDeserializationFailureHandler implements DeserializationFailureHandler<GenericData.Record> {

        public EventDeserializationFailureHandler() {}

        @Override
        public GenericData.Record decorateDeserialization(Uni<GenericData.Record> deserialization, String topic, boolean isKey,
                String deserializer, byte[] data, Headers headers) {
            return deserialization
                    .onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofMillis(5)).indefinitely()
                    .await().indefinitely();
        }
    }
It works as expected except on app shutdown. After I try to shut the app down (from the IDE or via SIGINT/Ctrl+C), the process keeps running and has to be hard-killed.
The same thing happens if I do something like this and try to stop the app before it completes:

    .onFailure().retry().atMost(Long.MAX_VALUE).await().indefinitely();
I've also tried passing my own Executor to retry() using .withExecutor(myExecutor) and shutting it down via:

    @PreDestroy
    void cleanup() {
        myExecutor.shutdown();
    }

But that cleanup method is never called during a normal (non-kill) shutdown.
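
For illustration only, the behavior being asked for can be modeled in plain Java, with no Quarkus, Mutiny, or SmallRye involved: a blocking retry loop that gives up as soon as a stop flag is set or the thread is interrupted. This is a minimal sketch under stated assumptions, not a fix for the handler above. The class `StoppableRetry`, the method `retryUntilStopped`, and the `stopRequested` flag are all hypothetical names introduced here.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical helper; not part of Quarkus, Mutiny, or SmallRye. */
final class StoppableRetry {

    private StoppableRetry() {}

    /**
     * Calls {@code attempt} repeatedly, sleeping {@code delay} between failed
     * attempts, until it succeeds or {@code stopRequested} becomes true.
     * Returns an empty Optional if stopped or interrupted before a success.
     * A null result counts as a failed attempt (Optional.of rejects null).
     */
    static <T> Optional<T> retryUntilStopped(Supplier<T> attempt,
                                             Duration delay,
                                             AtomicBoolean stopRequested) {
        while (!stopRequested.get()) {
            try {
                return Optional.of(attempt.get());
            } catch (RuntimeException failure) {
                // Attempt failed: fall through, wait, and try again.
            }
            try {
                Thread.sleep(delay.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
        return Optional.empty();
    }
}
```

Something would still have to set the flag (or interrupt the thread) at shutdown. Whether Quarkus runs any such callback while the consumer thread is blocked in `await().indefinitely()` is exactly the open question here.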