I don’t want my Consumer application to auto-create topics when the topics are not present. I know there is a Kafka server-level config to disable auto topic creation (auto.create.topics.enable = false), but I cannot make that change in my infrastructure. So I am looking for a way to disable auto topic …
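As a hedged sketch of a client-side option: Kafka consumer clients from version 2.3 onward accept the allow.auto.create.topics setting, which can be set to false so that the consumer itself does not trigger topic creation, even when the broker still has auto.create.topics.enable = true. The example below only builds the configuration with plain java.util.Properties. The bootstrap address, group id, and class name are placeholders chosen for illustration, not values from the question.

```java
import java.util.Properties;

public class NoAutoCreateConsumerConfig {

    // Builds consumer properties that ask the client not to auto-create missing topics.
    // bootstrapServers and groupId are illustrative placeholders supplied by the caller.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer-side switch (Kafka clients 2.3+); the client default is "true".
        props.setProperty("allow.auto.create.topics", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "demo-group");
        System.out.println(props.getProperty("allow.auto.create.topics"));
    }
}
```

These properties would then be passed to the KafkaConsumer constructor. With the setting disabled, subscribing to a missing topic should no longer create it, though the behaviour is worth verifying against the client and broker versions in use.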
Tag: kafka-consumer-api
Kafka docker – NoSuchFileException: /opt/kafka.server.keystore.jks
I have installed Kafka via Docker. When I run the docker-compose up command, I get the following errors: Below is my docker-compose.yml file: Answer Change the variable values to use /certs
How to run consumer @StreamListener only after ApplicationReadyEvent method completed?
I have a consumer method with: And I have a method with an event listener: Is it possible to configure @StreamListener to start listening only after the @EventListener method has completed? Answer According to the Spring Cloud Stream docs, you could use the property spring.cloud.stream.bindings.<bindingName>.consumer.a…
The Kafka topic is here, a Java consumer program finds it, but lists none of its content, while a kafka-console-consumer is able to
It’s my first Kafka program. From a kafka_2.13-3.1.0 instance, I created a Kafka topic poids_garmin_brut and filled it with this csv: And at any time now, before or after running the program I’ll show, its content can be displayed by a kafka-console-consumer command: Here is the Java program, based…
Strategy to choose when doing Serialization and Deserialization using spring-kafka library
I need to serialize or deserialize any type of Java object in my project: Integer, String, <T>, User, or Account. There might be more than one type. I am not sure which one to use while configuring a Kafka Producer and Consumer. There are JsonSerializer and JsonDeserializer and StringSerializer/DE…
Kafka consumer missing messages while consuming messages in loop
I am running my consumer code in a loop due to memory constraints, committing my data and then loading it into tables. The following is the code which will run in the loop: But for some reason I seem to be missing a few messages. I believe this has something to do with consumer rebalancing/committing. How can I check if my…
Avro GenericRecord deserialization not working via SpringKafka
I’m trying to simplify my consumer as much as possible. The problem is, when looking at the records coming in my Kafka listener: List<GenericRecord> incomingRecords the values are just string values. I’ve tried turning specific reader to true and false. I’ve set the value deserializer …
Can an offset of an unassigned partition be committed by KafkaConsumer.commitSync/commitAsync
Can the above method be used to commit the offset of an unassigned TopicPartition? I know ConsumerRebalanceListener.onPartitionsRevoked is the right place to do the final offset commit before a TopicPartition rebalance. But if I commit the offset of a partition which the consumer does not have in its assigned list now, e.g. it lo…
How to gracefully shutdown spring-kafka consumer application
I have implemented a spring-kafka consumer application and I want it to shut down gracefully. Currently the consumer application is terminated with the Linux command kill -9 pid. I am using the @KafkaListener annotation. When I quit the Spring Boot app, I want to reliably close the consumer. What should I …
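One point worth noting on this question: kill -9 sends SIGKILL, which the JVM cannot intercept, so no cleanup code can run. A plain kill pid (SIGTERM) lets JVM shutdown hooks run, and Spring Boot uses such a hook to close the application context, which in turn stops the listener containers. The sketch below illustrates only the underlying shutdown-hook pattern in plain Java, under the assumption that the worker loop stands in for a consumer poll loop. GracefulWorker and its methods are hypothetical names, not spring-kafka API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulWorker implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch stopped = new CountDownLatch(1);
    private volatile boolean cleanedUp = false;

    @Override
    public void run() {
        try {
            while (running.get()) {
                // Stand-in for polling and processing records.
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Stand-in for committing offsets and closing the consumer.
            cleanedUp = true;
            stopped.countDown();
        }
    }

    // Asks the loop to stop and waits up to timeoutMs for cleanup to finish.
    boolean shutdown(long timeoutMs) throws InterruptedException {
        running.set(false);
        return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    boolean isCleanedUp() {
        return cleanedUp;
    }

    public static void main(String[] args) throws InterruptedException {
        GracefulWorker worker = new GracefulWorker();
        // The hook runs on normal exit and on SIGTERM, but never on SIGKILL (kill -9).
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                worker.shutdown(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        new Thread(worker, "worker").start();
        Thread.sleep(200);
        System.out.println("stopped cleanly: " + worker.shutdown(5000));
    }
}
```

In a Spring Boot application this wiring is normally unnecessary, since the framework registers its own hook; the practical change is usually to stop the process with SIGTERM instead of SIGKILL.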