Skip to content

Kafka Streams processor API context.forward

For incoming record I need to validate the value and based on result object I need to forward error to different topics and if successfully validated then forward the same using context.forward(). It can be done using DSL as provided in this link

using kafka-streams to conditionally sort a json input stream

I am not finding a clear way of doing this in processorAPI.


Now the caller again need to check and based on key need to differentiate the sink topic. I am using processorAPI because I need use headers.

Edit :


When the condition is false how to push to different stream. Currently creating another predicate which collects all other records which doesn’t satisfy the above predicate in chain. Is there a way to do in same predicate?



When you specify your Topology, you assign names to all node and connect them:


If a processor “X” is connected to downstream processors “Y” and “Z”, you can use the node name to send a record to either “Y” or “Z”. If you don’t specify a name, the record is send to all downstream (“child”) processors.
