Continue consuming subsequent records in reactor kafka after deserialization exception


I am using reactor kafka and have a custom AvroDeserializer class for deserialization of messages.

Now I have a case where the deserializer throws an exception for certain payloads, and my Kafka listener dies as soon as it tries to read such a record. I tried handling the exception with onErrorReturn, and with a combination of doOnError and onErrorContinue. That let me log the exception, but the listener still did not consume subsequent records.

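import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
   @Override
   public T deserialize( String topic, byte[] data ) {
       try {
         // logic to deserialize
       }
       catch ( Exception e ) {
          // thrown for payloads that cannot be decoded
          throw new SerializationException( "error deserializing", e );
       }
   }
}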

At the listener end, I'm trying to handle it like this:

@EventListener(ApplicationStartedEvent.class)
public void listener() {
   KafkaReceiver<String, Object> receiver; // Kafka receiver, creation omitted
   receiver.receive()
      .delayUntil(do something) // logic to update the record to db
      .doOnError(handle error) // trying to handle the exceptions, including the deserialization exception, by logging them to a system
      .onErrorContinue((throwable, o) -> log.info("continuing"))
      .doOnNext(r -> r.receiverOffset().acknowledge())
      .subscribe();
}

One option is not to throw an exception from the deserializer class at all, but I want to log such exceptions in a separate system for analytics, and hence want to handle such records at the Kafka listener end. Any thoughts on how this can be achieved?

Answer

Looks like the suggestion in the comment about using

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

will work in most cases. That property belongs to the Spring Cloud Stream Kafka Streams binder, though, and I couldn't figure out a way to make it work with Reactor Kafka. For now, I went with the approach of not throwing an exception from the deserializer and logging the failure there instead. That solves the problem of the Kafka consumer being unable to consume subsequent records after a poison record.
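
A minimal sketch of the "don't throw from the deserializer" approach, for illustration only and not the exact code I ended up with. Instead of logging inside the deserializer, it wraps the outcome in a holder so the listener can still see the failure and forward it to an analytics system. Result, SafeDeserializer and TopicDecoder are made-up names. TopicDecoder is only a stand-in for Kafka's Deserializer interface so that the sketch compiles without Kafka on the classpath; in a real setup SafeDeserializer would presumably implement org.apache.kafka.common.serialization.Deserializer<Result<T>> instead.

```java
import java.util.Optional;

// Hypothetical stand-in for Kafka's Deserializer, so the sketch has no dependencies.
interface TopicDecoder<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical holder: either a decoded value, or the error plus the raw bytes.
final class Result<T> {
    private final T value;
    private final Exception error;
    private final byte[] rawBytes;

    private Result(T value, Exception error, byte[] rawBytes) {
        this.value = value;
        this.error = error;
        this.rawBytes = rawBytes;
    }

    static <T> Result<T> success(T value) {
        return new Result<>(value, null, null);
    }

    static <T> Result<T> failure(Exception error, byte[] rawBytes) {
        return new Result<>(null, error, rawBytes);
    }

    boolean isFailure() { return error != null; }
    Optional<T> value() { return Optional.ofNullable(value); }
    Exception error() { return error; }
    byte[] rawBytes() { return rawBytes; }
}

// Wraps a decoder that may throw and turns any exception into a failed Result.
final class SafeDeserializer<T> implements TopicDecoder<Result<T>> {
    private final TopicDecoder<T> delegate;

    SafeDeserializer(TopicDecoder<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public Result<T> deserialize(String topic, byte[] data) {
        try {
            return Result.success(delegate.deserialize(topic, data));
        } catch (Exception e) {
            // Deliberately not rethrown: per the question, a throwing
            // deserializer is what stopped the listener on a poison record.
            return Result.failure(e, data);
        }
    }
}
```

With something like this in place, the listener pipeline could check isFailure() on each record value, send error() and rawBytes() to the analytics system for failed records, and acknowledge the offset either way so that consumption continues.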



Source: stackoverflow