Consumer does not read messages from the Kafka topic (Akka Stream Kafka)

We use Akka Stream Kafka for producing and consuming messages, together with a Strimzi Kafka cluster. Here are the versions, in case it matters: com.typesafe.akka:akka-stream-kafka_2.13:2.0.7 com.typesafe.akka:akka-…

How to ignore a method call inside a method that is being tested?

I am trying to make a test pass with MockMvc, and it is failing with the following error message: Caused by: org.apache.kafka.common.config.ConfigException. We have Kafka in our service layer as a …
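One way to skip a collaborator call inside the method under test, sketched below without any mocking framework, is to override the unwanted call in a test subclass. All names here (`OrderService`, `process`, `publishToKafka`) are hypothetical, not taken from the question; with Mockito the equivalent would be a `spy` plus `doNothing().when(...)`.

```java
// Sketch: ignoring a side-effecting call by overriding it in a test subclass.
// OrderService, process, and publishToKafka are made-up illustrative names.
public class SubclassOverrideSketch {

    static class OrderService {
        int process(int amount) {
            publishToKafka(amount);   // side effect we want to skip in a unit test
            return amount * 2;        // logic we actually want to verify
        }

        void publishToKafka(int amount) {
            // In the real service this would need a broker; in a unit test it blows up.
            throw new IllegalStateException("no broker available in unit tests");
        }
    }

    public static void main(String[] args) {
        // Test double: same class, but the Kafka call is a no-op.
        OrderService service = new OrderService() {
            @Override
            void publishToKafka(int amount) { /* intentionally ignored */ }
        };
        int result = service.process(21);
        if (result != 42) throw new AssertionError("expected 42, got " + result);
        System.out.println("ok: " + result);
    }
}
```

This keeps the business logic (`process`) under test while the broker-dependent call is neutralized; the trade-off is that the overridden method must be non-private and non-final.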

Kafka consumer missing messages while consuming messages in loop

I am running my consumer code in a loop due to memory constraints, committing my data and then loading it into tables. Following is the code which runs in the loop: // here is the main part of the component,…

To close or not to close RocksDB Cache and WriteBufferManager in a Kafka Streams app

I am currently playing around with a custom RocksDB configuration in my Streams app by implementing the RocksDBConfigSetter interface. I see conflicting documentation around closing the cache & …

Can an offset of an unassigned partition be committed by KafkaConsumer.commitSync/commitAsync

KafkaConsumer.commitSync(Map offsets) Can the above method be used to commit the offset of an unassigned TopicPartition? I know ConsumerRebalanceListener….

A few Kafka partitions are not getting assigned to any Flink consumer

I have a Kafka topic with 15 partitions [0-14] and I’m running Flink with a parallelism of 5. So ideally each parallel Flink consumer should consume 3 partitions. But even after multiple restarts, a few …
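For intuition on why 15 partitions over 5 subtasks should be even, here is a sketch of a modulo-style partition-to-subtask assignment. It is similar in spirit to Flink's Kafka partition assigner (which additionally rotates the starting subtask by a hash of the topic name, so the absolute subtask indices may differ); the even 3-per-subtask split is what the question expects to see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: round-robin (modulo) assignment of Kafka partitions to parallel subtasks.
// Flink's real assigner also offsets by a topic-name hash; the distribution is the same.
public class PartitionAssignmentSketch {

    static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new TreeMap<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            bySubtask.computeIfAbsent(partition % parallelism, k -> new ArrayList<>())
                     .add(partition);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> assignment = assign(15, 5);
        assignment.forEach((subtask, parts) ->
                System.out.println("subtask " + subtask + " -> " + parts));
        // 15 partitions / 5 subtasks: every subtask should own exactly 3.
        for (List<Integer> parts : assignment.values()) {
            if (parts.size() != 3) throw new AssertionError("uneven assignment: " + parts);
        }
    }
}
```

If some subtasks stay empty in practice, the usual suspects are a stale consumer group, parallelism changed without a savepoint, or partitions added after the job started, rather than the assignment math itself.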

How to handle errors/exceptions while using the Spring Kafka framework?

I am not able to find out how to do custom error handling for a Spring Kafka consumer. My requirement is: for any deserialization errors, just write the error and the message to the database. For any errors …

Is there a need for a separate key schema (other than the schema for fields) when writing an Avro producer?

I am writing Java Avro producer code for my project. I have registered an Avro schema for all the fields which need to be passed. My registered schema: { "name": "Claim", "…
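For context on the key-schema question: with Confluent Schema Registry, a separate key schema is only involved when the key itself is Avro-serialized (it is then registered under the `<topic>-key` subject, alongside `<topic>-value` for the value). If the key is a plain string or long, no key schema is needed at all. A sketch of the relevant producer config (the property names are the standard Kafka/Confluent ones; the URL is a placeholder):

```properties
# Plain String key: no registered key schema required.
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Avro value: schema is registered under the <topic>-value subject.
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081
```

Swapping the key serializer to `KafkaAvroSerializer` is what would make a separate key schema necessary.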

Kafka Stream-GlobalKTable join on a specific field

So I have a KStream that gets deserialized into a POJO like so: public class FinancialMessage { public String user_id; public String stock_symbol; public String exchange_id; } And here’s how …
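In a KStream–GlobalKTable join, the "specific field" to join on is chosen by the key-selector argument of `stream.join(globalTable, keyMapper, joiner)`. The sketch below models that selector as a plain `BiFunction` so it runs standalone; which field to extract (`exchange_id` here) is an assumption, and the actual Streams join call appears only in a comment since it needs a running topology.

```java
import java.util.function.BiFunction;

// Sketch: the "join on a specific field" part of a KStream-GlobalKTable join is just
// a key extractor. In Kafka Streams it is the mapper argument of roughly:
//   stream.join(globalTable, (recordKey, msg) -> msg.exchange_id, joiner)
public class GlobalKTableJoinSketch {

    // POJO from the question.
    public static class FinancialMessage {
        public String user_id;
        public String stock_symbol;
        public String exchange_id;

        public FinancialMessage(String userId, String stockSymbol, String exchangeId) {
            this.user_id = userId;
            this.stock_symbol = stockSymbol;
            this.exchange_id = exchangeId;
        }
    }

    // Key extractor: pick the field the GlobalKTable is keyed on (assumed exchange_id).
    static final BiFunction<String, FinancialMessage, String> KEY_MAPPER =
            (recordKey, msg) -> msg.exchange_id;

    public static void main(String[] args) {
        FinancialMessage msg = new FinancialMessage("u1", "ACME", "NYSE");
        String lookupKey = KEY_MAPPER.apply("ignored-record-key", msg);
        if (!"NYSE".equals(lookupKey)) throw new AssertionError("wrong lookup key");
        System.out.println("lookup key = " + lookupKey);
    }
}
```

The record's own key is ignored by the extractor here; that is exactly what makes GlobalKTable joins on a non-key field possible, whereas a regular KTable join would require repartitioning.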

I’m getting “Topic not present in metadata after 60000 ms” message on some computers

Here’s my program: package kafkaConsumer; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.LongSerializer; import …