Skip to content
Advertisement

Add document to Firestore from Beam with auto generated ID

I would like to use Apache Beam Java with the recently published Firestore connector to add new documents to a Firestore collection. While I thought that this should be a relatively easy task, the need for creating com.google.firestore.v1.Document objects seem to make things a bit more difficult. I was using this blog post on Using Firestore and Apache Beam for data processing as a starting point.

What I actually only want is to write is a simple transformation, mapping MyClass objects to Firestore documents, which are then added to a Firestore collection.

What I now ended up with is a Beam SimpleFunction, which maps MyClass objects to Documents:

public static class Mapper extends SimpleFunction<MyClass, Document> {

    @Override
    public Document apply(final MyClass record) {
      final String project = "my-project";
      final String database = "(default)";
      final String collection = "my-collection";
      final String documentId = someUnecessaryIdComputation();
      return Document
          .newBuilder()
          .setName("projects/" + project + "/databases/" + database + "/documents/" + collection
              + "/" + documentId)
          .putFields("key",
              Value.newBuilder().setStringValue(record.getValue()).build())
          // ...
          .build();
    }

  }

and a DoFn transforming these Documents to Write objects with configured update (can probably be also simplified to a SimpleFunction but was copied from the blog post):

private static final class CreateUpdateOperation extends DoFn<Document, Write> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      final Write write = Write.newBuilder()
          .setUpdate(c.element())
          .build();
      c.output(write);
    }
  }

I’m using these two functions in my pipeline as follows:

pipeline.apply(MapElements.via(new Mapper()))
  .apply(ParDo.of(new CreateUpdateOperation()))
  .apply(FirestoreIO.v1().write().batchWrite().build());

The major disadvantages here are:

  • I have to specify a document ID and can not use an auto-generated one as with the “plain” Java SDK
  • I have to specify the project ID and the database name although they should be available. At least for the Java SDK, I have don’t have to set them.

Is there any way to add documents using the Firestore connector without explicitly setting document ID, project ID and database?

Advertisement

Answer

I agree, this is not the most convenient API (and I don’t see a better one at the moment). It seems to be designed for modifying existing documents, not creating new ones.

I think it would make sense to have a higher-level transform; I filed https://issues.apache.org/jira/browse/BEAM-13994 . In the meantime, you could do something like

class FirestoreWrite extends PTransform<PCollection<Map<String, Object>>, PDone> {
  private String projectId;  // Auto-infer from environment
  private String database = "(defaut)";
  private String collection;
  
  public PDone expand(PCollection<Map<String, Object>> data) {
    return data
        .apply(ParDo.of(new DoFn() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            builder = Document
                .newBuilder()
                .setName("projects/" + projectId + "/databases/" + database + "/documents/" + collection + "/" + randomDocumentId());
            // [loop over data setting values from c.element()]
            c.output(builder.build());
          }
        }))
        .apply(new CreateUpdateOperation())
        .apply(FirestoreIO.v1().write().batchWrite().build());
  }
}

which would be generally re-usable and likely worth contributing to Beam.

User contributions licensed under: CC BY-SA
7 People found this is helpful
Advertisement