I’m using a PollableMessageSource
input to read from a Kafka topic. Messages on that topic are in Avro. use-native-decoding
was set to true when those messages were published.
This is how I’m polling:
pollableChannels.inputChannel().poll(this::processorMethodName, new ParameterizedTypeReference<TypeClassName>() { });
pollableChannels
is just an injected instance of this interface:
public interface PollableChannels { @Input("input-channel") PollableMessageSource inputChannel(); }
After seeing that the TypeClassName
is not being formed properly (it’s nested objects are set to null by mistake), I started debugging the poll
method and I found that it’s relying on the contentType
header to select a converter, and since this has not been set (because the messages have been encoded natively), it’s falling back to using the ApplicationJsonMessageMarshallingConverter
which is clearly not the right option.
If I use a regular streamListener, the use-native-decoding
config property is honored fine, so the messages seem to be published ok.
Therefore, my primary question here is how to force native decoding when using pollable consumers?
My borader question could be asking if properties under spring.cloud.stream.bindings.channel-name.consumer
are respected at all when using a pollable consumer?
Spring cloud stream version: 2.2.0.RELEASE Spring Kafka: 2.2.5.RELEASE Confluent version for the serializer: 5.2.1
Update:
Relevant config:
spring: cloud.stream: bindings: input-channel: content-type: application/*+avro destination: "topic-name" group: "group-name" consumer: partitioned: true concurrency: 3 max-attempts: 1 use-native-decoding: true kafka: binder: configuration: key.serializer: org.apache.kafka.common.serialization.StringSerializer value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
Advertisement
Answer
The ParameterzedTypeReference
. argument is intended to help the message converter to convert the payload to the required type. When using native decoding, the “conversion” is done by the deserializer instead and conversion is not necessary.
So, just remove the second argument to the poll()
method and conversion will be skipped.
That said, starting with version 3.0.8 (and Spring Framework 5.2.9), the conversion is a no-op, as can be seen in the example below.
However, it’s still more efficient to omit the argument to avoid any attempt at conversion.
else if (targetClass.isInstance(payload)) { return payload; }
I just tested it without any problems (tested on 3.0.8, but I don’t believe there have been any changes in this area). In fact, you don’t even need useNativeDecoding
for this case.
public class Foo { private String bar; public Foo() { } public Foo(String bar) { this.bar = bar; } public String getBar() { return this.bar; } public void setBar(String bar) { this.bar = bar; } @Override public String toString() { return "Foo [bar=" + this.bar + "]"; } } @SpringBootApplication @EnableBinding(Polled.class) public class So64554618Application { public static void main(String[] args) { SpringApplication.run(So64554618Application.class, args); } @Autowired PollableMessageSource source; @Bean public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) { return args -> { template.send("input", "{"bar":"baz"}".getBytes()); Thread.sleep(5_000); source.poll(msg -> { System.out.println(msg); }, new ParameterizedTypeReference<Foo>() { }); }; } } interface Polled { @Input PollableMessageSource input(); }
#spring.cloud.stream.bindings.input.consumer.use-native-decoding=true spring.cloud.stream.bindings.input.group=foo spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
GenericMessage [payload=Foo [bar=baz], headers={kafka_offset=2, ...