I am trying to find a way to use the new DefaultErrorHandler instead of deprecated SeekToCurrentErrorHandler in spring-kafka 2.8.1, in order to override the retry default behavior in case of errors. I want to “stop” the retry process, so if an error occurs, no retry should be done.
Now I have, in a config class, the following bean that works as expected:
@Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**)); factory.setConsumerFactory(requestConsumerFactory()); factory.setReplyTemplate(kafkaTemplate()); return factory; }
Since in this spring kafka version, the STCEH is deprecated, I tried to do the following, in the same config class:
@Bean public DefaultErrorHandler eh() { return new DefaultErrorHandler(new FixedBackOff(0, 1)); }
But it seems that it is not working. In case of error, the retry number is the default one, as I can see in logs:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR DefaultErrorHandler – Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for topicX
How should this DefaultErrorHandler be used in order to achieve the desired behavior? Or should I use something else?
Thx in advance!
Advertisement
Answer
factory.setCommonErrorHandler(new Default....)
Boot auto configuration of a CommonErrorHandler
bean requires Boot 2.6.
https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e