Full code: https://github.com/BenedictWHD/kstreams-example
So I have a producer (data-ingest), a processor (external-message-processor), and a consumer (internal-message-processor; this will become a processor later once I get things working, so apologies for the naming at the moment, but for now it is a consumer).
The data-ingest works from what I can tell, as it sends messages to the topic external_messages.
The external-message-processor attempts to read from that topic but fails with the following:
Caused by: java.lang.IllegalArgumentException: The class '[B' is not in the trusted packages: [java.util, java.lang, com.yetti.common.externalmessage, com.yetti.common.externalmessage.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Example of a message on the topic:
Headers: __TypeId__: [B, contentType: application/json, spring_json_header_types: {"contentType":"java.lang.String"}
Payload: "eyJpZCI6IjE4ZGQ2ODc4LWYwNWQtNDJiOC1iYTdlLTU2MDhmMTkzOWU3YyIsImV4dGVybmFsTWVzc2FnZVNvdXJjZSI6IlNNUyIsIm1lc3NhZ2VUeXBlIjoiVFJBTlNBQ1RJT04iLCJudW1iZXJGcm9tIjoiMSIsIm51bWJlclRvIjoiMiIsImNjeSI6Ik5UVEwiLCJxdWFudGl0eSI6IjIuNSJ9"
As you can see, the __TypeId__ is for some reason “[B”.
I have specified for all 3 of the applications to use the following serializer and deserializer:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Only the data-ingest application seems to actually be using the correct serializer; on the others, at application startup, the producer and consumer configs show this:
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
From what I can see in the application.yml files, everything is as it should be, so I am at a loss as to why it is not using the serializer/deserializer I have specified and why the processor is unable to read the messages off the topic.
Any help much appreciated as I have been scratching my head on this for a couple of days now.
The repo linked at the start contains all the configuration files, the poms, and the docker-compose file to run this.
Edit – Processor Config:
spring.cloud.stream:
  function:
    definition: processExternalMessage
  bindings:
    processExternalMessage-in-0:
      destination: external_messages
    processExternalMessage-out-0:
      destination: internal_messages
  kafka:
    bindings:
      processExternalMessage-out-0:
        producer:
          configuration:
            value:
              serializer: org.springframework.kafka.support.serializer.JsonSerializer
      processExternalMessage-in-0:
        consumer:
          configuration:
            value:
              deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Producer Config:
spring.cloud.stream:
  function:
    definition: externalMessageProducer
  bindings:
    externalMessageProducer-out-0:
      destination: external_messages
  kafka:
    bindings:
      externalMessageProducer-out-0:
        producer:
          configuration:
            value:
              serializer: org.springframework.kafka.support.serializer.JsonSerializer
Answer
value.serializer is a flat configuration property name; value is not a nested object with a serializer field, in YAML terms. That's why the other one seems to work.
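In other words, the key the binder's configuration map expects is the literal Kafka property name value.serializer / value.deserializer (a single flat key containing dots), not a value block with a serializer child. A sketch of the processor's kafka section rewritten that way, reusing the binding names from the question (untested against the linked repo):

spring.cloud.stream:
  kafka:
    bindings:
      processExternalMessage-in-0:
        consumer:
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      processExternalMessage-out-0:
        producer:
          configuration:
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer

The same flat-key form would apply to the externalMessageProducer-out-0 binding in the data-ingest config.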
Also worth pointing out that Kafka Streams (KStreams) uses Serde properties, not serializers directly.
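So if processExternalMessage is (or later becomes) a KStream-based processor on the Kafka Streams binder, the serialization classes would be Serdes rather than serializer/deserializer properties. A sketch only, assuming the streams binder is in use (default.key.serde and default.value.serde are standard Kafka Streams properties, and JsonSerde ships with spring-kafka):

spring.cloud.stream:
  kafka:
    streams:
      binder:
        configuration:
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.springframework.kafka.support.serializer.JsonSerde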