Skip to content
Advertisement

Kafka Streams processor API context.forward

For incoming record I need to validate the value and based on result object I need to forward error to different topics and if successfully validated then forward the same using context.forward(). It can be done using DSL as provided in this link

using kafka-streams to conditionally sort a json input stream

I am not finding a clear way of doing this in processorAPI.

    ValidateProcessor.java

    @Override
    public void process(String key, String value) {
        Object result = //validation logic
        if(result.isSuccessful()) {
            context().forward(key, value);
         }else {
            context.forward("error",Object)
        }

}

Now the caller again need to check and based on key need to differentiate the sink topic. I am using processorAPI because I need use headers.

Edit :

branch(new predicate{
 business logic 
 if(condition)
   return true
 else
   return false;

When the condition is false how to push to different stream. Currently creating another predicate which collects all other records which doesn’t satisfy the above predicate in chain. Is there a way to do in same predicate?

Advertisement

Answer

When you specify your Topology, you assign names to all node and connect them:

Topology topology = new Topology();
topology.addSource("source", ...);
topology.addProcessor("X", ..., "source"); // connect source->X
topology.addSink("Y", ..., "X"); // connect X->Y
topology.addSink("Z", ..., "X"); // connect X->Z

If a processor “X” is connected to downstream processors “Y” and “Z”, you can use the node name to send a record to either “Y” or “Z”. If you don’t specify a name, the record is send to all downstream (“child”) processors.

// this is `process()` of "X"
public void process(String key, String value) {
    context.forward(newKey, newValue); // send to both Y and Z
    context.forward(newKey, newValue, To.child("Y")); // send it only to Y
    context.forward(newKey, newValue, To.child("Z")); // send it only to Z
}
Advertisement