How to use the Avro native decoder when using a PollableMessageSource input in Spring Cloud Stream?




I’m using a PollableMessageSource input to read from a Kafka topic. Messages on that topic are in Avro. use-native-decoding was set to true when those messages were published.

This is how I’m polling:

pollableChannels.inputChannel().poll(this::processorMethodName,
        new ParameterizedTypeReference<TypeClassName>() {
        });

pollableChannels is just an injected instance of this interface:

public interface PollableChannels {
  @Input("input-channel")
  PollableMessageSource inputChannel();
}

After seeing that the TypeClassName instance was not being populated properly (its nested objects were wrongly set to null), I started debugging the poll method. I found that it relies on the contentType header to select a converter. Since that header has not been set (because the messages were encoded natively), it falls back to the ApplicationJsonMessageMarshallingConverter, which is clearly not the right option.

If I use a regular streamListener, the use-native-decoding config property is honored fine, so the messages seem to be published ok.

Therefore, my primary question is: how do I force native decoding when using pollable consumers? My broader question is whether properties under spring.cloud.stream.bindings.channel-name.consumer are respected at all when using a pollable consumer.

Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1

Update:

Relevant config:

spring:
  cloud.stream:
    bindings:  
      input-channel:
        content-type: application/*+avro
        destination: "topic-name"
        group: "group-name"
        consumer:
          partitioned: true
          concurrency: 3
          max-attempts: 1
          use-native-decoding: true
    kafka:
      binder:
        configuration:
          key.serializer: org.apache.kafka.common.serialization.StringSerializer
          value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

Answer

The ParameterizedTypeReference argument is intended to help the message converter convert the payload to the required type. When using native decoding, the “conversion” is done by the deserializer instead, so no further conversion is necessary.

So, just remove the second argument to the poll() method, i.e. call poll(this::processorMethodName) with only the handler, and conversion will be skipped.

That said, starting with version 3.0.8 (and Spring Framework 5.2.9), the conversion is a no-op when the payload is already an instance of the target type, as the check shown below illustrates.

However, it’s still more efficient to omit the argument, which avoids any attempt at conversion.

else if (targetClass.isInstance(payload)) {
    return payload;
}
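
To make the effect of that check concrete, here is a minimal plain-Java sketch of the same idea. It is not the framework's code: the class NoOpConversionSketch, the method convertIfNecessary and the nested Foo type are hypothetical names used only for illustration, and the exception branch merely stands in for whatever a real message converter would do.

```java
// Hypothetical illustration only; not Spring Cloud Stream's actual converter.
final class NoOpConversionSketch {

    // Stand-in for a payload type that the deserializer has already produced.
    static final class Foo {
        private final String bar;

        Foo(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }
    }

    // Returns the payload untouched when no target type is requested or when
    // the payload already is an instance of the target type.
    static Object convertIfNecessary(Object payload, Class<?> targetClass) {
        if (targetClass == null) {
            return payload; // no type hint supplied: nothing to convert
        }
        else if (targetClass.isInstance(payload)) {
            return payload; // already the right type: conversion is a no-op
        }
        // Assumption: a real converter would parse the payload here.
        throw new IllegalArgumentException("A real converter would be needed to turn "
                + payload + " into " + targetClass.getName());
    }

    public static void main(String[] args) {
        Foo alreadyDeserialized = new Foo("baz");
        Object result = convertIfNecessary(alreadyDeserialized, Foo.class);
        System.out.println(result == alreadyDeserialized); // prints "true"
        System.out.println(result);                        // prints "Foo [bar=baz]"
    }
}
```

In this sketch the natively deserialized object comes back as the very same instance, which mirrors why passing the type reference is harmless in newer versions yet still unnecessary.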

I just tested it without any problems (tested on 3.0.8, but I don’t believe there have been any changes in this area). In fact, you don’t even need useNativeDecoding for this case.

public class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}


@SpringBootApplication
@EnableBinding(Polled.class)
public class So64554618Application {

    public static void main(String[] args) {
        SpringApplication.run(So64554618Application.class, args);
    }

    @Autowired
    PollableMessageSource source;


    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input", "{\"bar\":\"baz\"}".getBytes());
            Thread.sleep(5_000);
            source.poll(msg -> {
                System.out.println(msg);
            }, new ParameterizedTypeReference<Foo>() { });
        };
    }

}

interface Polled {

    @Input
    PollableMessageSource input();

}

#spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
spring.cloud.stream.bindings.input.group=foo

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo

GenericMessage [payload=Foo [bar=baz], headers={kafka_offset=2, ...


Source: stackoverflow