Skip to content
Advertisement

Spring kstreams cannot get processor to work – The class ‘[B’ is not in the trusted packages

Full code: https://github.com/BenedictWHD/kstreams-example

So I have a producer (data-ingest), processor (external-message-processor), and consumer (internal-message-processor (This will become a processor later once I get things working, so apologies for the naming at the moment, but it is a consumer)).

The data-ingest works from what I can tell as it sends messages to the topic external_messages.

The external-message-processor attempts to read from that topic but fails with the following:

Caused by: java.lang.IllegalArgumentException: The class '[B' is not in the trusted packages: [java.util, java.lang, com.yetti.common.externalmessage, com.yetti.common.externalmessage.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

Example of a message on the topic:

Headers: __TypeId__: [B, contentType: application/json, spring_json_header_types: {"contentType":"java.lang.String"}"eyJpZCI6IjE4ZGQ2ODc4LWYwNWQtNDJiOC1iYTdlLTU2MDhmMTkzOWU3YyIsImV4dGVybmFsTWVzc2FnZVNvdXJjZSI6IlNNUyIsIm1lc3NhZ2VUeXBlIjoiVFJBTlNBQ1RJT04iLCJudW1iZXJGcm9tIjoiMSIsIm51bWJlclRvIjoiMiIsImNjeSI6Ik5UVEwiLCJxdWFudGl0eSI6IjIuNSJ9"

As you can see the TypeId is for some reason “[B”.

I have specified for all 3 of the applications to use the following serializer and deserializer:

          serializer: org.springframework.kafka.support.serializer.JsonSerializer
          deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Only the data-ingest one seems to actually be using the correct serializer as on the others when the application’s startup in the producer and consumer configs we have this:

value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

From what I can see in the application.yml files everything is as it should be so I am at a loss for why it is not using the serializer/deserializer I have specified and why the processor is unable to read the messages of the topic?

Any help much appreciated as I have been scratching my head on this for a couple of days now.

The repo at the start contains all the configuration files, poms and the docker-composer file to run this.

Edit – Processor Config:

spring.cloud.stream:
  function:
    definition: processExternalMessage
  bindings:
    processExternalMessage-in-0:
      destination: external_messages
    processExternalMessage-out-0:
      destination: internal_messages
  kafka:
      bindings:
        processExternalMessage-out-0:
          producer:
            configuration:
              value:
                serializer: org.springframework.kafka.support.serializer.JsonSerializer
        processExternalMessage-in-0:
          consumer:
            configuration:
              value:
                deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Producer Config:

spring.cloud.stream:
  function:
    definition: externalMessageProducer
  bindings:
    externalMessageProducer-out-0:
      destination: external_messages
  kafka:
      bindings:
        externalMessageProducer-out-0:
          producer:
            configuration:
              value:
                serializer: org.springframework.kafka.support.serializer.JsonSerializer

Advertisement

Answer

value.serializer is a flat configuration property name. value is not a nested object with a serializer field, in YAML terms

That’s why the other one seems to work

Also worth pointing out that Kstreams uses serde properties, not serializers directly

User contributions licensed under: CC BY-SA
4 People found this is helpful
Advertisement