
Kafka Streams processor API context.forward

For each incoming record I need to validate the value. Depending on the validation result, I need to forward errors to different topics, and forward successfully validated records using context.forward(). This can be done with the DSL, as shown in this link:

using kafka-streams to conditionally sort a json input stream

I cannot find a clear way of doing this with the Processor API.


Now the caller again needs to check the record and, based on the key, decide which sink topic it goes to. I am using the Processor API because I need to use headers.

Edit :


When the condition is false, how do I push the record to a different stream? Currently I create another predicate in the chain that collects all records that don’t satisfy the predicate above. Is there a way to do this in the same predicate?


Answer

When you specify your Topology, you assign a name to every node and connect the nodes by those names.


If a processor “X” is connected to downstream processors “Y” and “Z”, you can use the node name to send a record to either “Y” or “Z”. If you don’t specify a name, the record is sent to all downstream (“child”) processors.
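The routing rule can be sketched with a toy model in plain Java. This is not the Kafka Streams API. `RoutingSketch`, `Node`, `route` and the non-blank validation rule are hypothetical names made up for illustration, and a record is reduced to a single String value. The sketch only mimics the behaviour described above: forwarding with a child name reaches that one child, forwarding without a name reaches every child.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT the Kafka Streams API. It mimics how a processor
// node forwards a record either to every child or to one child chosen by name.
class RoutingSketch {

    /** A named node that remembers what it received and knows its children by name. */
    static class Node {
        final String name;
        final List<String> received = new ArrayList<>();
        private final Map<String, Node> children = new LinkedHashMap<>();

        Node(String name) {
            this.name = name;
        }

        /** Connects a downstream node, keyed by its name. */
        void addChild(Node child) {
            children.put(child.name, child);
        }

        /** No child name given: the value goes to all children. */
        void forward(String value) {
            for (Node child : children.values()) {
                child.received.add(value);
            }
        }

        /** Child name given: the value goes to that child only. */
        void forward(String value, String childName) {
            Node child = children.get(childName);
            if (child == null) {
                throw new IllegalArgumentException("Unknown child node: " + childName);
            }
            child.received.add(value);
        }
    }

    /** Hypothetical validation rule: a value is valid if it is not blank. */
    static boolean isValid(String value) {
        return value != null && !value.trim().isEmpty();
    }

    /** Sends a valid value from x to child "Y" and an invalid one to child "Z". */
    static void route(Node x, String value) {
        x.forward(value, isValid(value) ? "Y" : "Z");
    }

    public static void main(String[] args) {
        Node x = new Node("X");
        Node y = new Node("Y");
        Node z = new Node("Z");
        x.addChild(y);
        x.addChild(z);

        route(x, "order-1");     // valid, reaches Y only
        route(x, "   ");         // invalid, reaches Z only
        x.forward("broadcast");  // no name, reaches Y and Z

        System.out.println("Y received: " + y.received);
        System.out.println("Z received: " + z.received);
    }
}
```

In Kafka Streams itself, the corresponding call in the newer `org.apache.kafka.streams.processor.api` package is `context.forward(record, "Y")`, while the older API uses `context.forward(key, value, To.child("Y"))`. In both, the child name must match the name given to the downstream processor or sink when it was added to the Topology.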
