I want to implement a Kafka producer which sends and receives Java serialized objects. I tried this (producer, send-object, and consumer code omitted in this excerpt). When I deploy the producer I get an error during deployment: Caused by: org.apache.kafka.common.KafkaException: class org.engine.plugin.transactions.factory.SaleResponseFactory is not an instance of org.apache.kafka.common.serialization.Deserializer. The custom object: import org.apache.kafka.common.serialization.Deserializer; @Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) public class SaleResponseFactory implements
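The error suggests the payload class itself was configured as the deserializer; Kafka instead needs a class implementing org.apache.kafka.common.serialization.Serializer/Deserializer whose methods wrap and unwrap the object. Below is a minimal, dependency-free sketch of the Java-serialization round trip such a custom serde would perform (class and field names are hypothetical stand-ins; the payload must implement Serializable):

```java
import java.io.*;

// Hypothetical payload class; in the question this would be SaleResponseFactory.
// Java serialization requires it to implement Serializable.
class SaleResponse implements Serializable {
    private static final long serialVersionUID = 1L;
    final String transactionId;
    SaleResponse(String transactionId) { this.transactionId = transactionId; }
}

public class ObjectSerde {
    // The logic a custom org.apache.kafka.common.serialization.Serializer
    // would put in its serialize(topic, data) method. Kafka's interface
    // declares no checked exceptions, hence the unchecked wrapping.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The logic the matching Deserializer would put in deserialize(topic, bytes).
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new SaleResponse("tx-42"));
        SaleResponse back = (SaleResponse) deserialize(bytes);
        System.out.println(back.transactionId); // prints tx-42
    }
}
```

In the producer and consumer configs, `value.serializer` / `value.deserializer` must then point at the serde classes, not at `SaleResponseFactory` itself.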
Tag: apache-kafka
Can an offset of an unassigned partition be committed by KafkaConsumer.commitSync/commitAsync?
Can the above methods be used to commit the offset of an unassigned TopicPartition? I know ConsumerRebalanceListener.onPartitionsRevoked is the right place to do a final offset commit before a TopicPartition rebalance. But if I commit the offset of a partition which the consumer no longer has in its assigned list, e.g. it lost it after a rebalance, how will Kafka treat it? Answer It will
A few Kafka partitions are not getting assigned to any Flink consumer
I have a Kafka topic with 15 partitions [0-14] and I'm running Flink with a parallelism of 5. So ideally each parallel Flink consumer should consume 3 partitions. But even after multiple restarts, a few of the Kafka partitions are not subscribed by any Flink worker. From the above logs, it shows that partitions 10 and 13 have been subscribed by 2
How to handle errors/exceptions while using the Spring Kafka framework?
I am not able to find how to do custom error handling for a Spring Kafka consumer. My requirement is: For any deserialization errors, just write the error and message to the database. For any errors in execution under a @KafkaListener method, retry 3 times and then write the error and message to the database. From the Spring docs, I found that,
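For the deserialization-error half of this requirement, Spring Kafka ships an `ErrorHandlingDeserializer` that wraps the real deserializer, so a poison record reaches the container's error handler instead of failing in an endless loop. A minimal sketch using Spring Boot properties, assuming a plain String payload for the delegate (substitute your own deserializer class):

```properties
# Wrap the real deserializer so deserialization failures are caught
# and routed to the container's error handler instead of retried forever.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
# The actual deserializer to delegate to (assumption: String payloads).
spring.kafka.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
```

The retry-3-times-then-record part is typically handled in Java config with a `DefaultErrorHandler` (`SeekToCurrentErrorHandler` in older Spring Kafka versions) built from a `FixedBackOff` and a custom recoverer that writes the failed record to the database.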
Kafka Connect: Convert message from bytes to Json
I am trying to use a Google Pub/Sub source connector to fetch data from my Google Cloud project into Kafka. I do get the data, but the message comes as bytes. I referred here and, as mentioned, I have used a JSON converter to change it. Here is my connector code part: And this is what I get in my Kafka: Even
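Converter behavior is controlled in the connector (or worker) configuration. A minimal sketch of the relevant fragment of a connector config, assuming no schema envelope is wanted; the converter class names are the standard ones shipped with Kafka Connect:

```json
{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```

Note that if the source connector emits raw bytes that are not valid JSON, the `JsonConverter` will fail to parse them; `org.apache.kafka.connect.converters.ByteArrayConverter` passes the bytes through untouched, leaving parsing to the consumer.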
Is there a need for a separate key schema (other than a schema for the fields) while writing an Avro producer?
I am writing Java Avro producer code for my project. I have registered an Avro schema for all the fields which need to be passed. My registered schema: { "name": "Claim", "type": "record", "namespace": "com.schema.avro", "fields": [ ] } I am using the "icn" field value as a key but I don't have a key schema registered separately. I am not
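A separate key schema is only needed if the key itself is serialized with Avro. If the key is just the icn string, a plain `StringSerializer` is enough and nothing key-related gets registered in the schema registry. A sketch of producer properties under that assumption (the Confluent serializer class is the usual choice for Avro values; the registry URL is a placeholder):

```properties
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081
```

If you did serialize the key with `KafkaAvroSerializer`, a separate `<topic>-key` subject would be registered automatically alongside `<topic>-value`.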
Kafka Stream-GlobalKTable join on a specific field
So I have a KStream that gets deserialized into a POJO like so. And here's what the GlobalKTable record looks like. I want to be able to join the KStream's stock_symbol field with the KTable's tckr field. Is this possible? I want to create a new EnrichedMessage object before I stream it into another topic. I had code
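Kafka Streams supports this directly: the KStream-GlobalKTable join takes a `KeyValueMapper` that derives the lookup key from the stream record, roughly `stream.join(globalKTable, (key, msg) -> msg.getStockSymbol(), joiner)`. The plain-collections sketch below mirrors those join semantics without needing a running broker (all class and field names are hypothetical stand-ins for the POJOs in the question):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the question's POJOs.
class Message { final String stockSymbol; Message(String s) { stockSymbol = s; } }
class StockInfo { final String tckr; final String name; StockInfo(String t, String n) { tckr = t; name = n; } }
class EnrichedMessage {
    final String stockSymbol; final String companyName;
    EnrichedMessage(String s, String c) { stockSymbol = s; companyName = c; }
}

public class JoinSketch {
    // Semantics of stream.join(globalTable, keyMapper, joiner): for each
    // stream record, derive the lookup key from a *field* of the value
    // (stockSymbol), look it up in the table (keyed by tckr), and combine
    // the two values into the enriched result.
    static List<EnrichedMessage> join(List<Message> stream, Map<String, StockInfo> table) {
        List<EnrichedMessage> out = new ArrayList<>();
        for (Message m : stream) {
            StockInfo info = table.get(m.stockSymbol);   // KeyValueMapper step
            if (info != null) {                          // inner-join semantics
                out.add(new EnrichedMessage(m.stockSymbol, info.name)); // ValueJoiner step
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, StockInfo> table = new HashMap<>();
        table.put("AAPL", new StockInfo("AAPL", "Apple Inc."));
        List<EnrichedMessage> result =
            join(List.of(new Message("AAPL"), new Message("XYZ")), table);
        System.out.println(result.size()); // prints 1: only AAPL matches
    }
}
```

In the real topology the result of the join would simply be sent `.to(...)` another topic; no re-keying of the GlobalKTable is required because the mapper supplies the lookup key per record.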
I'm getting a "Topic not present in metadata after 60000 ms" message on some computers
Here's my program. I run it on Windows and Linux. On some computers it runs fine, but on others, specifically a Linux machine which is not the Kafka machine, it consistently gives me this error: This happens, of course, when trying to send a message in the run() function, specifically in the statement RecordMetadata metadata = sent.get(). This Kafka
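When the same program works on some machines but not others, a common cause is that the broker advertises an address those machines cannot resolve or reach, so the client never obtains metadata for the topic and times out. A sketch of the broker-side settings involved (the hostname is a placeholder; it must be reachable from every client machine):

```properties
# server.properties: the interface/port the broker binds to...
listeners=PLAINTEXT://0.0.0.0:9092
# ...and the address it hands back to clients in metadata responses.
# Clients connect to THIS address for all subsequent requests.
advertised.listeners=PLAINTEXT://kafka-host.example.com:9092
```

A quick check from a failing machine is whether the advertised host and port are reachable at all, e.g. with `nc -vz kafka-host.example.com 9092`.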
Connect to multiple Kafka clusters using Spring Kafka
I have a Spring Boot application that consumes messages from a topic (say topic1) in a Kafka cluster. This is what my code currently looks like. Now I want to start consuming from a different topic in another Kafka cluster. One way is to create another bean for this. But is there a better way to do this? Answer You need
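Creating a second set of beans is in fact the standard approach: one `ConsumerFactory` plus `ConcurrentKafkaListenerContainerFactory` pair per cluster, selected per listener with `@KafkaListener(containerFactory = "...")`. The sketch below shows only the two plain configuration maps such beans would be built from, so it stays runnable without a broker (hostnames and group ids are placeholders):

```java
import java.util.HashMap;
import java.util.Map;

public class TwoClusterConfigs {
    // Each Kafka cluster needs its own consumer configuration map; in Spring
    // Kafka you would feed each map into its own DefaultKafkaConsumerFactory,
    // wrap that in a ConcurrentKafkaListenerContainerFactory bean, and pick
    // the factory per listener via @KafkaListener(containerFactory = "...").
    static Map<String, Object> consumerConfig(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        // String literals keep this sketch dependency-free; with kafka-clients
        // on the classpath you would use the ConsumerConfig.*_CONFIG constants.
        props.put("bootstrap.servers", bootstrapServers); // differs per cluster
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> clusterA = consumerConfig("cluster-a:9092", "app-a");
        Map<String, Object> clusterB = consumerConfig("cluster-b:9092", "app-b");
        System.out.println(clusterA.get("bootstrap.servers")); // prints cluster-a:9092
        System.out.println(clusterB.get("bootstrap.servers")); // prints cluster-b:9092
    }
}
```

Spring Boot's `spring.kafka.*` properties only describe a single cluster, which is why the second cluster has to be wired up with explicit beans rather than more properties.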
Running Kafka on WSL and making a producer on Windows
I'm running Kafka on WSL. I'm trying to make a simple producer like this (I'm using IntelliJ), but there's a problem: when I try to run the code it shows this error. I even checked the port on my Windows machine. My question is: is it impossible to make a producer on Windows while running Kafka on WSL? Answer Your Kafka
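It is possible; the catch is that a broker inside WSL must advertise an address the Windows side can actually connect to. With WSL2, localhost forwarding usually makes `localhost:9092` work from Windows, but the broker's advertised listener still has to match. A sketch of the relevant server.properties lines under that assumption:

```properties
# Bind on all interfaces inside WSL.
listeners=PLAINTEXT://0.0.0.0:9092
# Address the Windows client will be told to use. localhost works with
# WSL2's localhost forwarding; otherwise substitute the WSL VM's IP
# address (e.g. from `hostname -I` inside WSL).
advertised.listeners=PLAINTEXT://localhost:9092
```

After changing these settings the broker needs a restart, and the producer's `bootstrap.servers` on the Windows side should point at the same advertised address.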