I’m developing a Spring Boot application that exposes APIs to subscribe to and unsubscribe from Kafka topics. All we need to do is pass the topic name
in the API call, and the application will subscribe to it and consume messages.
Subscribe topic API:
{FQDN}/sub/{topic-name}
Unsubscribe topic API:
{FQDN}/unsub/{topic-name}
Dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>
I have created a KafkaConsumerConfiguration class
in which I have declared the following beans.
@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {

    private static final String KAFKA_BOOTSTRAP_SERVER = "localhost:9092";

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    @Bean
    public Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
and I call the ConcurrentMessageListenerContainer.start()
method when the subscribe API is invoked for a given topic.
@Service
public class KafkaConsumerService {

    // This map will be used to store the running consumers' info,
    // so that when we need to stop/unsub a topic we can get the container from this map.
    private static Map<String, ConcurrentMessageListenerContainer<String, String>> consumersMap = new HashMap<>();

    @Autowired
    private ConsumerFactory<String, String> kafkaConsumerFactory;

    public void createConsumers(String topic, MessageListener messageListener) {
        log.info("creating kafka consumer for topic {}", topic);
        ContainerProperties containerProps = new ContainerProperties(topic);
        containerProps.setPollTimeout(100);
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProps);
        container.setupMessageListener(messageListener);
        container.start();
        consumersMap.put(topic, container);
        log.info("created and started kafka consumer for topic {}", topic);
    }

    public void stopConsumer(String topic) {
        log.info("stopping consumer for topic {}", topic);
        ConcurrentMessageListenerContainer<String, String> container = consumersMap.get(topic);
        if (container != null) {
            container.stop();
            log.info("consumer stopped!! Unsubscribed all topics or patterns and assigned partitions");
        }
    }
}
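The REST layer is not shown in the question; purely for context, a minimal sketch of a controller that could expose the sub/unsub APIs described above and delegate to this service might look like the following (the class name, mappings, and the logging listener are assumptions, not part of the original code):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaSubscriptionController {

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    // {FQDN}/sub/{topic-name}
    @PostMapping("/sub/{topic}")
    public ResponseEntity<String> subscribe(@PathVariable String topic) {
        // Hypothetical listener that just prints each record's value
        kafkaConsumerService.createConsumers(topic,
                (MessageListener<String, String>) record ->
                        System.out.println("received: " + record.value()));
        return ResponseEntity.ok("subscribed to " + topic);
    }

    // {FQDN}/unsub/{topic-name}
    @DeleteMapping("/unsub/{topic}")
    public ResponseEntity<String> unsubscribe(@PathVariable String topic) {
        kafkaConsumerService.stopConsumer(topic);
        return ResponseEntity.ok("unsubscribed from " + topic);
    }
}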
This solution is working absolutely fine:
- It subscribes to and unsubscribes from the Kafka topics via the APIs.
- It also receives the messages properly.
But the issue is that every time I call the API to subscribe to a topic, it calls AdminClient.create(props)
(line 336 in the image below),
which generates the following logs:
2022-04-26 01:31:11.676  INFO 26021 --- [nio-8888-exec-2] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = default
	client.id =
	connections.max.idle.ms = 300000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 120000
	retries = 5
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1650918672031
I don’t want the AdminClient instance to be created each time, because this way it takes around 2 seconds to create a subscription to the topic, which is unacceptable in my use case.
Required solution:
The Kafka connection should be created only once; then I can use that connection to subscribe to and unsubscribe from the Kafka topics, so that this connection is not created on every call and the response time improves.
Thanks.
Answer
2.3.x has been out of support for a long time now:
https://spring.io/projects/spring-kafka#support
The last 2.3.x version was 2.3.14, released last July.
The admin client is used to check whether the topic exists; with that old version, this is controlled by the missingTopicsFatal property in ContainerProperties, and it is true by default in that version.
With modern versions (since 2.3.4), it is false, so an AdminClient will not be created when you start the container.
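If upgrading right away is not an option, a minimal workaround sketch (not part of the original answer, and assuming ContainerProperties.setMissingTopicsFatal() is available in your 2.3.1 version) would be to switch that check off when building the container in createConsumers():

ContainerProperties containerProps = new ContainerProperties(topic);
containerProps.setPollTimeout(100);
// Assumption: disabling the topic-existence check avoids creating an AdminClient on start()
containerProps.setMissingTopicsFatal(false);
ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProps);
container.setupMessageListener(messageListener);
container.start();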
But you really need to upgrade to a supported version (2.8.5 advised – 2.7.x goes out of OSS support soon).
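For example, the dependency from the question bumped to the advised version (check the project page linked above for the latest patch release):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.5</version>
</dependency>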
There is no support for reusing a consumer with different topics; a new consumer will be created each time you start the container.