ssl - Spring Boot @KafkaListener does not work - Stack Overflow


I have connected Kafka and Spring Boot using a self-signed SSL certificate.

I have confirmed that both the AdminClient and Producer are working properly, but the @KafkaListener in the Consumer is not functioning.

The Spring Boot version is 3.4.2, and the Kafka version is the latest, 3.9.

Before connecting with an SSL certificate, I confirmed that @KafkaListener was working properly with a PLAINTEXT connection. This issue started occurring after switching to SSL connection.

Additionally, there were no error logs from the AdminClient or the Producer, and there were no consumer-related logs at all.
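One thing I have not been able to rule out is a silent failure on the consumer side (for example an SSL handshake error that only happens on the listener container's thread). A standalone check I plan to try is building a plain KafkaConsumer with the same SSL settings outside of Spring and polling it once, so any exception is thrown directly in main. This is only a rough sketch; the class name, group id, paths and passwords are placeholders, and the topic name is the one configured below:

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Standalone smoke test (not part of the Spring app). Any SSL/handshake problem
// should surface here as an exception instead of disappearing in a container thread.
public class SslConsumerSmokeTest {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "smoke-test"); // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Same SSL settings as in the application; paths and passwords are placeholders.
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "C:/path/to/keystore.p12");
        props.put("ssl.keystore.password", "changeit");
        props.put("ssl.truststore.type", "PKCS12");
        props.put("ssl.truststore.location", "C:/path/to/truststore.p12");
        props.put("ssl.truststore.password", "changeit");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            System.out.println("Fetched " + records.count() + " record(s)");
        }
    }
}

If this standalone consumer receives records, the SSL settings are fine and the problem is somewhere in the Spring container wiring; if it hangs or throws, the consumer-side SSL configuration itself is the problem.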

Below are the application.yml settings and the ConsumerFactory configuration.

#application.yml
spring:
  application:
    name: kafka

  kafka:
    bootstrap-servers: localhost:9094
    security:
      protocol: SSL
    ssl-keystore-type: PKCS12
    ssl-keystore-location: "C:/.../keystore.p12"
    ssl-keystore-password: ###
    ssl-truststore-type: PKCS12
    ssl-truststore-location: "C:/.../truststore.p12"
    ssl-truststore-password: ###

    consumer:
      group-id: test
      auto-offset-reset: earliest
      properties:
        security:
          protocol: SSL
        ssl:
          keystore:
            type: PKCS12
            location: "C:/.../keystore.p12"
            password: ###
          truststore:
            type: PKCS12
            location: "C:/.../truststore.p12"
            password: ###
    producer:
      retries: 3
      acks: all
      properties:
        security:
          protocol: SSL
        ssl:
          keystore:
            type: PKCS12
            location: "C:/.../keystore.p12"
            password: ###
          truststore:
            type: PKCS12
            location: "C:/.../truststore.p12"
            password: ###
    topic:
      name: test-topic
      partitions: 3
      replication-factor: 1

// KafkaConsumerConfig.java
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
    private final KafkaProperties kafkaProperties;

    @Bean("normalConsumer")
    public ConsumerFactory<String, String> consumerConfig() {
        Map<String, Object> configProps = new HashMap<>();

        ConsumerProperties consumerProperties = kafkaProperties.getConsumer();
        CommonProperties securityProperties = consumerProperties.getProperties();
        SslProperties sslProperties = securityProperties.getSsl();
        StoreDetailsProperties keyStore = sslProperties.getKeyStore();
        StoreDetailsProperties trustStore = sslProperties.getTrustStore();

        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getGroupId());
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        configProps.put("security.protocol", "SSL");
        configProps.put("ssl.keystore.type", keyStore.getType());
        configProps.put("ssl.keystore.location", keyStore.getLocation());
        configProps.put("ssl.keystore.password", keyStore.getPassword());
        configProps.put("ssl.truststore.type", trustStore.getType());
        configProps.put("ssl.truststore.location", trustStore.getLocation());
        configProps.put("ssl.truststore.password", trustStore.getPassword());

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory (
            @Qualifier("normalConsumer") ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

I know that Spring Boot normally auto-configures the ConsumerFactory bean from the application.yml settings, so it should not need to be configured manually.

However, I wanted to check if there was any difference between the auto-configured Consumer and the manually configured one, so I tried both.

Ultimately, both produced the same result.
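To actually compare the two, what I plan to do next is dump the configuration that each ConsumerFactory bean ends up holding at startup, to confirm the SSL properties from application.yml really reach the consumer. A minimal sketch, assuming both factories are DefaultKafkaConsumerFactory instances exposing getConfigurationProperties() (the class name is a placeholder; note that this also prints the keystore passwords, so it is for local debugging only):

import java.util.Map;

import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

// Prints the effective configuration of every ConsumerFactory bean at startup.
@Component
public class ConsumerFactoryConfigDumper implements CommandLineRunner {

    private final Map<String, ConsumerFactory<?, ?>> consumerFactories;

    public ConsumerFactoryConfigDumper(Map<String, ConsumerFactory<?, ?>> consumerFactories) {
        this.consumerFactories = consumerFactories; // bean name -> factory
    }

    @Override
    public void run(String... args) {
        consumerFactories.forEach((name, factory) ->
                System.out.println(name + " -> " + factory.getConfigurationProperties()));
    }
}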

I used the Kafka listener as follows:

@Component
public class KafkaConsumerListeners {
    @KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "new-group-id", containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message) {
        System.out.println(message);
        System.out.println("===================================================================================");
    }
}
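Another check I am planning: spring-kafka publishes application events from the listener containers (consumer starting, started, failed to start, non-responsive, and so on), so logging those events should at least show whether the container ever starts its consumer. A rough sketch (the class name is a placeholder):

import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.KafkaEvent;
import org.springframework.stereotype.Component;

// Logs every lifecycle event published by the Kafka listener containers.
// If nothing is printed at all, the container is probably never being started.
@Component
public class KafkaContainerEventLogger {

    @EventListener
    public void onKafkaEvent(KafkaEvent event) {
        System.out.println("Kafka container event: " + event);
    }
}

Things I have already tried: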

  • I tried setting auto-offset-reset to earliest in application.yml.
  • I checked whether @EnableKafka is added to the Application class.
  • I attempted using a new groupId in @KafkaListener.
  • I verified that messages were actually stored in the topic through the producer.
  • I also checked that the topic specified in the Kafka listener matches the actual topic name.

I searched for similar questions on Stack Overflow and tried the suggested solutions, but the issue remains unresolved.
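Since the AdminClient connection is confirmed to work, the next thing I plan to check is whether the listener's consumer group ever registers a member on the broker. A rough sketch of that check using the AdminClient API (the class name, paths and passwords are placeholders; "new-group-id" is the group id from the listener above):

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

// Asks the broker whether the listener's consumer group has any active members.
public class ConsumerGroupCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        // Same SSL settings as the (working) AdminClient; paths and passwords are placeholders.
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", "C:/path/to/keystore.p12");
        props.put("ssl.keystore.password", "changeit");
        props.put("ssl.truststore.type", "PKCS12");
        props.put("ssl.truststore.location", "C:/path/to/truststore.p12");
        props.put("ssl.truststore.password", "changeit");

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, ConsumerGroupDescription> groups =
                    admin.describeConsumerGroups(List.of("new-group-id")).all().get();
            groups.forEach((id, description) ->
                    System.out.println(id + ": state=" + description.state()
                            + ", members=" + description.members().size()));
        }
    }
}

If the group shows up with zero members while the application is running, the listener container's consumer never joined, which would point back at the container/SSL configuration rather than at offsets or topic names.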

I would like to understand why the Kafka listener is not working over SSL when the AdminClient and Producer work fine.
