
Apache Beam: Split into Multiple Pipeline Outputs

I am subscribing to a single topic that contains different event types, each arriving with different attributes.

After I read an element, I need to route it to a different destination based on its attributes. This is what the sample code looks like:

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);
    pipeline
    .apply(
        "ReadType1",
        EventIO.<T>readJsons()
            .of(T.class)
            .withPubsubTimestampAttributeName(null)
            .withOptions(options))
    .apply(
        Filter.by(
            new SerializableFunction<T, Boolean>() {
              @Override
              public Boolean apply(T input) {
                return input.attributes.get("type").equals("type1");
              }
            }))
    .apply(
        "WindowMetrics",
        Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
    .apply("AsJsons", AsJsons.of(T.class))
    .apply(
        "Write File(s)",
        TextIO.write()
            .withWindowedWrites()
            .withNumShards(options.getNumShards())
            .to(
                new WindowedFilenamePolicy(
                    options.getRunOutputDirectory(),
                    options.getUseCurrentDateForOutputDirectory(),
                    options.getOutputFilenamePrefix(),
                    options.getOutputShardTemplate(),
                    options.getOutputFilenameSuffix()))
            .withTempDirectory(
                NestedValueProvider.of(
                    options.getTempDirectory(),
                    (SerializableFunction<String, ResourceId>)
                        input -> FileBasedSink.convertToFileResourceIfPossible(input))));


pipeline.apply("ReadType2",
        EventIO.<T>readJsons().of(T.class)
                .withPubsubTimestampAttributeName(null)
                .withOptions(options))
        .apply(Filter.by(new SerializableFunction<T, Boolean>() {
          @Override
          public Boolean apply(T input) {
            return input.attributes.get("type").equals("type2");
          }
        })).apply( "WindowMetrics",
        Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
        .apply("AsJsons", AsJsons.of(T.class))
        .apply(
                "Write File(s)",
                TextIO.write()
                        .withWindowedWrites()
                        .withNumShards(options.getNumShards())
                        .to(
                                new WindowedFilenamePolicy(
                                        options.getBatchOutputDirectory(),
                                        options.getUseCurrentDateForOutputDirectory(),
                                        options.getOutputFilenamePrefix(),
                                        options.getOutputShardTemplate(),
                                        options.getOutputFilenameSuffix()))
                        .withTempDirectory(
                                NestedValueProvider.of(
                                        options.getTempDirectory(),
                                        (SerializableFunction<String, ResourceId>)
                                                input -> FileBasedSink.convertToFileResourceIfPossible(input))));

pipeline.apply("ReadType3",
        EventIO.<T>readJsons().of(T.class)
                .withPubsubTimestampAttributeName(null)
                .withOptions(options))
        .apply(Filter.by(new SerializableFunction<T, Boolean>() {
          @Override
          public Boolean apply(T input) {
            return input.attributes.get("type").equals("type3");
          }
        })).apply( "WindowMetrics",
        Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
        .apply("AsJsons", AsJsons.of(T.class))
        .apply(
                "Write File(s)",
                TextIO.write()
                        .withWindowedWrites()
                        .withNumShards(options.getNumShards())
                        .to(
                                new WindowedFilenamePolicy(
                                        options.getCustomIntervalOutputDirectory(),
                                        options.getUseCurrentDateForOutputDirectory(),
                                        options.getOutputFilenamePrefix(),
                                        options.getOutputShardTemplate(),
                                        options.getOutputFilenameSuffix()))
                        .withTempDirectory(
                                NestedValueProvider.of(
                                        options.getTempDirectory(),
                                        (SerializableFunction<String, ResourceId>)
                                                input -> FileBasedSink.convertToFileResourceIfPossible(input))));

pipeline.run();

Basically I read an event, filter it on its attributes, and write it to a file. The job failed in Dataflow with: Workflow failed. Causes: The pubsub configuration contains errors: Subscription 'sub-name' is consumed by multiple stages, this will result in undefined behavior.

So what is the appropriate way to split the pipeline within the same job?

I tried creating Pipeline1, Pipeline2, and Pipeline3, but that ends up requiring a separate job name for each pipeline, and I am not sure that is the right way to do it.


Answer

The multiple EventIO transforms reading from the same subscription are the cause of the error. You need to consolidate them into a single read in order for this to work. This can be done by consuming the subscription into a single PCollection and then applying a separate filtering branch for each event type to that collection. Here is a partial example:

// single PCollection of the events consumed from the subscription
PCollection<T> events = pipeline
  .apply("Read Events",
    EventIO.<T>readJsons()
      .of(T.class)
      .withPubsubTimestampAttributeName(null)
      .withOptions(options));

// PCollection of type1 events
PCollection<T> typeOneEvents = events.apply(
  Filter.by(
    new SerializableFunction<T, Boolean>() {
      @Override
      public Boolean apply(T input) {
        return input.attributes.get("type").equals("type1");
      }}));
// TODO typeOneEvents.apply("WindowMetrics / AsJsons / Write File(s)")

// PCollection of type2 events
PCollection<T> typeTwoEvents = events.apply(
  Filter.by(
    new SerializableFunction<T, Boolean>() {
      @Override
      public Boolean apply(T input) {
        return input.attributes.get("type").equals("type2");
      }}));
// TODO typeTwoEvents.apply("WindowMetrics / AsJsons / Write File(s)")
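To complete a branch, the windowing and write steps from the question apply unchanged. As a sketch, here is the type1 branch finished with the question's own WindowedFilenamePolicy and option getters (assumed to be available exactly as defined in the question's code):

// Sketch: complete the type1 branch with the question's own windowing and
// write steps (WindowedFilenamePolicy and the option getters are assumed
// to be the ones defined in the question's code).
typeOneEvents
  .apply("WindowMetrics",
    Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
  .apply("AsJsons", AsJsons.of(T.class))
  .apply("Write File(s)",
    TextIO.write()
      .withWindowedWrites()
      .withNumShards(options.getNumShards())
      .to(new WindowedFilenamePolicy(
          options.getRunOutputDirectory(),
          options.getUseCurrentDateForOutputDirectory(),
          options.getOutputFilenamePrefix(),
          options.getOutputShardTemplate(),
          options.getOutputFilenameSuffix()))
      .withTempDirectory(NestedValueProvider.of(
          options.getTempDirectory(),
          (SerializableFunction<String, ResourceId>)
              input -> FileBasedSink.convertToFileResourceIfPossible(input))));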

Another possibility is to use other transforms provided by Apache Beam, which might simplify your solution a little. One such transform is Partition. Partition splits a single PCollection into a fixed number of PCollections based on a partitioning function. A partial example using Partition:

// list of PCollections, partitioned by the type attribute of each event
PCollectionList<T> eventsByType = pipeline
  .apply("Read Events",
    EventIO.<T>readJsons()
      .of(T.class)
      .withPubsubTimestampAttributeName(null)
      .withOptions(options))
  .apply("Partition By Type",
    Partition.of(2, new PartitionFn<T>() {
      @Override
      public int partitionFor(T event, int numPartitions) {
        return event.attributes.get("type").equals("type1") ? 0 : 1;
      }}));

PCollection<T> typeOneEvents = eventsByType.get(0);
// TODO typeOneEvents.apply("WindowMetrics / AsJsons / Write File(s)")

PCollection<T> typeTwoEvents = eventsByType.get(1);
// TODO typeTwoEvents.apply("WindowMetrics / AsJsons / Write File(s)")
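
Since the question routes three event types, the partition function can return three indexes instead of two. A sketch, assuming `events` is the single PCollection read from the subscription as in the first example:

// Sketch: three-way partition matching the question's type1/type2/type3
// routing ("events" is the single PCollection read from the subscription).
PCollectionList<T> eventsByType = events.apply("Partition By Type",
  Partition.of(3, new PartitionFn<T>() {
    @Override
    public int partitionFor(T event, int numPartitions) {
      String type = event.attributes.get("type");
      if ("type1".equals(type)) {
        return 0;
      }
      return "type2".equals(type) ? 1 : 2; // everything else goes with type3
    }
  }));

PCollection<T> typeOneEvents = eventsByType.get(0);
PCollection<T> typeTwoEvents = eventsByType.get(1);
PCollection<T> typeThreeEvents = eventsByType.get(2);
// TODO apply "WindowMetrics / AsJsons / Write File(s)" to each branch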