I have written a Kafka consumer using the spring-kafka library (spring-boot-starter-parent 2.3.4.RELEASE). I have the following consumer configuration in my code:
/**
 * Configuration for Kafka consumers at container level.
 *
 * @return the listener container factory
 */
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    // Offsets are committed immediately when the listener calls ack.acknowledge()
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

/**
 * Default Kafka consumer factory.
 *
 * @return the consumer factory
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

/**
 * Configuration for Kafka consumers at thread level.
 *
 * @return the consumer properties
 */
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GenericDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}
The following is my listener method:
@KafkaListener(id = "client", topics = "MyTopic", clientIdPrefix = "client")
public void listen(@Payload UserNotification data, Acknowledgment ack) {
    // Business logic
    ack.acknowledge();
}
Here I read one message at a time, apply the business logic, and call ack.acknowledge() to commit the offset. Sometimes the offset commit succeeds, but many times I get org.apache.kafka.clients.consumer.CommitFailedException on the ack.acknowledge() line. I can confirm that the business logic completes in 2 seconds at most. The detailed exception follows:
2022-06-03|04:27:04.326|INSTANCEID_IS_UNDEFINED|xyz-856495f857-8nqx7|client-0-C-1|ERROR||||o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer|149|Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.xyz.listen(java.lang.String,org.springframework.kafka.support.Acknowledgment)' threw exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157)
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2012)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1838)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1735)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1465)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1128)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1031)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Can someone please help me understand why this is happening? The default poll interval is 5 minutes, and the group coordinator should not kick out the consumer if processing takes just 2 seconds.
Answer
You must be able to process max.poll.records (default 500) in max.poll.interval.ms (default 300000, i.e. 5 mins).
If it takes 2 seconds per record, a full batch of 500 records can take up to 1000 seconds (about 16.7 minutes) to process. That exceeds the 5-minute limit, so you will be kicked out of the group.
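To make the arithmetic concrete, here is a rough sketch of the budget check. The class and method names (PollBudget and friends) are made up for illustration and are not part of Kafka or Spring; the 2-second figure is the worst case reported in the question, and the other two numbers are the Kafka defaults mentioned above.

```java
// Hypothetical helper: checks whether one poll's worth of records can be
// processed before max.poll.interval.ms expires. Not part of any library.
class PollBudget {

    /** Worst-case time to process a full batch, in milliseconds. */
    static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return maxPollRecords * perRecordMillis;
    }

    /** True if a full batch cannot be processed within the poll interval. */
    static boolean exceedsPollInterval(int maxPollRecords, long perRecordMillis,
                                       long maxPollIntervalMillis) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) > maxPollIntervalMillis;
    }

    /** Largest batch size that still fits within the poll interval. */
    static long maxRecordsThatFit(long perRecordMillis, long maxPollIntervalMillis) {
        return maxPollIntervalMillis / perRecordMillis;
    }

    public static void main(String[] args) {
        int maxPollRecords = 500;          // default max.poll.records
        long maxPollIntervalMs = 300_000L; // default max.poll.interval.ms
        long perRecordMs = 2_000L;         // worst case from the question

        // 500 * 2000 ms = 1000000 ms, about 16.7 minutes
        System.out.println(worstCaseBatchMillis(maxPollRecords, perRecordMs));
        // true: 1000000 ms is more than the 300000 ms budget
        System.out.println(exceedsPollInterval(maxPollRecords, perRecordMs, maxPollIntervalMs));
        // 300000 / 2000 = 150 records at most per poll
        System.out.println(maxRecordsThatFit(perRecordMs, maxPollIntervalMs));
    }
}
```

So with the default interval and 2 seconds per record, anything above roughly 150 records per poll risks a rebalance in the worst case.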
Reduce max.poll.records and/or increase max.poll.interval.ms.
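As a minimal sketch of what that could look like, the two properties can be added to the consumer property map. The values 50 and 600000 below are illustrative assumptions, not recommendations, and the ConsumerTuning class is hypothetical. Plain string keys are used so the snippet runs without the Kafka client on the classpath; in the consumerConfigs() bean from the question you would normally use ConsumerConfig.MAX_POLL_RECORDS_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the two poll-related consumer settings.
class ConsumerTuning {

    // Assumed values for illustration: 50 records * 2 s = 100 s worst case,
    // well inside a 10-minute poll interval.
    static final int MAX_POLL_RECORDS = 50;
    static final int MAX_POLL_INTERVAL_MS = 600_000;

    /** Builds the extra entries to merge into the consumer property map. */
    static Map<String, Object> pollTuningProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", MAX_POLL_RECORDS);         // fewer records per poll
        props.put("max.poll.interval.ms", MAX_POLL_INTERVAL_MS); // more time between polls
        return props;
    }

    public static void main(String[] args) {
        // Prints the two entries (HashMap iteration order is not guaranteed)
        System.out.println(pollTuningProps());
    }
}
```

In the question's configuration, the equivalent would be two extra props.put(...) calls inside consumerConfigs(), next to ENABLE_AUTO_COMMIT_CONFIG.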