I have a POJO that reads data from a Kafka consumer. It contains a couple of list objects, and I am not able to understand their null behavior (EmployeeEBO.java, AssignedWorkEBO.java). So I am trying to check whether the data from Kafka is empty/null for the AssignedWorkEBO and it …
Tag: spring-kafka
Use batch mode for dynamic listener
I am using a dynamic message listener. An example is shown below. I want to convert this to a batch listener (one that consumes multiple messages at once). Is there a way to convert it so that the listener consumes a list of consumer records? I am using spring-kafka with spring-boot. Thanks in advance. Answer: Impleme…
Spring kstreams cannot get processor to work – The class ‘[B’ is not in the trusted packages
Full code: https://github.com/BenedictWHD/kstreams-example So I have a producer (data-ingest), a processor (external-message-processor), and a consumer (internal-message-processor; this will become a processor later once I get things working, so apologies for the naming, but for now it is a consumer). The …
Strategy to choose when doing Serialization and Deserialization using spring-kafka library
I need to serialize and deserialize any type of Java object in my project, such as Integer, String, <T>, User, or Account. There might be more than one type, and I am not sure which serializer to use when configuring a Kafka producer and consumer. There are JsonSerializer and JsonDeserializer and StringSerializer/DE…
How can I specify the second parameter in ObjectMapper based on a variable in Java?
I have a use case where I am consuming from multiple topics and, based on the topic, I have to create an object from a String. I have around 25 topics, leading to 25 kinds of objects. So, instead of using a bunch of if-else statements, I want to use a map where the key will be the topic name. So, …
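A minimal sketch of the topic-keyed map idea described in this excerpt, not the asker's actual code. The topic names, the `OrderEvent` and `PaymentEvent` classes, and the `TopicTypeRegistry` helper are all hypothetical. Only the lookup is shown in plain Java. With Jackson on the classpath, the class it returns could be passed as the second argument to `ObjectMapper.readValue(String, Class<T>)`.

```java
import java.util.Map;

public class TopicTypeRegistry {
    // Hypothetical payload types, one per topic (assumptions for illustration).
    public static class OrderEvent {}
    public static class PaymentEvent {}

    // Topic name -> target class. The topic names are made up.
    static final Map<String, Class<?>> TYPE_BY_TOPIC = Map.of(
            "orders", OrderEvent.class,
            "payments", PaymentEvent.class);

    // Looks up the class registered for a topic and fails fast on unknown topics.
    public static Class<?> targetTypeFor(String topic) {
        Class<?> type = TYPE_BY_TOPIC.get(topic);
        if (type == null) {
            throw new IllegalArgumentException("No type registered for topic: " + topic);
        }
        return type;
    }

    public static void main(String[] args) {
        // With Jackson available, the lookup result would replace the if-else chain:
        //   Object value = objectMapper.readValue(json, targetTypeFor(topic));
        System.out.println(targetTypeFor("orders").getSimpleName());
    }
}
```

Adding a new topic then means adding one map entry rather than another if-else branch.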
Spring Cloud Stream Kafka with Microservices and Docker-Compose Error
I wanted to see if I can connect Spring Cloud Stream Kafka with the help of docker-compose in Docker containers, but I’m stuck and haven’t found a solution yet, please help me. I’m working from Spring Microservices In Action; I haven’t found any help so far. Docker-compose with Kafka and…
How to handle Kafka container lifecycle using spring kafka in Kubernetes multipod deployment
I am using the Spring Kafka implementation and I need to start and stop my Kafka consumer through a REST API. For that I am using KafkaListenerEndpointRegistry endpointRegistry; endpointRegistry.getListenerContainer("consumer1").stop(); endpointRegistry.getListenerContainer("consumer1").st…
Avro GenericRecord deserialization not working via SpringKafka
I’m trying to simplify my consumer as much as possible. The problem is that when I look at the records coming into my Kafka listener (List<GenericRecord> incomingRecords), the values are just string values. I’ve tried setting specific reader to both true and false. I’ve set the value deserializer …
KafkaException: class is not an instance of org.apache.kafka.common.serialization.Deserializer
I want to implement a Kafka producer which sends and receives Java serialized objects. I tried this: Producer: Send object: Consumer: // Receive Object When I deploy the producer I get an error during deployment: Caused by: org.apache.kafka.common.KafkaException: class org.engine.plugin.transactions.factory.Sale…
How to use avro native decoder when using a PollableMessageSource input in spring cloud stream?
I’m using a PollableMessageSource input to read from a Kafka topic. Messages on that topic are in Avro. use-native-decoding was set to true when those messages were published. This is how I’m polling: pollableChannels is just an injected instance of this interface: After seeing that the TypeClassN…