I would like to use Apache Beam Java with the recently published Firestore connector to add new documents to a Firestore collection. While I thought that this should be a relatively easy task, the need to create com.google.firestore.v1.Document objects seems to make things a bit more difficult. I was using this blog post on Using Firestore and Apache Beam for data processing as a starting point.
All I actually want to write is a simple transformation that maps MyClass objects to Firestore documents, which are then added to a Firestore collection.
What I ended up with is a Beam SimpleFunction that maps MyClass objects to Documents:

    public static class Mapper extends SimpleFunction<MyClass, Document> {

      @Override
      public Document apply(final MyClass record) {
        final String project = "my-project";
        final String database = "(default)";
        final String collection = "my-collection";
        final String documentId = someUnecessaryIdComputation();
        return Document
            .newBuilder()
            .setName("projects/" + project
                + "/databases/" + database
                + "/documents/" + collection
                + "/" + documentId)
            .putFields("key", Value.newBuilder().setStringValue(record.getValue()).build())
            // ...
            .build();
      }
    }

and a DoFn transforming these Documents to Write objects with a configured update (this can probably also be simplified to a SimpleFunction, but it was copied from the blog post):

    private static final class CreateUpdateOperation extends DoFn<Document, Write> {

      @ProcessElement
      public void processElement(ProcessContext c) {
        final Write write = Write.newBuilder()
            .setUpdate(c.element())
            .build();
        c.output(write);
      }
    }

I’m using these two functions in my pipeline as follows:

    pipeline.apply(MapElements.via(new Mapper()))
        .apply(ParDo.of(new CreateUpdateOperation()))
        .apply(FirestoreIO.v1().write().batchWrite().build());

The major disadvantages here are:
- I have to specify a document ID and cannot use an auto-generated one as with the “plain” Java SDK.
- I have to specify the project ID and the database name although they should be available. At least with the Java SDK, I don’t have to set them.
Is there any way to add documents using the Firestore connector without explicitly setting document ID, project ID and database?
Answer
I agree, this is not the most convenient API (and I don’t see a better one at the moment). It seems to be designed for modifying existing documents, not creating new ones.
I think it would make sense to have a higher-level transform; I filed https://issues.apache.org/jira/browse/BEAM-13994 . In the meantime, you could do something like

    class FirestoreWrite extends PTransform<PCollection<Map<String, Object>>, PDone> {
      private String projectId;  // Auto-infer from environment
      private String database = "(default)";
      private String collection;

      @Override
      public PDone expand(PCollection<Map<String, Object>> data) {
        return data
            .apply(ParDo.of(new DoFn<Map<String, Object>, Document>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // randomDocumentId() is a placeholder for an ID generator of your choice.
                Document.Builder builder = Document
                    .newBuilder()
                    .setName("projects/" + projectId
                        + "/databases/" + database
                        + "/documents/" + collection
                        + "/" + randomDocumentId());
                // [loop over data setting values from c.element()]
                c.output(builder.build());
              }
            }))
            .apply(ParDo.of(new CreateUpdateOperation()))
            .apply(FirestoreIO.v1().write().batchWrite().build());
      }
    }

which would be generally re-usable and likely worth contributing to Beam.
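For the document ID and name, a minimal sketch of what such a helper might look like is below. The class FirestoreNames and both of its methods are hypothetical names introduced here for illustration; they are not part of Beam or the Firestore SDK. The ID format (20 random alphanumeric characters) is an assumption that mirrors the shape of the IDs the Firestore client libraries generate, and any sufficiently random unique string should work as well.

```java
import java.security.SecureRandom;

/** Hypothetical helper for building Firestore document names; not a Beam or Firestore API. */
final class FirestoreNames {
    // Assumption: 20 alphanumeric characters, similar in shape to client-generated Firestore IDs.
    private static final String ALPHABET =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    private static final int ID_LENGTH = 20;
    private static final SecureRandom RANDOM = new SecureRandom();

    private FirestoreNames() {}

    /** Returns a random ID of ID_LENGTH characters drawn uniformly from ALPHABET. */
    static String randomDocumentId() {
        StringBuilder id = new StringBuilder(ID_LENGTH);
        for (int i = 0; i < ID_LENGTH; i++) {
            id.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return id.toString();
    }

    /** Builds the full resource name in the same layout as the string concatenation above. */
    static String documentName(String projectId, String database,
                               String collection, String documentId) {
        return "projects/" + projectId
            + "/databases/" + database
            + "/documents/" + collection
            + "/" + documentId;
    }
}
```

Inside the DoFn this could be used as setName(FirestoreNames.documentName(projectId, database, collection, FirestoreNames.randomDocumentId())). One caveat: because the ID is random, a bundle that Beam retries would produce a different ID for the same element, so deriving the ID deterministically from the element's content may be preferable if duplicates matter.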