I am working through my first Kafka Streams sample. When I attempt to run it I get this error: This is a sample message from the “users” topic: Value: Header: Key: What should I do to avoid this problem? Answer If the key actually has data, you shouldn’t be using Serdes.Void() or KStream<Void,
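A minimal sketch of the direction the answer hints at, assuming the records in the “users” topic carry plain string keys and values (the serdes here are assumptions; pick the serdes that match your actual data):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class UsersStream {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // If records carry a real key, declare a serde matching the key's actual
        // type instead of Serdes.Void() / KStream<Void, ...>
        KStream<String, String> users = builder.stream(
                "users", Consumed.with(Serdes.String(), Serdes.String()));
        users.foreach((key, value) -> System.out.println(key + " => " + value));
        System.out.println(builder.build().describe());
    }
}
```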
Tag: apache-kafka-streams
The Kafka topic is here, a Java consumer program finds it, but lists none of its content, while a kafka-console-consumer is able to
It’s my first Kafka program. From a kafka_2.13-3.1.0 instance, I created a Kafka topic poids_garmin_brut and filled it with this csv: At any time now, before or after running the program I’ll show, its content can be displayed by a kafka-console-consumer command: Here is the Java program, based on the org.apache.kafka:kafka-streams:3.1.0 dependency, extracting this topic as a stream: But, while the
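A very common cause of this symptom: a Streams application with a fresh application.id (consumer group) starts reading at the *end* of the log by default, so records already in the topic are invisible even though kafka-console-consumer with --from-beginning shows them. A minimal sketch, assuming a local broker and string serdes (the application id below is hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;

public class PoidsGarminBrutReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "poids-garmin-brut-reader"); // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Without this, a group with no committed offsets starts at the log end
        // and never sees the records that were already written to the topic.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("poids_garmin_brut", Consumed.with(Serdes.String(), Serdes.String()))
               .foreach((key, value) -> System.out.println(value));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start(); // forgetting start() is another classic cause of silence
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```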
Does the Kafka Streams StreamsBuilder always detect “duplicate” input topics?
This code creates two KStream instances separately, both reading from the same topic: The topology looks like this: There is only one source definition, which is then used two times: Now my question is: Is it always safe to assume that the StreamsBuilder creates only one source (= only one consumer for the same topic)? In other words: Is
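The topology description makes this behavior visible. In the asker's Kafka 3.x setup, two builder.stream() calls on the same topic show up as a single "Source:" node feeding both downstream processors (a sketch; the topic name is an assumption):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class SharedSource {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> first = builder.stream("input");   // hypothetical topic
        KStream<String, String> second = builder.stream("input");  // same topic again
        first.mapValues(v -> "a:" + v);
        second.mapValues(v -> "b:" + v);

        Topology topology = builder.build();
        // The description shows one source node with two map processors as children
        System.out.println(topology.describe());
    }
}
```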
Kafka Streams: SerializationException: Size of data received by LongDeserializer is not 8
I have a small app to count the number of colors using Apache Kafka – The topics are created and producers/consumers are started using the terminal: I provided the following inputs into the terminal: I received this error in the consumer terminal: What’s the issue here? I did some brief research and found similar questions asked by other people, but,
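This error usually means a deserializer mismatch: in a count topology the output values are serialized as 8-byte longs while the keys stay strings, so reading either side with the wrong deserializer (for example LongDeserializer on a string payload) fails exactly this size check. A sketch of a color-count topology with explicit serdes on every stage (topic names are assumptions):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class ColorCount {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> counts = builder
                .stream("colors-input", Consumed.with(Serdes.String(), Serdes.String()))
                .selectKey((key, color) -> color.toLowerCase())
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count();
        // Values on the output topic are 8-byte longs: a console consumer must
        // use LongDeserializer for the VALUE, and only the value, not the key.
        counts.toStream().to("colors-output", Produced.with(Serdes.String(), Serdes.Long()));
        System.out.println(builder.build().describe());
    }
}
```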
Java Generic Types in Method signature which are not used
What is the use of specifying generic types in a method signature when they are not used in the method? For instance, consider the below method from Kafka’s Materialized: Here the types K, V, S are not used in the method body. GitHub link for the Materialized class Answer What is the use of specifying Generic types in the method signature when they
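The pattern can be shown without Kafka: a type parameter that never appears among the arguments can still appear in the return type, so the call site decides what the method returns. A pure-Java sketch mirroring the shape of Materialized.as(String):

```java
import java.util.ArrayList;
import java.util.List;

public class PhantomTypeDemo {
    // T is not used by any parameter or inside the body, but it shapes the
    // return type: the caller binds T explicitly or via the target type.
    static <T> List<T> namedList(String name) {
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        List<Integer> counts = PhantomTypeDemo.<Integer>namedList("counts"); // explicit binding
        List<String> labels = namedList("labels");                           // inferred from target type
        counts.add(42);
        System.out.println(counts); // [42]
    }
}
```

Without the unused parameters on Materialized.as, a caller could not obtain a Materialized<K, V, S> typed to its own key, value, and store types.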
How to split message into more messages
If I have a KStream with KVs like <String, List<myobject>> that I obtained from a KStream groupBy + aggregate, is there a way to split each of the values of the List<> to get individual messages like <String, myobject>? I was hoping for something like flattening a list that would return individual messages with the same key, but I couldn’t
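flatMapValues does exactly this: since List is an Iterable, passing the list straight through emits one record per element under the unchanged key. A sketch with a stand-in value type (the topic names and the MyObject class are assumptions):

```java
import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class FlattenValues {
    static class MyObject { double weight; }  // stand-in for the real value type

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Assume an upstream groupBy + aggregate already produced this shape
        KStream<String, List<MyObject>> aggregated = builder.stream("aggregated-topic");
        // One output record per list element, each keeping the original key
        KStream<String, MyObject> individual = aggregated.flatMapValues(list -> list);
        individual.foreach((key, obj) -> System.out.println(key + " -> " + obj.weight));
        System.out.println(builder.build().describe());
    }
}
```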
How to split a string into different Kafka topics based on some conditions
I am trying to split strings into different Kafka topics based on conditions. Here is the topology: Split the string into words. Match every word against the conditions (here a set of Good words and a set of Bad words). If at least one word from the Bad words set is found in the string, it will be sent to the Bad-string topic; otherwise it will
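Since Kafka 2.8 this kind of routing is what KStream#split with Branched is for. A sketch of the steps above; the word set and topic names are assumptions standing in for the asker's conditions:

```java
import java.util.Arrays;
import java.util.Set;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

public class GoodBadRouter {
    static final Set<String> BAD_WORDS = Set.of("spam", "scam"); // hypothetical set

    // Split the line into words and check each against the Bad words set
    static boolean containsBadWord(String line) {
        return Arrays.stream(line.toLowerCase().split("\\s+"))
                     .anyMatch(BAD_WORDS::contains);
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> lines = builder.stream("input-strings"); // hypothetical topics
        lines.split()
             .branch((key, value) -> containsBadWord(value),
                     Branched.withConsumer(bad -> bad.to("bad-strings")))
             .defaultBranch(Branched.withConsumer(good -> good.to("good-strings")));
    }
}
```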
Serde String[] in Kafka
I’m new to Kafka. I am using the store builder and want to have a String[] of two elements as the value associated with the key. I set up the store like this: When I call the method to get the data from the store: I receive this error: I should set the store like this: but I don’t
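Kafka ships no built-in serde for String[], so the store needs a custom one; Serdes.serdeFrom can wrap two small functions into a Serde. A sketch using a NUL byte as separator (it assumes the array elements never contain NUL; a length-prefixed encoding would be more robust):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class StringArraySerde {
    private static final String SEP = "\u0000"; // assumes elements never contain NUL

    static byte[] serialize(String[] parts) {
        return String.join(SEP, parts).getBytes(StandardCharsets.UTF_8);
    }

    static String[] deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8).split(SEP, -1);
    }

    // Usable as the value serde of Stores.keyValueStoreBuilder(...)
    static Serde<String[]> serde() {
        return Serdes.serdeFrom(
                (topic, data) -> serialize(data),
                (topic, data) -> deserialize(data));
    }
}
```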
2 consecutive stream-stream inner joins produce wrong results: what does KStream join between streams really do internally?
The problem setting I have a stream of nodes and a stream of edges that represent consecutive updates of a graph, and I want to build patterns composed of nodes and edges using multiple joins in series. Let’s suppose I want to match a pattern like: (node1)-[edge1]->(node2). My idea is to join the stream of nodes with the
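For reference, a two-step join chain of the kind described, with string values for brevity (the topic names, window size, and value formats are all assumptions). Note that a stream-stream inner join emits one record per matching pair inside the window, so a record that matches several counterparts, or that arrives as several updates, fans out into several join results; that multiplication is often what looks like "wrong" results downstream:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

public class PatternJoin {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // nodes keyed by node id; edges keyed by their SOURCE node id,
        // with the target node id as the value (a deliberately crude model)
        KStream<String, String> nodes = builder.stream(
                "nodes", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> edges = builder.stream(
                "edges", Consumed.with(Serdes.String(), Serdes.String()));

        JoinWindows window = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10));
        StreamJoined<String, String, String> serdes =
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());

        // (node1)-[edge1]-> : join on the edge's source node id
        KStream<String, String> half = nodes.join(
                edges, (node, target) -> node + "->" + target, window, serdes);
        // rekey by the target node id, then join with nodes again for (node2)
        KStream<String, String> pattern = half
                .selectKey((key, value) -> value.substring(value.indexOf("->") + 2)) // crude parse, for illustration
                .join(nodes, (path, node2) -> path + ":" + node2, window, serdes);
        pattern.to("patterns");
    }
}
```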
To close or to not close RocksDB Cache and WriteBufferManager in kafka streams app
I am currently playing around with a custom RocksDB configuration in my streams app by extending the RocksDBConfigSetter interface. I see conflicting documentation around closing Cache & WriteBufferManager instances. Right now, I see that the Javadoc & one of the documentation pages suggest that we need to close all the instances that extend RocksObject (both Cache & WriteBufferManager instances extend this
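The way the two pieces of documentation are usually reconciled: objects created per store inside setConfig must be closed in close(), while static instances shared across all stores must not be, because close() runs once per store and closing a shared RocksObject would invalidate it for stores that are still running. A sketch of the shared-object pattern from the memory-management docs (sizes are illustrative):

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryConfigSetter implements RocksDBConfigSetter {
    // Shared across ALL store instances of the application
    private static final Cache CACHE = new LRUCache(64 * 1024L * 1024L);
    private static final WriteBufferManager WBM = new WriteBufferManager(32 * 1024L * 1024L, CACHE);

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig table = (BlockBasedTableConfig) options.tableFormatConfig();
        table.setBlockCache(CACHE);
        options.setWriteBufferManager(WBM);
        options.setTableFormatConfig(table);
    }

    @Override
    public void close(String storeName, Options options) {
        // Do NOT close CACHE or WBM here: they are static and shared.
        // Only objects created per store inside setConfig belong here.
    }
}
```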