I would like to use Apache Beam Java with the recently published Firestore connector to add new documents to a Firestore collection. While I thought that this should be a relatively easy task, the need to create com.google.firestore.v1.Document objects seems to make things a bit more difficult. I was using this blog post on Using Firestore and Apache Beam for data processing as a starting point.
All I actually want is to write a simple transformation that maps MyClass objects to Firestore documents, which are then added to a Firestore collection.
What I ended up with is a Beam SimpleFunction, which maps MyClass objects to Documents:
    public static class Mapper extends SimpleFunction<MyClass, Document> {

        @Override
        public Document apply(final MyClass record) {
            final String project = "my-project";
            final String database = "(default)";
            final String collection = "my-collection";
            final String documentId = someUnecessaryIdComputation();
            return Document
                .newBuilder()
                .setName("projects/" + project + "/databases/" + database + "/documents/" + collection
                    + "/" + documentId)
                .putFields("key",
                    Value.newBuilder().setStringValue(record.getValue()).build())
                // ...
                .build();
        }
    }
and a DoFn transforming these Documents into Write objects with the update field set (this could probably also be simplified to a SimpleFunction, but it was copied from the blog post):
    private static final class CreateUpdateOperation extends DoFn<Document, Write> {

        @ProcessElement
        public void processElement(ProcessContext c) {
            final Write write = Write.newBuilder()
                .setUpdate(c.element())
                .build();
            c.output(write);
        }
    }
I’m using these two functions in my pipeline as follows:
    pipeline.apply(MapElements.via(new Mapper()))
        .apply(ParDo.of(new CreateUpdateOperation()))
        .apply(FirestoreIO.v1().write().batchWrite().build());
The major disadvantages here are:
- I have to specify a document ID and cannot use an auto-generated one as with the “plain” Java SDK.
- I have to specify the project ID and the database name although they should be available. At least with the Java SDK, I don’t have to set them.
Is there any way to add documents using the Firestore connector without explicitly setting document ID, project ID and database?
Answer
I agree, this is not the most convenient API (and I don’t see a better one at the moment). It seems to be designed for modifying existing documents, not creating new ones.
I think it would make sense to have a higher-level transform; I filed https://issues.apache.org/jira/browse/BEAM-13994. In the meantime, you could do something like:
    class FirestoreWrite extends PTransform<PCollection<Map<String, Object>>, PDone> {
        private String projectId; // Auto-infer from environment
        private String database = "(default)";
        private String collection;

        @Override
        public PDone expand(PCollection<Map<String, Object>> data) {
            return data
                .apply(ParDo.of(new DoFn<Map<String, Object>, Document>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        // randomDocumentId() is a placeholder that you would need to supply.
                        Document.Builder builder = Document
                            .newBuilder()
                            .setName("projects/" + projectId + "/databases/" + database
                                + "/documents/" + collection + "/" + randomDocumentId());
                        // [loop over data setting values from c.element()]
                        c.output(builder.build());
                    }
                }))
                // CreateUpdateOperation is a DoFn, so it is wrapped in ParDo.of.
                .apply(ParDo.of(new CreateUpdateOperation()))
                .apply(FirestoreIO.v1().write().batchWrite().build());
        }
    }
A transform like this would be generally reusable and is likely worth contributing to Beam.
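The randomDocumentId() call in the snippet above is left undefined. As a rough sketch of how it could be filled in, the helper below generates a 20-character alphanumeric ID and builds the full document resource name. The class name FirestoreNames and both method names are made up for illustration and are not part of Beam or the Firestore SDK. The ID format is an assumption modeled on the auto-IDs the Firestore client libraries appear to produce; any ID that is unique within the collection should work equally well.

```java
import java.security.SecureRandom;

/** Hypothetical helper, not part of Beam or the Firestore SDK. */
final class FirestoreNames {
    // Assumption: 20 characters drawn from [A-Za-z0-9], similar in shape to client-side auto-IDs.
    private static final String ALPHABET =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    private static final int ID_LENGTH = 20;
    private static final SecureRandom RANDOM = new SecureRandom();

    private FirestoreNames() {}

    /** Returns a random alphanumeric document ID of length 20. */
    static String randomDocumentId() {
        StringBuilder sb = new StringBuilder(ID_LENGTH);
        for (int i = 0; i < ID_LENGTH; i++) {
            sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    /** Builds the resource name in the same format used with Document.Builder#setName above. */
    static String documentName(String projectId, String database, String collection, String documentId) {
        return "projects/" + projectId + "/databases/" + database
            + "/documents/" + collection + "/" + documentId;
    }

    public static void main(String[] args) {
        // Prints a name with a freshly generated ID; the ID differs on every run.
        System.out.println(
            documentName("my-project", "(default)", "my-collection", randomDocumentId()));
    }
}
```

One caveat: because the ID is random, a bundle that Beam retries would produce different document names on the second attempt, which could lead to duplicate documents. If that matters, deriving the ID deterministically from the element (for example, from a hash of its contents) may be the safer choice.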