How to handle errors/exceptions while using the Spring Kafka framework?


I cannot find how to do custom error handling for a Spring Kafka consumer.

My requirements are:

  1. For any deserialization error, just write the error and message to the database.
  2. For any error thrown while executing the @KafkaListener method, retry 3 times and then write the error and message to the database.

From the Spring docs, I found that for 1 I have to use ErrorHandlingDeserializer, which will then call the @KafkaListener error handler. For 2, the framework provides SeekToCurrentErrorHandler, which handles message retries.

I do not understand where to add the code that writes the exception and message to the database while still keeping the configured retries.

Answer

Add a recoverer to the SeekToCurrentErrorHandler:

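new SeekToCurrentErrorHandler((rec, ex) -> {
    Throwable cause = ex.getCause();
    if (cause instanceof DeserializationException) {
        // deserialization failure: write the error and message to the database
        ...
    }
    else {
        // listener failure after retries are exhausted: write the error and message to the database
        ...
    }
}, new FixedBackOff(2000L, 2L)); // 2-second interval, 2 retries (3 delivery attempts in total)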
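The handler only takes effect once it is set on the listener container factory, and deserialization failures only reach it if the consumer uses ErrorHandlingDeserializer. A minimal sketch of that wiring follows, assuming Spring Kafka 2.5 to 2.7 class names (from 2.8 on, SeekToCurrentErrorHandler is deprecated in favor of DefaultErrorHandler). The FailedRecordStore interface, the broker address, the group id, and the choice of StringDeserializer as delegate are placeholders for illustration, not part of the framework or of the question.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    /** Hypothetical persistence interface; replace with your own repository or DAO bean. */
    public interface FailedRecordStore {
        void save(String topic, Object payload, Throwable error);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // assumed group id
        // Wrap the real deserializers so a bad payload is handed to the error handler
        // instead of failing inside the consumer's poll loop.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(FailedRecordStore store) {
        return new SeekToCurrentErrorHandler((rec, ex) -> {
            Throwable cause = ex.getCause();
            if (cause instanceof DeserializationException) {
                // The record value is null here; the raw bytes are carried by the exception.
                DeserializationException de = (DeserializationException) cause;
                store.save(rec.topic(), de.getData(), de);
            }
            else {
                // Listener failure: reached only after the retries are exhausted.
                store.save(rec.topic(), rec.value(), ex);
            }
        }, new FixedBackOff(2000L, 2L));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            SeekToCurrentErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setErrorHandler(errorHandler); // applies to every @KafkaListener built by this factory
        return factory;
    }
}
```

With this in place, the @KafkaListener method needs no try/catch of its own: it simply throws, and the container handles the retries and the final database write.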

By default, deserialization exceptions are not retried; most others are retried before calling the recoverer.
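To make the retry count concrete: the second argument of FixedBackOff is the number of retries, so FixedBackOff(2000L, 2L) means one initial delivery plus two retries. The sketch below is a plain-Java model of that control flow only, not Spring Kafka's implementation; the class, the deliver method, and NonRetryableException are hypothetical names, and the back-off delay is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class RetryThenRecoverSketch {

    /** Hypothetical stand-in for a failure that is never retried, such as a deserialization error. */
    static class NonRetryableException extends RuntimeException {
        NonRetryableException(String message) {
            super(message);
        }
    }

    /**
     * Delivers the record to the listener, retrying up to maxRetries times.
     * Calls the recoverer once when retries run out or the failure is non-retryable.
     * Returns the number of delivery attempts that were made.
     */
    static <T> int deliver(T record, Consumer<T> listener,
                           BiConsumer<T, Exception> recoverer, int maxRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts; // success, nothing to recover
            } catch (RuntimeException ex) {
                boolean retriesLeft = attempts <= maxRetries;
                if (ex instanceof NonRetryableException || !retriesLeft) {
                    recoverer.accept(record, ex); // e.g. write error and message to the database
                    return attempts;
                }
                // otherwise loop and redeliver the same record
            }
        }
    }

    public static void main(String[] args) {
        List<String> recovered = new ArrayList<>();

        // A listener that always fails: 1 initial delivery + 2 retries.
        int listenerAttempts = deliver("order-1",
                r -> { throw new IllegalStateException("listener failed"); },
                (r, e) -> recovered.add(r), 2);
        System.out.println(listenerAttempts); // 3

        // A non-retryable failure goes straight to the recoverer.
        int badPayloadAttempts = deliver("order-2",
                r -> { throw new NonRetryableException("bad payload"); },
                (r, e) -> recovered.add(r), 2);
        System.out.println(badPayloadAttempts); // 1
    }
}
```

Under this model, retrying 3 times before recovery, as the question asks, would correspond to a retry count of 3 rather than 2.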



Source: stackoverflow