Skip to content
Advertisement

Beam – Error while branching PCollections

I have a pipeline that reads data from kafka. It splits the incoming data into processing and rejected outputs. Data from Kafka is read into custom class MyData and output is produced as KV<byte[], byte[]>

Define two TupleTags with MyData.

 private static final TupleTag<MyData> rejectedTag = new TupleTag<DeserializationOutput>(){};
 private static final TupleTag<MyData> processingTag = new TupleTag<DeserializationOutput>(){};

InvalidDataDoFn has application logic that splits MyData data into processing and rejected

InvalidDataDoFn invalidDataDoFn = new InvalidDataDoFn(processingTag, rejectedTag); 
PCollectionTuple mixedCollection = myCollection
    .apply(ParDo.of(invalidDataDoFn).withOutputTags(processingTag, TupleTagList.of(rejectedTag)));


OutputDoFn outputDoFn = new outputDoFn();

PCollection<MyData> processingCollection = mixedCollection.get(processingTag);

PCollection<KV<byte[], byte[]>> outputCollection = processingCollection
  .apply("ProcessElements", ParDo.of(outputDoFn));

OutputDoFn converts MyData into KV<byte[], byte[]>. While running OutputDoFn, I get a weird error stating that “Tag passed to output cannot be null” – This is from https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L559

My OutputDoFn has the following logic.

@ProcessElement
public void processElement(@Element MyData mydata,
    OutputReceiver<KV<byte[], byte[]>> output, ProcessContext c) {

  c.output(KV.of(mydata.getMessageKey(), mydata.getSomething().getBytes()));
}

Advertisement

Answer

Correct me if I’m wrong, but you’d like to use this c.output :

public void output(OutputT output)

and you’re suprised that this function is used :

public <T> void output(TupleTag<T> tag, T output)

For Beam to use the first one, the argument you’re passing must have the OutputT type declared at your DoFn creation :

private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext

My guess here would be that the value you pass to c.output() is not exactly the type you specified when creating your DoFn. Therefore, the second output function is chosen and it misses the tag.

Could you give the full DoFn declaration for OutputDoFn to confirm ?

All code references from here.

User contributions licensed under: CC BY-SA
9 People found this is helpful
Advertisement