Skip to content
Advertisement

Avro GenericRecord deserialization not working via SpringKafka

I’m trying to simplify my consumer as much as possible. The problem is, when looking at the records coming in my Kafka listener:

List<GenericRecord> incomingRecords the values are just string values. I’ve tried turning specific reader to true and false. I’ve set the value deserializer as well. Am I missing something? This worked fine when I use a Java configuration class, but want to keep consolidated to this application.properties file.

application.properties

spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${SASL_ACCESS_KEY}" password="${SASL_SECRET}";
spring.kafka.consumer.auto-offset-reset=earliest

#### Consumer Properties Configuration
spring.kafka.properties.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.properties.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

spring.kafka.bootstrap-servers=
spring.kafka.properties.schema.registry.url=
spring.kafka.properties.specific.avro.reader=true
spring.kafka.consumer.properties.spring.json.trusted.packages=*

logging.level.org.apache.kafka=TRACE
logging.level.io.confluent.kafka.schemaregistry=TRACE

consumer

    @KafkaListener(topics = "${topic}", groupId = "${group}")
    public void processMessageBatch(List<GenericRecord> incomingRecords,
                                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                                    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        currentMicroBatch = Stream.of(currentMicroBatch, incomingRecords).flatMap(List::stream).collect(Collectors.toList());
        if (currentMicroBatch.size() >= maxRecords || validatedElapsedDuration(durationMonitor)) {
            System.out.println("ETL processing logic will be done here");
        }
        clearBatch();
    }

I notice when I use:

spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

I get the following error:

2020-12-02 17:04:42.745 DEBUG 51910 — [ntainer#0-0-C-1] i.c.k.s.client.rest.RestService : Sending GET with input null to https://myschemaregistry.com

2020-12-02 17:04:42.852 ERROR 51910 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition my-topic-avro-32 at offset 7836. If needed, please seek past the record to continue consumption.
java.lang.IllegalArgumentException: argument "src" is null
    at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4735)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3502)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:270)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:334)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:573)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:557)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:149)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:241)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1062)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1018)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:949)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)

Advertisement

Answer

I found the issue. Debugging deep into the rest client for confluent, I was hit with a 401 (terrible logs btw)

I needed to add this: spring.kafka.properties.basic.auth.credentials.source=SASL_INHERIT

since I’m using SASL auth and needed registry to inherit the SASL config I added up above. fun stuff..

User contributions licensed under: CC BY-SA
7 People found this is helpful
Advertisement