I am using a dynamic message listener; an example is shown below. I want to convert it to a batch listener (one that consumes multiple messages at once). Is there a way to convert it so that the listener consumes a list of consumer records? I am using spring-kafka with spring-boot. Thanks in advance. Answer Impleme…
Tag: apache-kafka
KsqlClientException: Received 401 response from server: Unauthorized. Error code: 40100
I am trying to connect to a Confluent-hosted ksqlDB instance, and I get an exception during client.listStreams().get(): What am I missing here? Answer Did you check whether the API_KEY/API_SECRET on your Confluent Cloud cluster has been granted access to ksqlDB?
ConsumerRecords is always empty in Kafka, Java, but Future isDone method result true
I am trying to send a message to a specific Kafka topic and then receive the message from it. I have three configuration classes, for Consumer, Producer and Topic: The GlobalConfiguration class stores all my properties. For Kafka: Then I send the message in this way: I check whether the message was sent with sendResponse.isDone(), and it returns true…
Spring kstreams cannot get processor to work – The class ‘[B’ is not in the trusted packages
Full code: https://github.com/BenedictWHD/kstreams-example So I have a producer (data-ingest), a processor (external-message-processor), and a consumer (internal-message-processor; this will become a processor later once I get things working, so apologies for the naming, but for now it is a consumer). The …
Error Classpath is empty. Please build the project first e.g. by running apache kafka
I was following these steps to install Apache Kafka on my Windows device: https://www.confluent.io/blog/set-up-and-run-kafka-on-windows-linux-wsl-2/ but when I ran bin/zookeeper-server-start.sh config/zookeeper.properties I always got stuck on this error: Error Classpath is empty. Please build the project f…
Why is there a “topic” parameter in the overridden serialize() method from Serializer interface in org.apache.kafka.common.serialization
I have observed that implementations of the serialize() method of the Serializer<> interface have two parameters: byte[] serialize(String topic, T data), but the method body does not use the String topic parameter at all. So why does it exist? Sample Implementation available in the package org.apache.kafk…
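One way to see why the parameter is there: the topic lets a single serializer behave differently per topic. Simple serializers ignore it, while others, such as Confluent's Avro serializer, by default use it to derive the Schema Registry subject name. The sketch below is only an illustration under stated assumptions: `SimpleSerializer` is a hypothetical stand-in that mirrors the shape of Kafka's `serialize(String topic, T data)` method so the example runs without the Kafka client library, and the per-topic charset rule and topic names are invented for the demonstration.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class TopicAwareSerializerSketch {

    // Hypothetical stand-in with the same method shape as Kafka's Serializer<T>.
    public interface SimpleSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Ignores the topic entirely, as many simple serializers do.
    public static final class PlainStringSerializer implements SimpleSerializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Uses the topic to choose an encoding; topics without an entry fall back to UTF-8.
    public static final class PerTopicCharsetSerializer implements SimpleSerializer<String> {
        private final Map<String, Charset> charsetByTopic;

        public PerTopicCharsetSerializer(Map<String, Charset> charsetByTopic) {
            this.charsetByTopic = charsetByTopic;
        }

        @Override
        public byte[] serialize(String topic, String data) {
            if (data == null) {
                return null;
            }
            Charset charset = charsetByTopic.getOrDefault(topic, StandardCharsets.UTF_8);
            return data.getBytes(charset);
        }
    }

    public static void main(String[] args) {
        // "legacy-topic" and "other-topic" are made-up names for this demonstration.
        SimpleSerializer<String> serializer =
                new PerTopicCharsetSerializer(Map.of("legacy-topic", StandardCharsets.UTF_16BE));
        System.out.println(serializer.serialize("legacy-topic", "A").length);
        System.out.println(serializer.serialize("other-topic", "A").length);
    }
}
```

The same instance produces different bytes depending on the topic it is asked to serialize for, which would not be possible if the method received only the data.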
Failed to create topics","exception":"org.apache.kafka.common.errors.UnsupportedVersionException
Please suggest what changes are required, as I am getting this error. Answer I see you are new here. You should always include version information and full stack trace for questions like this. Upgrade your broker to >= 2.4 or set the binder replication factor property. See https://github.com/spring-cloud/sp…
what is the use of the property spring.cloud.stream.bindings..consumer.partitioned
What will happen if partitionCount (spring.cloud.stream.bindings..producer.partitionCount) is greater than 1 and consumer.partitioned (spring.cloud.stream.bindings..consumer.partitioned) is false (using Kafka)? Answer In the case of the Kafka binder, the property spring.cloud.stream.bindings..consumer.partitioned i…
Strategy to choose when doing Serialization and Deserialization using spring-kafka library
I need to serialize or deserialize any type of Java object in my project: it may be an Integer, a String, a <T>, a User, or an Account. There might be more than one type, and I am not sure which one to use while configuring a Kafka Producer and Consumer. There are JsonSerializer and JsonDeserializer and StringSerializer/DE…
Consumer does not read messages from the Kafka topic (Akka Stream Kafka)
We use Akka Stream Kafka for producing and consuming messages, and a Strimzi Kafka cluster. Here are the versions, if it matters: After a refactoring, the message consumer stopped working. We do have messages in the topic, but the consumer just waits endlessly. Here is the log fragment: Some more points: Schema registry prop…