I have a KStream that gets deserialized into a POJO like so. And here’s what the GlobalKTable record looks like. I want to join the KStream’s stock_symbol field with the GlobalKTable’s tckr field. Is this possible? I want to create a new EnrichedMessage object before I stream it into another topic. I had code
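For a join like the one described above, the Kafka Streams DSL offers `KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)`. The key mapper derives the lookup key from each stream record, so the stream does not have to be keyed by the symbol. The table does need to be keyed by `tckr`. The stdlib-only sketch below models just those inner-join semantics, with a `Map` standing in for the table. The class names, the `price` and `name` fields, and the sample data are assumptions for illustration; only `stock_symbol`, `tckr`, and `EnrichedMessage` come from the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LookupJoinSketch {
    // Hypothetical stand-in for the stream-side POJO.
    static final class StockMessage {
        final String stockSymbol; // corresponds to stock_symbol
        final double price;       // assumed field
        StockMessage(String stockSymbol, double price) {
            this.stockSymbol = stockSymbol;
            this.price = price;
        }
    }

    // Hypothetical stand-in for the table-side record.
    static final class Company {
        final String tckr;
        final String name; // assumed field
        Company(String tckr, String name) {
            this.tckr = tckr;
            this.name = name;
        }
    }

    // Hypothetical shape of the joined result.
    static final class EnrichedMessage {
        final String symbol;
        final double price;
        final String companyName;
        EnrichedMessage(String symbol, double price, String companyName) {
            this.symbol = symbol;
            this.price = price;
            this.companyName = companyName;
        }
    }

    // Inner lookup join: map the stream value to a table key, then combine on a match.
    static Optional<EnrichedMessage> join(StockMessage msg, Map<String, Company> tableByTckr) {
        Company match = tableByTckr.get(msg.stockSymbol); // role of the key mapper
        if (match == null) {
            return Optional.empty(); // an inner join emits nothing without a match
        }
        // Role of the value joiner: build the enriched record from both sides.
        return Optional.of(new EnrichedMessage(msg.stockSymbol, msg.price, match.name));
    }

    public static void main(String[] args) {
        Map<String, Company> table = new HashMap<>();
        table.put("ACME", new Company("ACME", "Acme Corp"));
        System.out.println(join(new StockMessage("ACME", 12.5), table)
                .map(e -> e.companyName).orElse("no match"));
        System.out.println(join(new StockMessage("ZZZ", 1.0), table).isPresent());
    }
}
```

In the DSL itself, `leftJoin` would keep unmatched stream records and pass `null` as the table-side value to the joiner.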
Tag: apache-kafka-streams
Kafka consumer showing numbers in unreadable format
I am trying out Kafka Streams. I am reading messages from one topic, doing groupByKey, and then counting each group. But the problem is that the message counts come out as unreadable “boxes”. If I run the console consumer, they show up as empty strings. This is the WordCount code I wrote. This is the output.
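One common cause of this symptom, offered as an assumption since the code is not shown here: `count()` produces `Long` values, Kafka's `LongSerializer` writes them as 8 big-endian bytes, and a consumer that decodes those bytes as a UTF-8 string prints control characters. The stdlib-only sketch below reproduces the effect without a broker. The class name `CountBytesDemo` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CountBytesDemo {
    // Encode a count as 8 big-endian bytes, the layout a long serializer produces.
    static byte[] encodeCount(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    // What a string-based consumer does with those bytes.
    static String decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // What a long-based consumer does with the same bytes.
    static long decodeAsLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] payload = encodeCount(3L);
        String asText = decodeAsString(payload);
        // Every character is a control character, so a terminal shows boxes or blanks.
        long printable = asText.chars().filter(c -> c >= 0x20).count();
        System.out.println("printable chars: " + printable);
        System.out.println("decoded as long: " + decodeAsLong(payload));
    }
}
```

If this is the cause, two usual remedies are to read the topic with a long deserializer (for the console consumer, `--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer`), or to convert the count to a `String` before writing it to the output topic.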
Understanding the number of StreamProcessor instances created: do stream tasks share the same StreamProcessor instance?
I want to understand in more detail the relationship between StreamThread and StreamTask, and how many StreamProcessor instances are created when we have: a source Kafka topic with multiple partitions, say 6; only ONE StreamThread (num.stream.threads=1); and a simple processor topology: source_topic –> Processor1 –> Processor2 –> Processor3 –> sink_topic. Each processor
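As a rough model of the setup described above, assuming a single sub-topology with no repartitioning: Kafka Streams creates one task per input partition, each task gets its own copy of the processor topology (the processor supplier is invoked once per task), and the thread count only decides which thread runs which tasks. The stdlib-only sketch below counts instances under that model. `TaskInstanceSketch` and `Proc` are hypothetical names, not Kafka classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class TaskInstanceSketch {
    // Hypothetical stand-in for one processor instance owned by one task.
    static final class Proc {
        final String name;
        final int taskId;
        Proc(String name, int taskId) {
            this.name = name;
            this.taskId = taskId;
        }
    }

    // One task per partition; each task asks the supplier for fresh processor instances.
    static List<Proc> instantiate(int partitions, List<String> processorNames) {
        List<Proc> instances = new ArrayList<>();
        for (int task = 0; task < partitions; task++) {
            final int taskId = task;
            for (String name : processorNames) {
                Supplier<Proc> supplier = () -> new Proc(name, taskId);
                instances.add(supplier.get()); // never reused across tasks
            }
        }
        return instances;
    }

    public static void main(String[] args) {
        List<String> topology = Arrays.asList("Processor1", "Processor2", "Processor3");
        List<Proc> instances = instantiate(6, topology);
        // 6 tasks x 3 processors, all of them run by the single stream thread.
        System.out.println("processor instances: " + instances.size());
    }
}
```

Under this model, 6 partitions and 3 processors give 18 processor instances, and no instance is shared between tasks even though one thread executes all of them.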
Kafka Streams processor API context.forward
For each incoming record I need to validate the value and, based on the result object, forward errors to different topics; if validation succeeds, I need to forward the record using context.forward(). It can be done using the DSL as shown in this link: using kafka-streams to conditionally sort a json input stream. I am not finding a clear way of
Kafka Stream with Avro in Java: “schema.registry.url” which has no default value
I have the following configuration for my Kafka Streams application. And I got the following error. I tried replacing the line, but got the same error. I followed the instructions from this URL when preparing my Streams application. Any suggestions? Answer: If you have keys and values in Avro format, the following lines should do the
KafkaStreams is not running. State is ERROR
I have a Kafka consumer class that listens to events, performs a join between events (Order, Customer), and stores them in a materialized view. I created another class to access the state store when a REST call is received. But I am getting java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR. I tried setting the application.server property, but it didn’t work. Class EventsListener joins
Kafka Streams – Send on different topics depending on Streams Data
I have a Kafka Streams application waiting for records to be published on the topic user_activity. It will receive JSON data, and depending on the value against a key I want to push that stream into different topics. This is my streams app code. In this code, I want to check the operation type and then, depending on that, I want
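For per-record routing of this kind, the Kafka Streams DSL lets `KStream#to` take a `TopicNameExtractor`, which picks the destination topic for each record. Splitting the stream with predicates is an alternative. The stdlib-only sketch below shows only the decision function such an extractor would wrap. The operation values, the target topic names, and the fallback topic are hypothetical, since the excerpt does not list them.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class TopicRouterSketch {
    // Hypothetical fallback for unknown or missing operation types.
    static final String FALLBACK_TOPIC = "user_activity_other";

    // Hypothetical operation-to-topic table.
    static final Map<String, String> ROUTES = new HashMap<>();
    static {
        ROUTES.put("login", "user_activity_login");
        ROUTES.put("purchase", "user_activity_purchase");
    }

    // Choose a destination topic from the operation type found in the JSON payload.
    static String topicFor(String operation) {
        if (operation == null) {
            return FALLBACK_TOPIC;
        }
        return ROUTES.getOrDefault(operation.toLowerCase(Locale.ROOT), FALLBACK_TOPIC);
    }

    public static void main(String[] args) {
        System.out.println(topicFor("LOGIN"));
        System.out.println(topicFor("unknown"));
    }
}
```

Keeping the routing rule in a plain function like this makes it easy to unit test separately from the topology.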