Skip to content

Tag: apache-kafka

Spring Cloud Stream send/consume message to different partitions with KafkaHeaders.Message_KEY

I set 2 different message keys for 2 partitions . This is my application.yml for producer application To test parallelism, I created a consumer application that reads messages from pf-topic. This is configuration from consumer application. . I created a function in consumer application to consume messages Now it is time for testing. To test parallelism, I run 2 instances

kafka Offset commit failing org.apache.kafka.clients.consumer.CommitFailedException

I have written a kafka consumer using spring-kafka library (spring-boot-starter-parent 2.3.4.RELEASE). I have following consumer configuration in my code Following is my listener method : Here I am reading 1 message at a time, apply business logic and use ack.acknowledge() to commit offset, but what I have seen, sometime offset commit succeed but many time I get org.apache.kafka.clients.consumer.CommitFailedException on line

Use Kakfa connection to dynamically Subscribe/Unsubscribe the Kafka Topics using Spring Boot

I’m developing a SpringBoot application which exposes the APIs to sub/unsub the Kafka topics. All we need to do is to pass the topic-name in the API call and the application will subscribe to it and consume messages. Subscribe topic API : Unsubscribe topic API : Dependency I have created KafkaConsumerConfiguration in which stated some beans (as follows). and I

Spring Integration with Kafka throwing ClassCastException

I have a case where i want to publish message from Kafka Producer, My message is just a POJO object e.g CreateRequest. So for consuming I have added below code Also,I have added setMessageConverter and setPayloadType to get response of type CreateResponse but still i am getting response of type KafkaMessageSource which is throwing java.lang.ClassCastException cannot cast KafkaMessageSource to type

How to use Kafka with SSL via logback appander?

I use this logback appender to send logs to Kafka: When Kafka was PLAINTEXT everything worked correctly. But when Kafka changed to SSL, it is not possible to send messages. I did not find the necessary information in Has anyone had this setup experience? Or maybe use something else? For any existing topic, I get an error: The

Spring Kafka multiple topic for one class dynamically

I recently wanted to add a new behavior in my project that uses spring-kafka. The idea is really simple : App1 create a new scenario name “SCENARIO_1” and publish this string in the topic “NEW_SCENARIO” App1 publish some message on topic “APP2-SCENARIO_1” and “APP3-SCENARIO_1” App2 (group-id=app2) listens on NEW_SCENARIO and creates a new consumer<Object,String> listening on a new topic “APP2-SCENARIO_1”