I am not able to find how to do custom error handling in for a spring kafka consumer.
My requirement is:
- For any deserialization errors, just write the error and message to database.
- For any errors in execution under
@KafkaListener
method, retry 3 times and then write the error and message to database.
From the spring docs, I found that,
For 1, I will have to use ErrorHandlingDeserializer
and it will then call the @KafkaListener error handler.
For 2, framework provides SeekToCurrentErrorHandler
which handles message retries.
I am not understanding where can I add the code to write the exception/message to database in addition to enable configured retries.
Advertisement
Answer
Add a recoverer to the SeekToCurrentErrorHandler
new SeekToCurrentErrorHandler((rec, ex) -> { Throwable cause = ex.getCause(); if (cause instanceof DeserializationException) { ... } else { ... }, new FixedBackOff(2000L, 2L));
By default, deserialization exceptions are not retried; most others are retried before calling the recoverer.