I’m trying to write a unit test for a Dataflow pipeline.
For that test, at the beginning, I will start with a simple hardcoded string.
The problem is that I need to convert that string into a Pub/Sub message. I have the following code to do that:
    // Create a PCollection from a string and transform it to Pub/Sub message format
    PCollection<PubsubMessage> input =
        p.apply("input string", Create.of("test" + ""))
            .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                c.output(new PubsubMessage(c.element().getBytes(), null));
              }
            }));
But I get the following error:
    java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline$1@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
        <...>
    Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
        ... 50 more
How should I create the Pub/Sub message from the string?
Answer
In the Beam Programming Guide under Serializability requirements for user ParDos, it mentions this:
Take care when declaring your function object inline by using an anonymous inner class instance. In a non-static context, your inner class instance will implicitly contain a pointer to the enclosing class and that class’ state. That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class.
What’s happening is that your anonymous DoFn implicitly contains a pointer to the class you’re constructing the pipeline in, which is leading to this serialization failure. You can avoid this by making your DoFn a named subclass instead of anonymous:
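    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.DoFn;

    // Named top-level class: no hidden reference to the pipeline class.
    public class MyDoFn extends DoFn<String, PubsubMessage> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(new PubsubMessage(c.element().getBytes(), null));
      }
    }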
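The capture behaviour described in the quoted passage can be reproduced without Beam, using only plain Java serialization. The following is a minimal sketch, not Beam code: `Fn`, `CaptureDemo`, and `SuffixFn` are hypothetical names standing in for `DoFn`, the pipeline class, and a named `DoFn` subclass. The anonymous class reads a field of the enclosing instance, so it certainly holds a reference to it, and serializing it fails because the enclosing class is not `Serializable`. The static nested class receives the state it needs through its constructor and serializes cleanly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Beam's DoFn: a Serializable function object.
abstract class Fn implements Serializable {
    abstract String apply(String input);
}

// Hypothetical stand-in for the pipeline class. Note: it is NOT Serializable.
class CaptureDemo {
    private final String suffix = "!";

    // Named static nested class: no reference to any CaptureDemo instance.
    // The state it needs is passed in explicitly.
    static class SuffixFn extends Fn {
        private final String suffix;

        SuffixFn(String suffix) {
            this.suffix = suffix;
        }

        @Override
        String apply(String input) {
            return input + suffix;
        }
    }

    // Anonymous class created in an instance method: it reads the enclosing
    // instance's field, so it holds a hidden reference to that instance.
    Fn anonymousFn() {
        return new Fn() {
            @Override
            String apply(String input) {
                return input + suffix;
            }
        };
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo();
        System.out.println("anonymous: " + canSerialize(demo.anonymousFn())); // anonymous: false
        System.out.println("named: " + canSerialize(new SuffixFn("!")));      // named: true
    }
}
```

In the Beam case the same idea applies: a named `DoFn` (top-level, or a `static` nested class) can be passed as `ParDo.of(new MyDoFn())` without dragging the pipeline class into serialization.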