Allowed packages in custom header of Kafka-Message

In spring-kafka, how do I mark the classes of a package as trusted so that they can be used as custom header values?

The message is being sent like this:

@Autowired
private KafkaTemplate kafkaTemplate;

Message<BouquetMQDTO> m = MessageBuilder
            .withPayload(payload)
            .setHeader(KafkaHeaders.TOPIC, "topic")
            .setHeader("EVENT_TYPE", MessageType.UPSERT)
            .build();
kafkaTemplate.send(m);

The receiving end looks like this:

@Component
@KafkaListener(topics = "topic")
public class KafkaController {

    @KafkaHandler
    public void listen(
        @Payload Object objectDTO,
        @Header(value = "EVENT_TYPE") MessageType messageType
    ) { 
        System.out.println(messageType);
    }
}

The exception I keep getting is this:

Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.springframework.kafka.support.DefaultKafkaHeaderMapper$NonTrustedHeaderType] to type [@org.springframework.messaging.handler.annotation.Header my.package.MessageType]

MessageType is an enum. I can get it working by sending the String representation and calling valueOf() on the receiving side, but this solution does not feel quite right. There are also loads of tutorials that use something from java.util, which is trusted by default.

I found that you should be able to declare a bean to allow the enum to be deserialized:

@Bean
public KafkaHeaderMapper defaultKafkaHeaderMapper() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.addTrustedPackages("my.package");
    return mapper;
}

Sadly, this doesn’t work. The exception remains the same. I assume I have to declare some more beans and use the KafkaHeaderMapper bean in them, but I can’t find out which ones. I also already have a ConsumerFactory bean where I allow packages to be used as payloads, but adding the enum's package there doesn’t do anything either.

props.put(JsonDeserializer.TRUSTED_PACKAGES, "my.other.package,my.package");
return new DefaultKafkaConsumerFactory<>(props);

Answer

JsonDeserializer.TRUSTED_PACKAGES has nothing to do with headers. It applies only to the key and value of the ConsumerRecord. Header mapping happens in a different place.

Not sure if you use Spring Boot, but there is a MessagingMessageListenerAdapter which comes with a default MessagingMessageConverter and, therefore, a default DefaultKafkaHeaderMapper. The adapter creates these itself and does not look up a standalone KafkaHeaderMapper bean, which is why declaring one has no effect. To plug in your own HeaderMapper, you need to create that MessagingMessageConverter yourself, set your HeaderMapper on it, and inject the converter into an AbstractKafkaListenerContainerFactory bean.

If you use Spring Boot, it is enough to declare that MessagingMessageConverter as a bean; it will be auto-configured into the AbstractKafkaListenerContainerFactory created by the framework.
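
For the Spring Boot case, the wiring described above might look roughly like the sketch below. The class name KafkaHeaderConfig and the bean method name are made up for illustration, and "my.package" is the package from the question. It assumes the Boot auto-configuration picks up a single RecordMessageConverter bean, which is worth checking against the Boot version in use.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;

// Hypothetical configuration class; the name is not from the original post.
@Configuration
public class KafkaHeaderConfig {

    @Bean
    public RecordMessageConverter messageConverter() {
        // Header mapper that trusts the package containing the MessageType enum.
        DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
        headerMapper.addTrustedPackages("my.package");

        // The converter is what the listener adapter actually uses, so the
        // mapper has to be set on it rather than exposed as a bean on its own.
        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setHeaderMapper(headerMapper);
        return converter;
    }
}
```

Without Spring Boot, the same converter would have to be handed to your own container factory bean explicitly. The setter for that has been renamed between spring-kafka releases (setMessageConverter in older ones, setRecordMessageConverter in newer ones, as far as I know), so check the version you are on.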

This way your trusted packages will take effect. However, I think it is still not going to work, because enums are not very JSON-friendly by default: https://www.baeldung.com/jackson-serialize-enums
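
If the enum does turn out to be awkward to map, the String-based workaround from the question can at least be kept in one place. The following is only a sketch: MessageTypeHeader is a made-up helper, the nested MessageType stands in for my.package.MessageType, and DELETE is an invented second constant.

```java
import java.util.Optional;

// Hypothetical helper that keeps the name()/valueOf() conversion in one spot.
public class MessageTypeHeader {

    // Stand-in for the asker's enum; only UPSERT appears in the question.
    public enum MessageType { UPSERT, DELETE }

    // Value to put into the header on the sending side.
    public static String toHeader(MessageType type) {
        return type.name();
    }

    // Parses the header on the receiving side; empty if it is missing or unknown.
    public static Optional<MessageType> fromHeader(String value) {
        try {
            return Optional.of(MessageType.valueOf(value));
        } catch (IllegalArgumentException | NullPointerException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String header = toHeader(MessageType.UPSERT);
        System.out.println(fromHeader(header).isPresent());
    }
}
```

The sender would then call setHeader("EVENT_TYPE", MessageTypeHeader.toHeader(MessageType.UPSERT)), and the listener would declare the @Header parameter as a String and pass it to fromHeader. Returning an Optional means a bad header value does not throw inside the listener.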

User contributions licensed under: CC BY-SA