I have a pipeline that reads data from Kafka and splits the incoming data into processing and rejected outputs. Data from Kafka is read into a custom class, MyData, and the output is produced as KV<byte[], byte[]>.
I define two TupleTags typed with MyData:
private static final TupleTag<MyData> rejectedTag = new TupleTag<MyData>(){};
private static final TupleTag<MyData> processingTag = new TupleTag<MyData>(){};
InvalidDataDoFn contains the application logic that splits MyData elements into processing and rejected outputs:
InvalidDataDoFn invalidDataDoFn = new InvalidDataDoFn(processingTag, rejectedTag);
PCollectionTuple mixedCollection = myCollection
    .apply(ParDo.of(invalidDataDoFn)
        .withOutputTags(processingTag, TupleTagList.of(rejectedTag)));

OutputDoFn outputDoFn = new OutputDoFn();
PCollection<MyData> processingCollection = mixedCollection.get(processingTag);
PCollection<KV<byte[], byte[]>> outputCollection = processingCollection
    .apply("ProcessElements", ParDo.of(outputDoFn));
OutputDoFn converts MyData into KV<byte[], byte[]>. While running OutputDoFn, I get an error stating “Tag passed to output cannot be null”. The message comes from https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L559
My OutputDoFn has the following logic.
@ProcessElement
public void processElement(@Element MyData mydata, OutputReceiver<KV<byte[], byte[]>> output, ProcessContext c) {
    c.output(KV.of(mydata.getMessageKey(), mydata.getSomething().getBytes()));
}
Answer
Correct me if I’m wrong, but you’d like to use this c.output overload:
public void output(OutputT output)
and you’re surprised that this one is used instead:
public <T> void output(TupleTag<T> tag, T output)
For Beam to use the first one, the argument you’re passing must have the OutputT type declared when your DoFn was created:
private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext
My guess is that the value you pass to c.output() is not exactly the type you specified when creating your DoFn. As a result, the second output function is chosen, and it is missing the tag.
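To make the two overloads concrete, here is a minimal plain-Java sketch. It is not Beam's actual code: Tag, MiniContext and tryOutput are hypothetical names made up for illustration. The sketch assumes that the untagged overload forwards to the tagged one using a main tag, and that the tagged overload rejects a null tag with the message quoted in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class OutputOverloadSketch {

    /** Hypothetical stand-in for Beam's TupleTag; it only carries an id. */
    static final class Tag<T> {
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    /** Hypothetical, simplified context exposing the two output overloads. */
    static final class MiniContext<OutputT> {
        private final Tag<OutputT> mainTag;
        final List<String> emitted = new ArrayList<>();

        MiniContext(Tag<OutputT> mainTag) {
            this.mainTag = mainTag;
        }

        // Untagged overload: in this model it forwards to the tagged one
        // with the main tag (an assumption of the sketch).
        void output(OutputT value) {
            output(mainTag, value);
        }

        // Tagged overload: rejects a null tag with the message from the question.
        <T> void output(Tag<T> tag, T value) {
            Objects.requireNonNull(tag, "Tag passed to output cannot be null");
            emitted.add(tag.id + "=" + value);
        }
    }

    /** Emits one value with the given tag; returns the error message, or null on success. */
    static String tryOutput(Tag<String> tag) {
        MiniContext<String> c = new MiniContext<String>(new Tag<String>("main"));
        try {
            c.output(tag, "x");
            return null;
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        MiniContext<String> c = new MiniContext<String>(new Tag<String>("main"));
        c.output("hello");                    // untagged call, routed to the main tag
        System.out.println(c.emitted);
        System.out.println(tryOutput(null));  // tagged call with a null tag
    }
}
```

In this model the message only appears when a null tag reaches the tagged path, so it may also be worth checking that every tag held by your DoFns is non-null at the time they run.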
Could you give the full DoFn declaration for OutputDoFn to confirm?
All code references from here.