Skip to content
Advertisement

Use Kakfa connection to dynamically Subscribe/Unsubscribe the Kafka Topics using Spring Boot

I’m developing a SpringBoot application which exposes the APIs to sub/unsub the Kafka topics. All we need to do is to pass the topic-name in the API call and the application will subscribe to it and consume messages.

Subscribe topic API :

{FQDN}/sub/{topic-name}

Unsubscribe topic API :

{FQDN}/unsub/{topic-name}

Dependency

         <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.3.1.RELEASE</version>
        </dependency>

I have created KafkaConsumerConfiguration in which stated some beans (as follows).

@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {
    private static final String KAFKA_BOOTSTRAP_SERVER = "localhost:9092";

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    @Bean
    public Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}

and I have used the ConcurrentMessageListenerContainer.start() method to when the subscribe API is called against an topic-id.

@Service
public class KafkaConsumerService {

    // This map will be used to store the running consumers info. 
    // So that when we need to stop/unsub the topic, then we can get the container from this map
    private static Map<String, ConcurrentMessageListenerContainer<String, String>> consumersMap = new HashMap<>();

    @Autowired
    private ConsumerFactory<String, String> kafkaConsumerFactory;

    public void createConsumers(String topic,MessageListener messageListener) {
        log.info("creating kafka consumer for topic {}", topic);

        ContainerProperties containerProps = new ContainerProperties(topic);
        containerProps.setPollTimeout(100);

        ConcurrentMessageListenerContainer<String, String> container =new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProps);
        container.setupMessageListener(messageListener);

        container.start();

        consumersMap.put(topic, container);

        log.info("created and started kafka consumer for topic {}", topic);
    }


   public void stopConsumer(String topic) {
        log.info("stopping consumer for topic {}", topic);
        ConcurrentMessageListenerContainer<String, String> container = consumersMap.get(topic);
        if (container != null) {
            container.stop();
            log.info("consumer stopped!! Unsubscribed all topics or patterns and assigned partitions");
        }
    }

}

This solution is working absolutely fine. Like,

  • This is sub/unsubscribing the kafka topics using the APIs
  • Also receiving the messages properly.

But the issue that , Every time when I call the API to subscribe the topic , it creates the AdminClient.create(props)(line 336 of below image)

AbstractMessageListenerContainer.class

This generates the logs as below ,

2022-04-26 01:31:11.676  INFO 26021 --- [nio-8888-exec-2] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1650918672031

I don’t want the AdminClient to create this instance each time.Because in this way, it take round 2 seconds to create a subscription to the topic. Which is unacceptable in my use-case.

Required Solution:

The kafka connection will be created only once. Then I can use that connection sub/unub the kafka topics. So that each time it doesn’t create this connection and also the time efficient improved.

thanks.

Advertisement

Answer

2.3.x has been out of support for a long time now

https://spring.io/projects/spring-kafka#support

The last 2.3.x version was 2.3.14 last July.

The admin is used to check if the topic exists; with that old version, controlled by the missingTopicsFatal property in ContainerProperties; it is true in that version.

With modern versions (since 2.3.4), it is false, so an AdminClient will not be created when you start the container.

But you really need to upgrade to a supported version (2.8.5 advised – 2.7.x goes out of OSS support soon).

There is no support to reuse a consumer with different (topics); a new consumer will be created each time you start the container.

User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement