Tag: apache-kafka

How to gracefully shut down a spring-kafka consumer application

I have implemented a spring-kafka consumer application, and I want it to shut down gracefully. Currently the consumer application is terminated with the Linux command kill -9 pid. I am using the @KafkaListener annotation. When I quit the Spring Boot app, I want to reliably close the consumer. What should I do? I’ve been using @PreDestroy to reliably exit the Spring Boot

Kafka Dependencies – ccs vs ce

To develop my Kafka connector I need to add a connect-api dependency. Which one should I use? For example, the MongoDB connector uses connect-api from Maven Central, but links from the dev guide go to https://packages.confluent.io/maven/org/apache/kafka/connect-api/5.5.0-ccs/ and besides 5.5.0-ccs there is also a 5.5.0-ce version. So, at this moment the latest versions are: 2.5.0 from Maven Central, 5.5.0-ccs from packages.confluent.io/maven, and 5.5.0-ce from packages.confluent.io/maven. What

SpringBoot, Kafka : java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V

I’m using Spring Boot v2.2.4 and Apache Kafka in my project. Below is my pom.xml file: Below is the code I have for Kafka: But when a JSON message is sent to the Kafka queue, I get the error below. The JSON message does reach the queue, but I want to understand why I’m getting the above error. Looking forward

Understanding the number of StreamProcessor instances created: do stream tasks share the same StreamProcessor instance?

I want to understand in a little more detail the relationship between StreamThread and StreamTask, and how many instances of StreamProcessor are created when we have: a source Kafka topic with multiple partitions, say 6. I am keeping only ONE StreamThread (num.stream.threads=1). I am keeping a simple processor topology: source_topic –> Processor1 –> Processor2 –> Processor3 –> sink_topic. Each processor

Flink Avro Serialization shows “not serializable” error when working with GenericRecords

I’m really having a hard time getting Flink to communicate properly with a running Kafka instance using an Avro schema from the Confluent Schema Registry (for both key and value). After a while of thinking and restructuring my program, I was able to push my implementation this far: Producer Method GenericSerializer.java However, when I execute the job, it

Allowed packages in custom header of Kafka-Message

In spring-kafka, how do I add classes from a package to be trusted as a custom header field? The message is being sent like this: The receiving end looks like this: The exception I keep getting is this: Caused by: org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.springframework.kafka.support.DefaultKafkaHeaderMapper$NonTrustedHeaderType] to type [@org.springframework.messaging.handler.annotation.Header my.package.MessageType] MessageType is an enum and I

How to deserialize a BigDecimal value received from a Kafka broker through the Debezium CDC mechanism?

I have a couple of microservices developed using Spring Boot, and each has its own Postgres database. These microservices exchange data with a CDC mechanism provided by the Debezium platform through a Kafka broker and Kafka Connect. I have a microservice A that stores some entities with a BigDecimal attribute. Another microservice B depends on the data stored by A so it
