So I have a KStream that that gets deserialized into a POJO like so And here’s how the Global Ktable record looks like I want to be able to join the KStream’s stock_symbol field with the Ktable’s tckr field. Is this possible? I want to create a new EnrichedMessage object before I stream it i…
Tag: apache-kafka-streams
Kafka consumer showing numbers in unreadable format
I am trying out the kafka streaming. I am reading messages from one topic and doing groupByKey and then doing the count of groups. But the problem is that the messages count is coming as unreadable “boxes”. If I run the console consumer these are coming as empty strings This is the WordCount code …
Understanding Number of StreamProcessor instances created and do stream task share same streamprocessor instance?
I want to understand a little more details on the relationship between StreamThread, StreamTask and how many instances of StreamProcessor is created when we have: a source kafka topic with multiple partitions , say 6. I am keeping only ONE StreamThread (num.stream.threads=1) I am keeping a simple processor to…
Kafka Streams processor API context.forward
For incoming record I need to validate the value and based on result object I need to forward error to different topics and if successfully validated then forward the same using context.forward(). It can be done using DSL as provided in this link using kafka-streams to conditionally sort a json input stream I…
Kafka Stream with Avro in JAVA , schema.registry.url” which has no default value
I have the following configuration for my Kafka Stream application And I got the following error: I have tried to replace the line with but with the same error I have followed the instruction from this url when preparing my Stream application. Any suggestion? Answer If you have keys and values in Avro format …
KafkaStreams is not running. State is ERROR
I have a Kafka consumer class that listens to events and perform join between events (Order, Customer) and store them in a materialized view. I created another class to access state store when REST call received. But I am getting java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR. I …
Kafka Streams – Send on different topics depending on Streams Data
I have a kafka streams application waiting for records to be published on topic user_activity. It will receive json data and depending on the value of against a key I want to push that stream into different topics. This is my streams App code: In this code, I want to check operation type and then depending on…