
Spring Cloud Stream: send/consume messages to different partitions with KafkaHeaders.MESSAGE_KEY


I set 2 different message keys for the 2 partitions.

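The original sending code is not shown here, but a producer along these lines would attach the key via the KafkaHeaders.MESSAGE_KEY header; this is a hedged sketch, and the StreamBridge binding name pf-topic-out-0 and the payloads are assumptions, not the original code:

```java
// Hypothetical producer sketch; binding name and payloads are assumptions.
// The key travels with the message as the KafkaHeaders.MESSAGE_KEY header.
Message<String> message = MessageBuilder
        .withPayload("message a")
        .setHeader(KafkaHeaders.MESSAGE_KEY, "key1".getBytes())
        .build();
streamBridge.send("pf-topic-out-0", message);
```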

This is the application.yml of the producer application:

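The original configuration is not preserved here, but a minimal producer application.yml sketch along these lines would declare the destination topic and its partition count (the binding name is an assumption):

```yaml
# Hypothetical sketch of the producer application.yml; names are assumptions.
spring:
  cloud:
    stream:
      bindings:
        pf-topic-out-0:
          destination: pf-topic
          producer:
            partition-count: 2
```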

To test parallelism, I created a consumer application that reads messages from pf-topic. This is the configuration of the consumer application:

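Again the original configuration is missing; a minimal consumer sketch would bind to the same topic and put both instances in one consumer group so Kafka splits the partitions between them (binding and group names are assumptions):

```yaml
# Hypothetical sketch of the consumer application.yml; names are assumptions.
spring:
  cloud:
    stream:
      bindings:
        consume-in-0:
          destination: pf-topic
          group: pf-group   # same group, so the 2 instances share the partitions
```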

I created a function in the consumer application to consume messages:

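The original function is not shown; a functional-style consumer sketch could look like the following (the bean name, payload type, and the partition-header name are assumptions about the setup):

```java
// Hypothetical consumer sketch; bean name and payload type are assumptions.
@Bean
public Consumer<Message<String>> consume() {
    return message -> System.out.println(
            "Received: " + message.getPayload()
            + " from partition "
            + message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID));
}
```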

Now it is time for testing. To test parallelism, I ran 2 instances of the Spring Boot consumer application. I was expecting one consumer to consume messages from one partition and the other consumer to consume messages from the other partition. So I expected messages a, b, and c to be consumed by consumer one, and messages d, e, and f to be consumed by the other consumer, because I set different message keys to assign different partitions. But all messages are consumed by only one application.


Could you help me figure out what I am missing?


Answer

You are only setting the message key as a header when you are sending. You can add the KafkaHeaders.PARTITION header on the message to force a specific partition.
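For example, forcing partition 1 explicitly could look like this (a hedged sketch; the binding name and payload are assumptions):

```java
// Hypothetical sketch: pin the message to a specific partition via the header.
Message<String> message = MessageBuilder
        .withPayload("message d")
        .setHeader(KafkaHeaders.PARTITION, 1)  // hard-coded target partition
        .build();
streamBridge.send("pf-topic-out-0", message);
```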

If you don’t want to hard-code a partition through the header, you can set a partition key SpEL expression or provide a partition key extractor bean in your application. Both of these mechanisms are Spring Cloud Stream specific. If you provide either of these, you still need to tell Spring Cloud Stream how you want to select the partition. For that, you can use a partition selector SpEL expression or a partition selector strategy bean. If you don’t provide them, it falls back to a default selector strategy: the hashCode of the message key modulo the number of topic partitions.
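That default selection rule can be illustrated with a small stand-alone sketch; this mirrors the hashCode-modulo behavior described above, not Spring Cloud Stream's actual implementation class:

```java
public class DefaultPartitionSketch {
    // Mirrors the described default: hashCode of the key modulo partition count.
    // Math.abs guards against negative hash codes (ignoring the
    // Integer.MIN_VALUE edge case for this sketch).
    static int selectPartition(Object messageKey, int partitionCount) {
        return Math.abs(messageKey.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // Two distinct keys may or may not land on distinct partitions:
        // only the hash modulo the partition count decides.
        System.out.println("key-a -> partition " + selectPartition("key-a", 2));
        System.out.println("key-b -> partition " + selectPartition("key-b", 2));
    }
}
```

Note that with only 2 partitions, two different keys can still hash to the same partition; distinct keys do not guarantee distinct partitions.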

I think you asked another related question yesterday and I linked this blog in my answer. In the last sections of that blog, all these details are explained.

Quoting from the blog:

If you don’t provide a partition key expression or partition key extractor bean, then Spring Cloud Stream will completely stay out of the business of making any partition decision for you. In that case, if the topic has more than one partition, Kafka’s default partitioning mechanisms will be triggered. By default, Kafka uses a DefaultPartitioner, which, if the message has a key (see above), uses the hash of that key to compute the partition.

I think you are seeing Kafka’s default behavior in your application.
