
How do I set the coder for a PCollection<List<String>> in Apache Beam?

I’m teaching myself Apache Beam, specifically for parsing JSON. I was able to create a simple example that parsed JSON to a POJO and the POJO to CSV. It required that I use .setCoder() for my simple POJO class.

        pipeline
            .apply("Read source JSON file.", TextIO.read().from(options.getInput()))
            .apply("Parse to POJO matching schema", ParseJsons.of(Person.class))
            .setCoder(SerializableCoder.of(Person.class))
            .apply("Create comma delimited string", new PersonToCsvRow())
            .apply("Write out to file", TextIO.write().to(options.getOutput())
                .withoutSharding());

The problem

Now I am trying to skip the POJO step of parsing using some custom transforms. My pipeline looks like this:

        pipeline
            .apply("Read Json", TextIO.read().from("src/main/resources/family_tree.json"))
            .apply("Traverse Json tree", new JSONTreeToPaths())
            .apply("Format tree paths", new PathsToCSV())
            .apply("Write to CSV", TextIO.write().to("src/main/resources/paths.csv")
                .withoutSharding());

This pipeline is supposed to take a heavily nested JSON structure and print each individual path through the tree. I’m getting the same error I did in the POJO example above:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Traverse Json tree/MapElements/Map/ParMultiDo(Anonymous).output [PCollection@331122245]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().

What I tried

So I tried to add a coder in a few different ways:

.setCoder(SerializableCoder.of(List<String>.class))

This results in “Cannot select from parameterized type”. I found another instance of this error generated by a different use case here, but the accepted answer seemed to be applicable only to that use case.
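For context, that compiler error comes from Java's type erasure rather than from Beam: there is no separate Class object for List<String>, so a class literal can only name the raw type. The plain-Java sketch below (no Beam dependency; ErasureDemo and the TypeToken helper are made-up names for illustration) shows that two differently parameterized lists share one runtime class, and that an anonymous subclass can still capture the full generic type. As far as I know, Beam's TypeDescriptor relies on the same anonymous-subclass trick.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: an anonymous subclass records its generic supertype,
// which survives erasure and can be read back via reflection.
abstract class TypeToken<T> {
    final Type type;

    protected TypeToken() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }
}

final class ErasureDemo {
    // List<String>.class does not compile; both lists below share one raw class.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();
        return strings.getClass() == numbers.getClass();
    }

    // The anonymous subclass keeps List<String> as a reflective Type.
    static String capturedTypeName() {
        return new TypeToken<List<String>>() {}.type.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(capturedTypeName());
    }
}
```

This is why coders for generic types are normally composed from component coders instead of being looked up from a single class literal.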

So then I started perusing the Beam docs and found ListCoder.of() which has (literally) no description. But it looked promising, so I tried it:

.setCoder(ListCoder.of(SerializableCoder.of(String.class)))

But this takes me back to the initial error of not having manually set a coder.

The question

How do I satisfy this requirement to set a coder for a List<String> object?

Code

The transform that is causing the setCoder error is this one:

package transforms;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

import java.util.ArrayList;
import java.util.List;

public class JSONTreeToPaths extends PTransform<PCollection<String>, PCollection<List<String>>> {

    public static class ExtractPathsFromTree extends SimpleFunction<JsonNode, List<String>> {
        public List<String> apply(JsonNode root) {
            List<String> pathContainer = new ArrayList<>();
            getPaths(root, "", pathContainer);
            return pathContainer;
        }
    }

    public static class GetRootNode extends SimpleFunction<String, JsonNode> {
        public JsonNode apply(String jsonString) {
            try {
                return getRoot(jsonString);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return null;
            }
        }
    }

    @Override
    public PCollection<List<String>> expand(PCollection<String> input) {
        return input
            .apply(MapElements.via(new GetRootNode()))
            .apply(MapElements.via(new ExtractPathsFromTree()));
    }

    private static JsonNode getRoot(String jsonString) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readTree(jsonString);
    }

    private static void getPaths(JsonNode node, String currentPath, List<String> paths) {
        //check if leaf:
        if (node.path("children").isMissingNode()) {
            currentPath += node.get("Id");
            paths.add(currentPath);
            System.out.println(currentPath);
            return;
        }

        // recursively iterate over children
        currentPath += (node.get("Id") + ",");
        for (JsonNode child : node.get("children")) {
            getPaths(child, currentPath, paths);
        }
    }
}
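To make the intended output concrete, here is a plain-Java sketch of the same depth-first traversal without Jackson or Beam. TreeNode and PathDemo are hypothetical stand-ins, and the three-leaf tree is invented sample data, not the contents of family_tree.json.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JSON node: an id plus zero or more children.
final class TreeNode {
    final String id;
    final List<TreeNode> children;

    TreeNode(String id, TreeNode... children) {
        this.id = id;
        this.children = List.of(children);
    }
}

final class PathDemo {
    // Depth-first walk that emits one comma-separated path per leaf.
    static void collectPaths(TreeNode node, String prefix, List<String> paths) {
        String current = prefix + node.id;
        if (node.children.isEmpty()) {
            paths.add(current);
            return;
        }
        for (TreeNode child : node.children) {
            collectPaths(child, current + ",", paths);
        }
    }

    static List<String> pathsOf(TreeNode root) {
        List<String> paths = new ArrayList<>();
        collectPaths(root, "", paths);
        return paths;
    }

    public static void main(String[] args) {
        // Node 1 has children 2 and 3; node 2 has a single child 4.
        TreeNode root = new TreeNode("1",
            new TreeNode("2", new TreeNode("4")),
            new TreeNode("3"));
        for (String path : pathsOf(root)) {
            System.out.println(path);
        }
    }
}
```

For the sample tree this yields the two root-to-leaf paths 1,2,4 and 1,3, which is the shape of row the pipeline is meant to write to CSV.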




Answer

While the error message seems to imply that the list of strings is what needs encoding, it is actually the JsonNode. I just had to read a little further down in the error message, as the opening statement is a bit deceiving as to where the issue is:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Traverse Json tree/MapElements/Map/ParMultiDo(Anonymous).output [PCollection@1324829744]. 
...
...
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder 
for com.fasterxml.jackson.databind.JsonNode.
Building a Coder using a registered CoderProvider failed.

Once I discovered this, I solved the problem by extending Beam’s CustomCoder class. This abstract class is nice because you only have to write the code to serialize and deserialize the object:

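import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.commons.io.IOUtils; // assumed: Apache Commons IO; the original does not say which IOUtils

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class JsonNodeCoder extends CustomCoder<JsonNode> {

    @Override
    public void encode(JsonNode node, OutputStream outStream) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        String nodeString = mapper.writeValueAsString(node);
        // Name the charset so the bytes do not depend on the platform default.
        outStream.write(nodeString.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public JsonNode decode(InputStream inStream) throws IOException {
        // Reads the stream to its end, so the node must be the last value in it.
        byte[] bytes = IOUtils.toByteArray(inStream);
        ObjectMapper mapper = new ObjectMapper();
        String json = new String(bytes, StandardCharsets.UTF_8);
        return mapper.readTree(json);
    }
}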
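One caveat about the coder above: decode reads until the end of the stream, which works only while each encoded node sits alone in its stream. If the coder is ever nested inside another one (say a list or key-value coder), several values share a stream and each needs to know where it ends. A common remedy is a length prefix. The sketch below shows the idea in plain Java with no Beam or Jackson dependency; LengthPrefixedText is a made-up name, and this is an illustration of the technique, not the author's coder. Inside Beam, delegating the JSON string to StringUtf8Coder should give a similar effect, though I have not verified that against this pipeline.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class LengthPrefixedText {
    // Writes a 4-byte length followed by the UTF-8 bytes of the text.
    static void encode(String text, OutputStream out) throws IOException {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(bytes.length);
        data.write(bytes);
        data.flush();
    }

    // Reads exactly one value and leaves the rest of the stream untouched.
    static String decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] bytes = new byte[data.readInt()];
        data.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Encodes two strings back to back, then decodes them one at a time.
    static String[] roundTrip(String first, String second) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            encode(first, out);
            encode(second, out);
            InputStream in = new ByteArrayInputStream(out.toByteArray());
            return new String[] {decode(in), decode(in)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] decoded = roundTrip("{\"Id\":1}", "{\"Id\":2}");
        System.out.println(decoded[0] + " " + decoded[1]);
    }
}
```

With the prefix in place, both values come back intact even though they were written into the same stream, which a read-to-end decoder cannot do.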

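The coder then has to be set on the intermediate PCollection<JsonNode>, for example by calling .setCoder(new JsonNodeCoder()) on the output of the GetRootNode step in expand().

Hope this helps some other Beam newbie out there.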

User contributions licensed under: CC BY-SA