I am using a dynamic message listener. An example is shown below. I want to convert this to a batch listener (consume multiple messages at once). Is there a way to convert this, so that the listener consumes a list of consumer records? I am using spring-kafka with spring-boot. Thanks in advance. Answer: Implement BatchAcknowledgingConsumerAwareMessageListener instead.
Tag: apache-kafka
KsqlClientException: Received 401 response from server: Unauthorized. Error code: 40100
I am trying to connect to a Confluent-hosted ksqlDB instance, and I get an exception during client.listStreams().get(): What am I missing here? Answer: Did you check whether the API_KEY/API_SECRET on your Confluent Cloud cluster has been granted access to ksqlDB?
ConsumerRecords is always empty in Kafka, Java, but Future isDone method result true
I am trying to send a message to a specific Kafka topic and then receive the message from it. I have three configuration classes for Consumer, Producer and Topic: The GlobalConfiguration class stores all my properties. For Kafka: Then I send the message in this way. I check whether the message is sent with sendResponse.isDone(), and it returns true. But then I try to receive the message: And
Spring kstreams cannot get processor to work – The class ‘[B’ is not in the trusted packages
Full code: https://github.com/BenedictWHD/kstreams-example So I have a producer (data-ingest), a processor (external-message-processor), and a consumer (internal-message-processor; it will become a processor later once I get things working, so apologies for the naming, but for now it is a consumer). The data-ingest producer works as far as I can tell, as it sends messages to the topic external_messages. The external-message-processor attempts to read from
Error Classpath is empty. Please build the project first e.g. by running apache kafka
I was following these steps to install Apache Kafka on my Windows machine: https://www.confluent.io/blog/set-up-and-run-kafka-on-windows-linux-wsl-2/ but when I ran bin/zookeeper-server-start.sh config/zookeeper.properties, I always got stuck with this error: Error Classpath is empty. Please build the project first e.g. Where did I go wrong? Answer: It sounds like you might have downloaded the Kafka sources (labeled src) rather than the binaries. Download the binary
Why is there a “topic” parameter in the overridden serialize() method from Serializer interface in org.apache.kafka.common.serialization
I have observed that implementations of the serialize() method of the Serializer<> interface have two parameters: byte[] serialize(String topic, T data), but the method body does not use the String topic parameter at all. So why does it exist? Sample implementation available in the package org.apache.kafka.common.serialization: Answer: In this specific implementation, the parameter is indeed unused. Yet, this parameter may be used
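To illustrate why a serializer might want the topic name, here is a minimal sketch. It is not taken from the original answer: TopicSerializer is a local stand-in that only mirrors the shape of Kafka's serialize(String topic, T data) so the example compiles without the Kafka client library, and the "legacy." topic prefix and the TopicAwareStringSerializer class are hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the shape of Kafka's Serializer#serialize(String, T).
// Defined here only so the sketch compiles without the Kafka client library.
interface TopicSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical serializer that chooses a character encoding from the topic name.
class TopicAwareStringSerializer implements TopicSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        if (data == null) {
            return null; // pass null payloads through unchanged
        }
        // Assumption for illustration: topics prefixed "legacy." expect ISO-8859-1.
        if (topic != null && topic.startsWith("legacy.")) {
            return data.getBytes(StandardCharsets.ISO_8859_1);
        }
        // Every other topic gets UTF-8.
        return data.getBytes(StandardCharsets.UTF_8);
    }
}
```

With this sketch, the same serializer instance can be shared by a producer that writes to several topics, and the per-topic decision stays inside serialize() rather than in the calling code.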
Failed to create topics","exception":"org.apache.kafka.common.errors.UnsupportedVersionException
Please suggest what changes are required, as I am getting this error. Answer: I see you are new here. You should always include version information and the full stack trace for questions like this. Upgrade your broker to >= 2.4 or set the binder replication factor property. See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/4161f875ede0446ab1d485730c51e6a2c5baa37a Change default replication factor to -1 Binder now uses a default value
what is the use of the property spring.cloud.stream.bindings..consumer.partitioned
What will happen if partitionCount (spring.cloud.stream.bindings..producer.partitionCount) is greater than 1 and consumer.partitioned (spring.cloud.stream.bindings..consumer.partitioned) is false (using Kafka)? Answer: In the case of the Kafka binder, the property spring.cloud.stream.bindings..consumer.partitioned is not relevant. You can skip setting this property on the consumer side. This property’s default value is false. Since Kafka has built-in partitioning support, the binder will simply delegate to Kafka broker
Strategy to choose when doing Serialization and Deserialization using spring-kafka library
I need to serialize or deserialize any type of Java object in my project: it may be an Integer, a String, a generic <T>, a User or an Account. There might be more than one type. I am not sure which serializer to use when configuring a Kafka Producer and Consumer. There are JsonSerializer and JsonDeserializer, StringSerializer/StringDeserializer, and many more types. I have read
Consumer does not read messages from the Kafka topic (Akka Stream Kafka)
We use Akka Stream Kafka for producing and consuming messages, with a Strimzi Kafka cluster. Here are the versions, if it matters: After a refactoring, the message consumer stopped working. We do have messages in the topic, but the consumer just waits endlessly. Here is the log fragment: Some more points: The schema registry is properly configured and working (otherwise the producer would not work). Topic (and