In spring-kafka, how do I add classes from a package to be trusted as a custom header field?
The message is being sent like this:
    @Autowired
    private KafkaTemplate kafkaTemplate;

    Message<BouquetMQDTO> m = MessageBuilder
            .withPayload(payload)
            .setHeader(KafkaHeaders.TOPIC, "topic")
            .setHeader("EVENT_TYPE", MessageType.UPSERT)
            .build();
    kafkaTemplate.send(m);
The receiving end looks like this:
    @Component
    @KafkaListener(topics = "topic")
    public class KafkaController {

        @KafkaHandler
        public void listen(
                @Payload Object objectDTO,
                @Header(value = "EVENT_TYPE") MessageType messageType
        ) {
            System.out.println(messageType);
        }
    }
The exception I keep getting is this:
Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.springframework.kafka.support.DefaultKafkaHeaderMapper$NonTrustedHeaderType] to type [@org.springframework.messaging.handler.annotation.Header my.package.MessageType]
MessageType is an enum, and I can get it working by sending the String representation and using valueOf() on the receiving side, but this solution does not quite feel right. There are also loads of tutorials that use something from java.util, which is trusted by default.
I found that you should be able to declare a bean to allow the enum to be deserialized:
    @Bean
    public KafkaHeaderMapper defaultKafkaHeaderMapper() {
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        mapper.addTrustedPackages("my.package");
        return mapper;
    }
Sadly, this doesn't work. The exception remains the same. I assume I have to declare some more beans and use the KafkaHeaderMapper bean in there, but I can't seem to find out which ones. I also already have a ConsumerFactory bean where I allow packages to be used as payloads, but adding the enum's package there doesn't do anything either.

    props.put(JsonDeserializer.TRUSTED_PACKAGES, "my.other.package,my.package");
    return new DefaultKafkaConsumerFactory<>(props);
Answer
JsonDeserializer.TRUSTED_PACKAGES is not related to headers at all. It applies only to the key or value of the ConsumerRecord. Header mapping happens in a slightly different place.
Not sure if you use Spring Boot, but there is a MessagingMessageListenerAdapter, which comes with a default MessagingMessageConverter and, therefore, a default DefaultKafkaHeaderMapper. To plug in your own HeaderMapper, you need to create that MessagingMessageConverter yourself, give it a reference to your HeaderMapper, and inject that converter into an AbstractKafkaListenerContainerFactory bean.
If you use Spring Boot, it is enough to declare that MessagingMessageConverter as a bean; it will be auto-configured into the AbstractKafkaListenerContainerFactory created by the framework.
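A minimal sketch of the Spring Boot variant described above, assuming a reasonably recent spring-kafka and Spring Boot version. The class name KafkaHeaderConfig and the bean method name are hypothetical; "my.package" is the package from the question. Without Spring Boot, the same converter would instead have to be set on the listener container factory bean through its message-converter setter, whose exact name depends on the spring-kafka version.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;

@Configuration
public class KafkaHeaderConfig { // hypothetical class name

    // Assumption: Spring Boot's Kafka auto-configuration picks up a single
    // RecordMessageConverter bean and applies it to the listener container factory.
    @Bean
    public RecordMessageConverter messageConverter() {
        // Header mapper that also trusts the package containing the enum.
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        mapper.addTrustedPackages("my.package");

        // Converter used when a ConsumerRecord is turned into a Message;
        // replace its default header mapper with the customized one.
        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setHeaderMapper(mapper);
        return converter;
    }
}
```

The point of this shape is that the header mapper is never registered as a bean on its own; it only takes effect through the converter that the listener adapter actually uses.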
This way your trusted packages take effect. However, I think it is still not going to work, because an enum is not so JSON-friendly by default: https://www.baeldung.com/jackson-serialize-enums
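If the enum header still cannot be converted, the String-based workaround mentioned in the question remains an option. The sketch below only illustrates that round trip in plain Java; EventTypeHeader and its two helper methods are hypothetical names, and the nested MessageType is a stand-in for the question's my.package.MessageType (the DELETE constant is invented for illustration).

```java
public class EventTypeHeader { // hypothetical helper class

    // Stand-in for the question's enum; DELETE is an invented second constant.
    public enum MessageType { UPSERT, DELETE }

    // Sender side: put the constant's name into the header instead of the enum itself.
    public static String toHeader(MessageType type) {
        return type.name();
    }

    // Receiver side: read the header as a String and map it back to the enum.
    // valueOf throws IllegalArgumentException for an unknown name.
    public static MessageType fromHeader(String raw) {
        return MessageType.valueOf(raw);
    }

    public static void main(String[] args) {
        String header = toHeader(MessageType.UPSERT);
        System.out.println(fromHeader(header));
    }
}
```

With this approach the listener would declare the EVENT_TYPE header parameter as a String and call the conversion itself, so no additional trusted packages are involved.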