For incoming record I need to validate the value and based on result object I need to forward error to different topics and if successfully validated then forward the same using context.forward(). It can be done using DSL as provided in this link
using kafka-streams to conditionally sort a json input stream
I am not finding a clear way of doing this in processorAPI.
ValidateProcessor.java @Override public void process(String key, String value) { Object result = //validation logic if(result.isSuccessful()) { context().forward(key, value); }else { context.forward("error",Object) } }
Now the caller again need to check and based on key need to differentiate the sink topic. I am using processorAPI because I need use headers.
Edit :
branch(new predicate{ business logic if(condition) return true else return false;
When the condition is false how to push to different stream. Currently creating another predicate which collects all other records which doesn’t satisfy the above predicate in chain. Is there a way to do in same predicate?
Advertisement
Answer
When you specify your Topology
, you assign names to all node and connect them:
Topology topology = new Topology(); topology.addSource("source", ...); topology.addProcessor("X", ..., "source"); // connect source->X topology.addSink("Y", ..., "X"); // connect X->Y topology.addSink("Z", ..., "X"); // connect X->Z
If a processor “X” is connected to downstream processors “Y” and “Z”, you can use the node name to send a record to either “Y” or “Z”. If you don’t specify a name, the record is send to all downstream (“child”) processors.
// this is `process()` of "X" public void process(String key, String value) { context.forward(newKey, newValue); // send to both Y and Z context.forward(newKey, newValue, To.child("Y")); // send it only to Y context.forward(newKey, newValue, To.child("Z")); // send it only to Z }