I am trying out Kafka Streams. I am reading messages from one topic, doing a groupByKey, and then counting the groups. The problem is that the message counts come out as unreadable “boxes”. If I run the console consumer, they come out as empty strings. This is the WordCount code I wrote: This is the output:
Tag: apache-kafka
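A minimal sketch of the usual cause and fix, assuming the standard word-count shape (topic names are illustrative): count() produces Long values, so the output topic must be written with a Long serde and the console consumer told how to decode it; the “boxes” are the 8 raw bytes of a serialized Long printed as if they were text.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("words-input")
               .groupByKey()
               .count()      // values are Longs from here on
               .toStream()
               .to("counts-output", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();

        // To read the counts, the console consumer must decode Long values:
        // kafka-console-consumer --bootstrap-server localhost:9092 \
        //   --topic counts-output --from-beginning \
        //   --property print.key=true \
        //   --value-deserializer org.apache.kafka.common.serialization.LongDeserializer
    }
}
```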
How to gracefully shutdown spring-kafka consumer application
I have implemented a spring-kafka consumer application and I want it to shut down gracefully. Currently the consumer application is terminated with the Linux command kill -9 pid. I am using the @KafkaListener annotation. If I quit the Spring Boot app, I want to reliably close the consumer; what should I do? I’ve been using @PreDestroy to reliably exit the Spring Boot app
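A hedged sketch of one common approach (not necessarily the only one): stop all listener containers from a @PreDestroy hook, so in-flight records finish processing and offsets get committed before the context closes. Note that kill -9 bypasses JVM shutdown hooks entirely; this only helps with a normal SIGTERM (kill <pid>).

```java
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;

@Component
public class GracefulConsumerShutdown {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @PreDestroy
    public void stopListeners() {
        // stop() waits for each container to finish its current poll cycle,
        // so records already delivered to the listener are not cut off mid-flight.
        registry.getListenerContainers().forEach(container -> container.stop());
    }
}
```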
Kafka Dependencies – ccs vs ce
To develop my Kafka connector I need to add a connect-api dependency. Which one should I use? For example, the MongoDB connector uses connect-api from Maven Central, but links from the dev guide go to https://packages.confluent.io/maven/org/apache/kafka/connect-api/5.5.0-ccs/ and, besides 5.5.0-ccs, there is also a 5.5.0-ce version. So, at this moment the latest versions are: 2.5.0 from Maven Central, 5.5.0-ccs from packages.confluent.io/maven, 5.5.0-ce from packages.confluent.io/maven. What
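A hedged sketch of the common choice, assuming the connector only targets the plain Connect API: the Maven Central artifact. As far as I can tell, the -ccs and -ce suffixes mark Confluent's own builds of the same API (community- and enterprise-aligned respectively), so they mainly matter if you depend on Confluent-specific packaging.

```xml
<!-- connect-api from Maven Central; "provided" because the Connect runtime
     supplies this jar on the worker's classpath at deployment time. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>2.5.0</version>
    <scope>provided</scope>
</dependency>
```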
SpringBoot, Kafka : java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V
I’m using Spring Boot v2.2.4 and Apache Kafka in my project. Below is my pom.xml file: Below is the code I have as part of Kafka: But when a JSON message is sent to the Kafka queue, I get the error below. The JSON message does reach the queue, but I want to understand why I’m getting the above error. Looking forward
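A hedged sketch of the usual diagnosis: Producer.close(Duration) was added in kafka-clients 2.0, so a NoSuchMethodError on it generally means an older kafka-clients jar (which only has close(long, TimeUnit)) ended up on the classpath alongside a newer spring-kafka. One common fix is to let Spring Boot's dependency management pick the version, or to align it explicitly via the managed property (the version below is illustrative; 2.3.1 is what Boot 2.2.4 manages by default):

```xml
<!-- Align kafka-clients with what spring-kafka was compiled against,
     instead of pinning an older version in <dependencies>. -->
<properties>
    <kafka.version>2.3.1</kafka.version>
</properties>
```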
Understanding the number of StreamProcessor instances created, and do stream tasks share the same StreamProcessor instance?
I want to understand in a little more detail the relationship between StreamThread and StreamTask, and how many instances of a StreamProcessor are created, when we have: a source Kafka topic with multiple partitions, say 6; only ONE StreamThread (num.stream.threads=1); a simple processor topology: source_topic --> Processor1 --> Processor2 --> Processor3 --> sink_topic. Each processor
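A hedged sketch of how this usually works out, assuming the topology above: Kafka Streams creates one StreamTask per input partition (6 here), all multiplexed onto the single StreamThread, and each task builds its own copy of the topology by calling the ProcessorSupplier's get(). So processors are not shared across tasks; expect 6 independent instances of each processor.

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class TopologySketch {
    public static void main(String[] args) {
        Topology topology = new Topology();
        topology.addSource("source", "source_topic");
        // The supplier lambda runs once per task: with 6 partitions and
        // num.stream.threads=1, this prints 6 times, and all 6 Processor1
        // instances run on the same StreamThread.
        topology.addProcessor("Processor1", () -> {
            System.out.println("creating a Processor1 instance");
            return new AbstractProcessor<String, String>() {
                @Override
                public void process(String key, String value) {
                    context().forward(key, value); // pass through downstream
                }
            };
        }, "source");
        topology.addSink("sink", "sink_topic", "Processor1");
    }
}
```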
Flink Avro Serialization shows “not serializable” error when working with GenericRecords
I’m really having a hard time getting Flink to communicate properly with a running Kafka instance using an Avro schema from the Confluent Schema Registry (for both key and value). After some thinking and restructuring of my program, I was able to push my implementation this far: Producer Method GenericSerializer.java However, when I execute the job, it
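A hedged sketch of one common workaround for this class of error: org.apache.avro.Schema is not java.io.Serializable, so any Flink function holding a Schema field fails the closure-serialization check. Keeping the schema as a JSON string and parsing it lazily on the worker avoids that (the class name here is illustrative):

```java
import java.io.Serializable;
import org.apache.avro.Schema;

public class SerializableSchema implements Serializable {
    private final String schemaJson;  // Strings serialize fine
    private transient Schema schema;  // rebuilt lazily on each worker

    public SerializableSchema(Schema schema) {
        this.schemaJson = schema.toString();
    }

    public Schema get() {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaJson);
        }
        return schema;
    }
}
```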
Allowed packages in custom headers of a Kafka message
In spring-kafka, how do I mark classes from a package as trusted so they can be used in a custom header field? The message is being sent like this: The receiving end looks like this: The exception I keep getting is this: Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.springframework.kafka.support.DefaultKafkaHeaderMapper$NonTrustedHeaderType] to type [@org.springframework.messaging.handler.annotation.Header my.package.MessageType] MessageType is an enum and I
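A hedged sketch of one plausible wiring, assuming the default MessagingMessageConverter path: DefaultKafkaHeaderMapper only maps header classes from trusted packages (java.lang and friends by default), and anything else arrives as NonTrustedHeaderType. Adding the enum's package via addTrustedPackages and installing the mapper on the listener factory's converter is one common arrangement, not the only one.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;

@Configuration
public class HeaderMapperConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Header classes outside the default trusted packages are otherwise
        // delivered as DefaultKafkaHeaderMapper.NonTrustedHeaderType.
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        mapper.addTrustedPackages("my.package"); // package containing MessageType

        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setHeaderMapper(mapper);
        factory.setMessageConverter(converter);
        return factory;
    }
}
```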
How to deserialize a BigDecimal value received from a Kafka broker through the Debezium CDC mechanism?
I have a couple of microservices developed using Spring Boot, each with its own Postgres database. These microservices exchange data through a CDC mechanism provided by the Debezium platform via the Kafka broker and Kafka Connect. I have a microservice A that stores some entities with a BigDecimal attribute. Another microservice B depends on the data stored by A, so it
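A hedged sketch of decoding Debezium's default "precise" decimal handling, under which a DECIMAL column arrives as the Base64-encoded unscaled bytes, with the scale carried in the field's schema parameters. An alternative is setting decimal.handling.mode=string (or double) in the connector configuration so no decoding is needed. The values below are illustrative.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class DebeziumDecimalDecoder {

    public static BigDecimal decode(String base64UnscaledValue, int scale) {
        // The payload holds the big-endian two's-complement unscaled value.
        byte[] bytes = Base64.getDecoder().decode(base64UnscaledValue);
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        // e.g. a column value of 123.45 with scale 2 is the unscaled integer 12345,
        // whose bytes {0x30, 0x39} encode to "MDk=" in Base64.
        System.out.println(decode("MDk=", 2)); // prints 123.45
    }
}
```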
Apache Beam: Kafka consumer restarted over and over again
I have this very simple Beam pipeline that reads records from a Kafka topic and writes them to a Pulsar topic: From my understanding this should create exactly one Kafka consumer that pushes its values down the pipeline. Now, for some reason, the pipeline seems to restart over and over again, creating multiple Kafka consumers and multiple Pulsar producers. Here
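A hedged sketch of the Kafka-reading half of such a pipeline (server and topic names are illustrative; the Pulsar side is omitted). One thing worth knowing: some runners, notably the DirectRunner, close and re-create unbounded readers at every checkpoint, so seeing multiple consumers appear over time can be expected checkpointing behavior rather than the pipeline actually restarting.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        // Unbounded read; the runner decides how often readers are re-created.
        pipeline.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("input-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));
        pipeline.run().waitUntilFinish();
    }
}
```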
Spring Kafka, manual committing in different threads
Good day, colleagues. I am using Spring Kafka 2.2.5. I have a listener: I do some operations and, if they succeed, I use the Acknowledgment interface to commit the offset. I have a problem: while the calculations are taking place in the created thread, the listener reads the message from the same offset again. Because of this, when I try to confirm
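A hedged sketch of one common arrangement: run the heavy work on another thread, but make the listener thread wait for it and acknowledge there, since the acknowledgment (like the underlying Consumer) is tied to the listener thread. Names are illustrative; note that blocking this way still has to finish within max.poll.interval.ms, or the broker will rebalance and redeliver anyway.

```java
import java.util.concurrent.CompletableFuture;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class SafeManualAckListener {

    @KafkaListener(topics = "work-topic", groupId = "work-group")
    public void listen(String payload, Acknowledgment ack) {
        boolean success = CompletableFuture
                .supplyAsync(() -> process(payload)) // off-thread computation
                .join();                             // listener thread waits here
        if (success) {
            ack.acknowledge(); // committed from the listener thread
        }
        // If acknowledge() is never called, the record is redelivered,
        // which matches the "read from the same offset again" symptom.
    }

    private boolean process(String payload) {
        return true; // placeholder for the real work
    }
}
```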