I have connected Kafka and Spring Boot using a self-signed SSL certificate.
I have confirmed that both the AdminClient and the Producer work properly, but the @KafkaListener on the consumer side does not.
The Spring Boot version is 3.4.2, and the Kafka version is 3.9, the latest.
Before switching to SSL, I confirmed that @KafkaListener worked properly over a PLAINTEXT connection. The issue started only after switching to an SSL connection.
Additionally, the AdminClient and the Producer log no errors, and there are no consumer-related logs at all.
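Since nothing is logged on the consumer side, one check that can be run outside Spring is whether the keystore and truststore load with the configured type and password. The following is a minimal sketch using only the JDK; `StoreCheck` and `describe` are hypothetical names, not part of Kafka or Spring, and the path, type, and password are supplied by the caller. Because the Producer already works with the same stores, this is expected to succeed and mainly rules the files out as the cause.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: loads a keystore or truststore and lists its entries. */
final class StoreCheck {

    private StoreCheck() {}

    /**
     * Loads the store at the given path and returns one line per alias.
     * Throws if the file is unreadable, the type is wrong, or the password does not match.
     */
    static List<String> describe(Path file, String type, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(file)) {
            store.load(in, password);
        }
        List<String> lines = new ArrayList<>();
        for (String alias : Collections.list(store.aliases())) {
            // A keystore normally holds key entries; a truststore holds certificate entries.
            String kind = store.isKeyEntry(alias) ? "key entry" : "certificate entry";
            lines.add(alias + ": " + kind);
        }
        return lines;
    }
}
```

Calling `StoreCheck.describe(path, "PKCS12", password)` once for each store should return at least one key entry for the keystore and at least one certificate entry for the truststore.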
Below are the application.yml settings and the ConsumerFactory configuration.
# application.yml
spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: localhost:9094
    security:
      protocol: SSL
      ssl-keystore-type: PKCS12
      ssl-keystore-location: "C:/.../keystore.p12"
      ssl-keystore-password: ###
      ssl-truststore-type: PKCS12
      ssl-truststore-location: "C:/.../truststore.p12"
      ssl-truststore-password: ###
    consumer:
      group-id: test
      auto-offset-reset: earliest
      properties:
        security:
          protocol: SSL
        ssl:
          keystore:
            type: PKCS12
            location: "C:/.../keystore.p12"
            password: ###
          truststore:
            type: PKCS12
            location: "C:/.../truststore.p12"
            password: ###
    producer:
      retries: 3
      acks: all
      properties:
        security:
          protocol: SSL
        ssl:
          keystore:
            type: PKCS12
            location: "C:/.../keystore.p12"
            password: ###
          truststore:
            type: PKCS12
            location: "C:/.../truststore.p12"
            password: ###
    topic:
      name: test-topic
      partitions: 3
      replication-factor: 1
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    private final KafkaProperties kafkaProperties;

    @Bean("normalConsumer")
    public ConsumerFactory<String, String> consumerConfig() {
        Map<String, Object> configProps = new HashMap<>();

        ConsumerProperties consumerProperties = kafkaProperties.getConsumer();
        CommonProperties securityProperties = consumerProperties.getProperties();
        SslProperties sslProperties = securityProperties.getSsl();
        StoreDetailsProperties keyStore = sslProperties.getKeyStore();
        StoreDetailsProperties trustStore = sslProperties.getTrustStore();

        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getGroupId());
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        configProps.put("security.protocol", "SSL");
        configProps.put("ssl.keystore.type", keyStore.getType());
        configProps.put("ssl.keystore.location", keyStore.getLocation());
        configProps.put("ssl.keystore.password", keyStore.getPassword());
        configProps.put("ssl.truststore.type", trustStore.getType());
        configProps.put("ssl.truststore.location", trustStore.getLocation());
        configProps.put("ssl.truststore.password", trustStore.getPassword());

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            @Qualifier("normalConsumer") ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}
I know that Spring Boot registers a ConsumerFactory bean automatically from the yml settings, so it does not have to be configured manually.
However, I wanted to check whether the auto-configured consumer and the manually configured one behave differently, so I tried both.
Both produced the same result.
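One way to make the comparison between the two consumers concrete is to diff their effective configuration maps key by key. The following is a minimal sketch using only the JDK; `ConsumerConfigDiff`, `display`, and `diff` are hypothetical names, and the maps are assumed to hold the usual Kafka client property names such as `security.protocol` and `group.id`. Values under keys containing "password" are masked so the output can be shared.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper: compares two Kafka client config maps key by key. */
final class ConsumerConfigDiff {

    private ConsumerConfigDiff() {}

    /** Renders a value for printing, masking anything stored under a password key. */
    static String display(String key, Object value) {
        if (value == null) {
            return "<unset>";
        }
        boolean secret = key.toLowerCase(Locale.ROOT).contains("password");
        return secret ? "****" : String.valueOf(value);
    }

    /** Returns key -> "left | right" for every key whose values differ, sorted by key. */
    static Map<String, String> diff(Map<String, ?> left, Map<String, ?> right) {
        TreeSet<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());

        Map<String, String> result = new TreeMap<>();
        for (String key : keys) {
            Object l = left.get(key);
            Object r = right.get(key);
            // Compare string forms; a missing key is treated as null.
            String ls = l == null ? null : String.valueOf(l);
            String rs = r == null ? null : String.valueOf(r);
            if (!Objects.equals(ls, rs)) {
                // Differing passwords show as "**** | ****": flagged but not revealed.
                result.put(key, display(key, l) + " | " + display(key, r));
            }
        }
        return result;
    }
}
```

In the application, the two maps could be taken from each factory's `getConfigurationProperties()`, which Spring for Apache Kafka exposes on `ConsumerFactory`. An empty result would suggest the two factories really are configured identically.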
I used the Kafka listener as follows:
@Component
public class KafkaConsumerListeners {

    @KafkaListener(
            topics = "${spring.kafka.topic.name}",
            groupId = "new-group-id",
            containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message) {
        System.out.println(message);
        System.out.println("===================================================================================");
    }
}
I searched for similar questions on Stack Overflow and tried the suggested solutions, but the issue remains unresolved.
What could be preventing the Kafka listener from working?