I’m teaching myself Apache Beam, specifically for use in parsing JSON. I was able to create a simple example that parsed JSON to a POJO and the POJO to CSV. It required that I use .setCoder() for my simple POJO class:
```java
pipeline
    .apply("Read source JSON file.", TextIO.read().from(options.getInput()))
    .apply("Parse to POJO matching schema", ParseJsons.of(Person.class))
    .setCoder(SerializableCoder.of(Person.class))
    .apply("Create comma delimited string", new PersonToCsvRow())
    .apply("Write out to file", TextIO.write().to(options.getOutput())
        .withoutSharding());
```
The problem
Now I am trying to skip the POJO step of parsing using some custom transforms. My pipeline looks like this:
```java
pipeline
    .apply("Read Json", TextIO.read().from("src/main/resources/family_tree.json"))
    .apply("Traverse Json tree", new JSONTreeToPaths())
    .apply("Format tree paths", new PathsToCSV())
    .apply("Write to CSV", TextIO.write().to("src/main/resources/paths.csv")
        .withoutSharding());
```
This pipeline is supposed to take a heavily nested JSON structure and print each individual path through the tree. I’m getting the same error I did in the POJO example above:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Traverse Json tree/MapElements/Map/ParMultiDo(Anonymous).output [PCollection@331122245]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder().
What I tried
So I tried to add a coder in a few different ways:
.setCoder(SerializableCoder.of(List<String>.class))
This results in “Cannot select from parameterized type”. I found another instance of this error, generated by a different use case, but the accepted answer seemed to be applicable only to that use case.
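For context on that compiler error: Java erases generic type arguments at runtime, so there is no separate class object for List<String>, and a class literal can only be taken from the raw type. The following is a minimal, Beam-independent sketch of that behaviour; the class name ErasureSketch and its method are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ErasureSketch {

    /** Returns true when lists with different element types share one runtime class. */
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();
        // Both variables point at plain ArrayList instances; the type arguments are gone.
        return strings.getClass() == numbers.getClass();
    }

    public static void main(String[] args) {
        // List<String>.class does not compile; only the raw class literal exists.
        Class<?> raw = List.class;
        System.out.println(raw.getName());
        System.out.println(sameRuntimeClass());
    }
}
```

This is why a class-literal-based factory cannot describe a List<String>. In Beam, a coder for such a type is instead composed from element coders, for example ListCoder.of(StringUtf8Coder.of()).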
So then I started perusing the Beam docs and found ListCoder.of(), which has (literally) no description. But it looked promising, so I tried it:
.setCoder(ListCoder.of(SerializableCoder.of(String.class)))
But this takes me back to the initial error of not having manually set a coder.
The question
How do I satisfy this requirement to set a coder for a List<String> object?
Code
The transform that is causing the setCoder error is this one:
```java
package transforms;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

import java.util.ArrayList;
import java.util.List;

public class JSONTreeToPaths extends PTransform<PCollection<String>, PCollection<List<String>>> {

    public static class ExtractPathsFromTree extends SimpleFunction<JsonNode, List<String>> {
        public List<String> apply(JsonNode root) {
            List<String> pathContainer = new ArrayList<>();
            getPaths(root, "", pathContainer);
            return pathContainer;
        }
    }

    public static class GetRootNode extends SimpleFunction<String, JsonNode> {
        public JsonNode apply(String jsonString) {
            try {
                return getRoot(jsonString);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return null;
            }
        }
    }

    @Override
    public PCollection<List<String>> expand(PCollection<String> input) {
        return input
            .apply(MapElements.via(new GetRootNode()))
            .apply(MapElements.via(new ExtractPathsFromTree()));
    }

    private static JsonNode getRoot(String jsonString) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readTree(jsonString);
    }

    private static void getPaths(JsonNode node, String currentPath, List<String> paths) {
        // check if leaf:
        if (node.path("children").isMissingNode()) {
            currentPath += node.get("Id");
            paths.add(currentPath);
            System.out.println(currentPath);
            return;
        }
        // recursively iterate over children
        currentPath += (node.get("Id") + ",");
        for (JsonNode child : node.get("children")) {
            getPaths(child, currentPath, paths);
        }
    }
}
```
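To make the intended output concrete, here is a small sketch of the same root-to-leaf traversal without Jackson or Beam. The Node class is a hypothetical stand-in for a JSON object with an "Id" and an optional "children" array, and it treats an empty child list as a leaf, whereas the transform above checks for a missing "children" field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PathSketch {

    /** Hypothetical stand-in for a JSON object with an "Id" and optional "children". */
    static final class Node {
        final String id;
        final List<Node> children;

        Node(String id, Node... children) {
            this.id = id;
            this.children = Arrays.asList(children);
        }
    }

    /** Collects one comma-separated path per leaf, starting at the root. */
    static List<String> paths(Node root) {
        List<String> out = new ArrayList<>();
        collect(root, "", out);
        return out;
    }

    private static void collect(Node node, String prefix, List<String> out) {
        String current = prefix + node.id;
        if (node.children.isEmpty()) {
            // Leaf: the accumulated path is complete.
            out.add(current);
            return;
        }
        for (Node child : node.children) {
            collect(child, current + ",", out);
        }
    }

    /** Builds a three-level example tree: 1 -> (2 -> 4), 3. */
    static List<String> demo() {
        Node tree = new Node("1", new Node("2", new Node("4")), new Node("3"));
        return paths(tree);
    }

    public static void main(String[] args) {
        for (String path : demo()) {
            System.out.println(path);
        }
    }
}
```

For the example tree, the two leaves yield the paths 1,2,4 and 1,3, which is the shape of row the pipeline is meant to write to the CSV file.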
Answer
While the error message seems to imply that the list of strings is what needs encoding, it is actually the JsonNode. I just had to read a little further down in the error message, as the opening statement is a bit misleading about where the issue is:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Traverse Json tree/MapElements/Map/ParMultiDo(Anonymous).output [PCollection@1324829744]. ... ... Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.fasterxml.jackson.databind.JsonNode. Building a Coder using a registered CoderProvider failed.
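Once I discovered this, I solved the problem by extending Beam’s CustomCoder class and setting the new coder on the PCollection<JsonNode> produced by the GetRootNode step, with .setCoder(new JsonNodeCoder()). This abstract class is nice because you only have to write the code to serialize and deserialize the object: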
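```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.coders.CustomCoder;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// IOUtils: the original does not show its import; org.apache.commons.io.IOUtils is assumed.
public class JsonNodeCoder extends CustomCoder<JsonNode> {

    @Override
    public void encode(JsonNode node, OutputStream outStream) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        String nodeString = mapper.writeValueAsString(node);
        // Use an explicit charset so encoding does not depend on the platform default.
        outStream.write(nodeString.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public JsonNode decode(InputStream inStream) throws IOException {
        byte[] bytes = IOUtils.toByteArray(inStream);
        ObjectMapper mapper = new ObjectMapper();
        String json = new String(bytes, StandardCharsets.UTF_8);
        return mapper.readTree(json);
    }
}
```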
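One caveat worth checking, as far as I understand Beam's coder contract: the decode method above reads the stream to its end, which may consume bytes belonging to later elements if a runner writes several values into one stream or nests this coder inside another one. A common remedy is to prefix each value with its length. The sketch below shows that framing using only the standard library; FramingSketch and its methods are hypothetical names, not Beam API. Inside a real coder, delegating to StringUtf8Coder.of() should give a similar effect.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class FramingSketch {

    /** Writes a 4-byte length followed by the UTF-8 bytes of the JSON text. */
    static void encode(String json, OutputStream out) throws IOException {
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(bytes.length);
        data.write(bytes);
        data.flush();
    }

    /** Reads exactly one length-prefixed value and leaves the rest of the stream untouched. */
    static String decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] bytes = new byte[data.readInt()];
        data.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Encodes two values into one stream and decodes them back in order. */
    static String[] roundTripTwo(String first, String second) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            encode(first, out);
            encode(second, out);
            InputStream in = new ByteArrayInputStream(out.toByteArray());
            return new String[] {decode(in), decode(in)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String json : roundTripTwo("{\"Id\":1}", "{\"Id\":2}")) {
            System.out.println(json);
        }
    }
}
```

Because each value carries its own length, two values written back to back can be read back separately, which a read-to-end decoder cannot do.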
Hope this helps some other Beam newbie out there.