I am trying out Kafka Streams. I read messages from one topic, apply groupByKey, and then count each group. The problem is that the message counts come out as unreadable “boxes”. If I run the console consumer, they appear as empty strings. This is the WordCount code …
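A likely cause, offered as an assumption since the WordCount code is cut off above: in Kafka Streams, count() produces Long values, and Kafka's long serializer writes them as 8 big-endian bytes. Reading those bytes as a string yields non-printable control characters, which many terminals render as boxes or as nothing at all. The sketch below reproduces the effect with the standard library only; the class and method names are hypothetical and it does not touch a real broker.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only.
public class CountDecodingSketch {

    // Encodes a long as 8 big-endian bytes, the same layout Kafka's long serializer uses.
    public static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // What a string-based consumer does with those bytes.
    public static String decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // What a long-based consumer does with the same bytes.
    public static long decodeAsLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encodeLong(3L);
        // The string view is 8 control characters, not the digit "3".
        System.out.println("string view length: " + decodeAsString(raw).length());
        // The long view recovers the original count.
        System.out.println("long view: " + decodeAsLong(raw));
    }
}
```

If this is indeed the cause, reading the output topic with a long deserializer for the value (for example, passing `--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer` to the console consumer), or converting the counts to strings before writing them, should make the values readable.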
Tag: apache-kafka
How to gracefully shut down a spring-kafka consumer application
I have implemented a spring-kafka consumer application and want it to shut down gracefully. Currently the application is terminated with the Linux command kill -9 pid. I am using the @KafkaListener annotation. If I quit the Spring Boot app, I want to reliably close the consumer. What should I …
Kafka Dependencies – ccs vs ce
To develop my Kafka connector I need to add a connect-api dependency. Which one should I use? For example, the MongoDB connector uses connect-api from Maven Central, but the links from the dev guide go to https://packages.confluent.io/maven/org/apache/kafka/connect-api/5.5.0-ccs/ and beside 5.5.0-ccs there is also 5.5.0-ce …
SpringBoot, Kafka : java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V
I’m using Spring Boot v2.2.4 and Apache Kafka in my project. Below is my pom.xml file: Below is the Kafka code I have: When a JSON message is sent to the Kafka queue, I get the error below. The message does reach the queue, but I want to understand why I’m gett…
Understanding the number of StreamProcessor instances created: do stream tasks share the same StreamProcessor instance?
I want to understand in more detail the relationship between StreamThread and StreamTask, and how many instances of StreamProcessor are created when we have: a source Kafka topic with multiple partitions, say 6. I am keeping only ONE StreamThread (num.stream.threads=1). I am keeping a simple processor to…
Flink Avro Serialization shows “not serializable” error when working with GenericRecords
I’m really having a hard time getting Flink to communicate properly with a running Kafka instance using an Avro schema from the Confluent Schema Registry (for both key and value). After a while of thinking and restructuring my program, I was able to get my implementation this far: Producer Method…
Allowed packages in custom header of Kafka-Message
In spring-kafka, how do I add classes from a package to be trusted as a custom header field? The message is being sent like this: The receiving end looks like this: The exception I keep getting is this: Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of conve…
How to deserialize BigDecimal value received from kafka broker through debezium CDC mechanism?
I have a couple of microservices developed using Spring Boot, and each has its own Postgres database. These microservices exchange data through a CDC mechanism provided by the Debezium platform via a Kafka broker and Kafka Connect. I have a microservice A that stores some entities with a BigDecimal attribute. Anoth…
Apache Beam: Kafka consumer restarted over and over again
I have this very simple Beam Pipeline that reads records from a Kafka Topic and writes them to a Pulsar Topic: From my understanding this should create exactly one Kafka Consumer that pushes its values down the Pipeline. Now for some reason the Pipeline seems to restart over and over again, creating mul…
Spring Kafka, manual committing in different threads
Good day, colleagues. I am using Spring Kafka 2.2.5. I have a listener: I do some operations, and if they succeed I use the Acknowledgment interface to commit the offset. I have a problem: while calculations are taking place in the created thread, the listener reads the message from the same offset again. Because of this…