I’m setting up an application that consumes Kafka messages. I followed the Spring docs on Deserialization Error Handling in order to catch deserialization exceptions. I’ve tried the failedDeserializationFunction method. This is my consumer configuration class. This is the BiFunction provider. When I send just one corrupted message on the topic, I get this error (in a loop): org.apache.kafka.common.errors.SerializationException: Error deserializing key/value I understood that
Tag: apache-kafka
@KafkaListener is not consuming messages – issue with deserialization
The message producer uses the Kafka bindings of Spring Cloud Stream with the following serialization settings: spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$BytesSerde I am trying to consume these messages in a separate consumer application via a Spring Kafka KafkaListener. This is the container factory configuration. With this configuration, the consumer is not picking up the messages (Bytes). If I change the Kafka listener to accept String, it gives me the exception below: Caused
Kafka Streams processor API context.forward
For each incoming record I need to validate the value and, based on the result object, forward errors to different topics; if validation succeeds, I need to forward the record itself using context.forward(). This can be done with the DSL, as shown in this link: using kafka-streams to conditionally sort a json input stream. I am not finding a clear way of
version conflict, current version [2] is different than the one provided [1]
I have a Kafka topic and a Spark application. The Spark application gets data from the Kafka topic, pre-aggregates it, and stores it in Elasticsearch. Sounds simple, right? Everything works as expected, but the minute I set the “spark.cores” property to something other than 1, I start getting the version conflict error. After researching a bit, I think the error is because multiple cores
How to catch warning “Broker may not be available” at the Spring Kafka Listener
I am working on a POC for implementing a Kafka cluster in my project. I have set up a Kafka cluster on my local machine with 3 brokers. I am sending messages to the Kafka server using a Spring MVC REST service, which internally uses Spring Kafka to produce and consume messages to and from the Kafka cluster. Now I
Spring Kafka The class is not in the trusted packages
In my Spring Boot/Kafka application, before the library update, I used the class org.telegram.telegrambots.api.objects.Update to post messages to the Kafka topic. Now I use org.telegram.telegrambots.meta.api.objects.Update. As you can see, they are in different packages. After restarting the application I ran into the following issue: This is my config: application.properties How to solve this issue and
Apache Spark Streaming with Java & Kafka
I’m trying to run the Spark Streaming example from the official Spark website. These are the dependencies I use in my pom file: This is my Java code: When I try to run it from Eclipse I get the following exception: I run this from my IDE (Eclipse). Do I have to create and deploy the JAR into Spark to make it
Kafka Best Practices + how to set recommended setting for JVM
A recommended JVM setting looks like the following. My question is: how do I set the above Java options for Kafka? I know for sure that we can set but I am not sure if we can append the whole line to the KAFKA_HEAP_OPTS variable. Reference: https://community.hortonworks.com/articles/80813/kafka-best-practices-1.html Answer: You can check kafka-run-class.sh; there you can see which environment variables Kafka uses
Kafka Stream with Avro in Java, “schema.registry.url” which has no default value
I have the following configuration for my Kafka Streams application: And I got the following error: I have tried to replace the line with but got the same error. I followed the instructions from this URL when preparing my Streams application. Any suggestions? Answer: If you have keys and values in Avro format the following lines should do the
KafkaStreams is not running. State is ERROR
I have a Kafka consumer class that listens to events, performs a join between events (Order, Customer), and stores them in a materialized view. I created another class to access the state store when a REST call is received. But I am getting java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR. I tried setting the application.server property, but it didn’t work. Class EventsListener joins