
Apache Beam library upgrade causing IllegalStateExceptions with setRowSchema and setCoder

I’m attempting to upgrade the Apache Beam libraries from v2.19.0 to v2.37.0 (Java 8 & Maven), but have run into an issue with a breaking change that I’d appreciate some support with.  Sorry this is quite a long one, I wanted to capture as much context as I could, but please shout if there’s anything you’d like to dig into.

I’m using Beam inside GCP Dataflow to read data from BigQuery, then processing aggregates before I write the results back to BigQuery.  I’m able to read from/write to BigQuery without issue, but after the upgrade my aggregates pipeline fails at runtime, specifically at a DoFn I wrote to sanitise the results returned from Beam’s SqlTransform.query.  I call this DoFn within ParDo.of to detect Double.MAX_VALUE and Double.MIN_VALUE values, because MIN/MAX aggregates in Beam SQL return the Double min/max values when they encounter a NULL value, rather than just returning NULL.  I did try filtering the raw BigQuery data first, but the issue creeps in at the Beam SQL level.

There may be better ways to do this (I’m open to suggestions!).  I’ve included a bunch of code snippets from my pipeline that I’ve tried to simplify, so apologies if there’s anything obviously janky.  Here’s what I previously had before the library upgrade:

PCollection<Row> aggregates = inputCollection.apply(
    "Generate Aggregates",
    SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
)
.apply(ParDo.of(new HandleNullValues()));

I’ve included the HandleNullValues definition at the bottom of this post, but it appears v2.21.0 introduced a breaking change whereby the coder inference was disabled for Beam Row types in this ticket.  This change has caused the above code to fail with the following runtime error:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Unable to return a default Coder for ParDo(HandleNullValues)/ParMultiDo(HandleNullValues).output [PCollection@83398426]. Correct one of the following root causes:
[ERROR]   No Coder has been manually specified;  you may do so using .setCoder().
[ERROR]   Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for a Beam Row. Please provide a schema instead using PCollection.setRowSchema.
[ERROR]   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.

I’ve followed the advice on the aforementioned JIRA ticket, plus a bunch of other examples I found online, but without much joy.  I’ve tried applying setCoder(SerializableCoder.of(Row.class)) after the .apply(ParDo.of(new HandleNullValues())), which fixes this error (though I’m not yet sure whether it has just suppressed the error or is actually working), but that change causes another runtime error:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Cannot call getSchema when there is no schema -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. Cannot call getSchema when there is no schema

This error is thrown further down my pipeline, when I perform a subsequent SqlTransform.query to JOIN some results together.

PCollectionTuple.of(new TupleTag<Row>("Rows"), aggregates)
    .and(new TupleTag<Row>("Experiments"), experiments)
    .apply("Joining Aggregates to Experiments", SqlTransform.query(aggregateExperimentJoin()))
    .apply(ParDo.of(new MapBeamRowsToBigQueryTableRows()))
    .apply(BigQueryIO.writeTableRows()
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .to(NestedValueProvider.of(
            options.getProjectId(),
            (SerializableFunction<String, String>) projectId -> projectId + ":daily_aggregates.experiments")));

I’ve verified the aggregates collection is indeed missing a schema if I interrogate the hasSchema property.  The second experiments PCollection above does have a row schema set though:

PCollection<Row> rawExperiments = rows.apply(
    SqlTransform.query("select sessionId, experiments from PCOLLECTION")
);
PCollection<Row> experiments = rawExperiments.apply(ParDo.of(new CustomFunctions.ParseExperiments(bigQuerySchema)));
experiments.setRowSchema(bigQuerySchema);

I’ve also tried applying this coder at the pipeline level, with different variations on the following.  But this also gives the same error:

CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoderForClass(Row.class, RowCoder.of(bigQuerySchema));
cr.registerCoderForType(TypeDescriptors.rows(), RowCoder.of(bigQuerySchema));

The bigQuerySchema object referenced above is the initial schema used to retrieve all raw data from BigQuery, though that part of the pipeline works fine, so potentially I need to pass the aggregatesSchema object (see below) in to registerCoderForType for the pipeline?

I then tried to set the row schema on aggregates (which was another suggestion in the error above).  I’ve confirmed that calling setCoder is responsible for the previous Row schema disappearing, where it had previously been set by the input PCollection (and also when I call setRowSchema immediately before I call the DoFn).

I’ve simplified the schema for succinctness in this post, but it’s a subset of bigQuerySchema with a few new fields (simple data types).  Here’s what I’ve tried, again with various combinations of where I call setCoder and setRowSchema (before apply() and/or after).

Schema aggregatesSchema = Schema.builder()
    .addNullableField("userId", FieldType.STRING)
    .addNullableField("sessionId", FieldType.STRING)
    .addNullableField("experimentsPerDay", FieldType.INT64)
    .build();

PCollection<Row> aggregates = inputCollection.apply(
    "Generate Aggregates",
    SqlTransform.query(getResourceFileAsString("/sql/generateAggregates.sql"))
)
.apply(ParDo.of(new HandleNullValues()))
.setCoder(SerializableCoder.of(Row.class))
.setRowSchema(aggregatesSchema);

Unfortunately, this causes a third runtime error which I’ve not been able to figure out:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException

The full call stack is at the bottom of this post. I can see it originating from my HandleNullValues DoFn, but after that it disappears into the Beam libraries.

I’m at a loss as to which route is recommended, and how to proceed, as both coder and schema options are causing different issues.

Any help would be greatly appreciated, and thanks in advance!

The full DoFn I’ve referred to is further below, but it’s worth noting that just having an essentially empty DoFn with both input and output of Beam Row types causes the same issue:

public static class HandleNullValues extends DoFn<Row, Row> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Row row = c.element();
        c.output(row);
    }
}

Here’s the full implementation, if anyone can think of a better way to detect and replace NULL values returned from Beam SQL:

public static class HandleNullValues extends DoFn<Row, Row> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Row row = c.element();
        List<String> fields = row.getSchema().getFieldNames();
        // Rebuild the row field by field against the same schema.
        Row.Builder rowBuilder = Row.withSchema(row.getSchema());

        for (String f : fields) {
            Object value = row.getValue(f);
            // instanceof is already false for null, so no separate null check is needed.
            if (value instanceof Long) {
                long longVal = row.getInt64(f);
                // Treat the Long extremes as "no value".
                if (longVal == Long.MAX_VALUE || longVal == Long.MIN_VALUE) {
                    rowBuilder.addValue(null);
                } else {
                    rowBuilder.addValue(value);
                }
            } else if (value instanceof Double) {
                double doubleVal = row.getDouble(f);
                // Treat Double.MAX_VALUE and Double.MIN_VALUE as "no value".
                if (doubleVal == Double.MAX_VALUE || doubleVal == Double.MIN_VALUE) {
                    rowBuilder.addValue(null);
                } else {
                    rowBuilder.addValue(value);
                }
            } else {
                // Nulls and every other type pass through unchanged.
                rowBuilder.addValue(value);
            }
        }

        Row newRow = rowBuilder.build();
        c.output(newRow);
    }
}
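
One detail worth checking in the DoFn above: in Java, Double.MIN_VALUE is the smallest positive double (roughly 4.9e-324), not the most negative one, which is -Double.MAX_VALUE. Which of these Beam SQL actually returned for an empty MAX is not confirmed anywhere in this post, so the sketch below checks both. It pulls the per-field check into a plain-Java helper that can be unit tested without a pipeline. The class and method names (SentinelCheck, sanitise) are hypothetical, not part of Beam.

```java
public class SentinelCheck {

    /**
     * Returns null when the value is one of the sentinel extremes,
     * otherwise returns the value unchanged. Null input stays null.
     */
    public static Object sanitise(Object value) {
        if (value instanceof Long) {
            long v = (Long) value;
            return (v == Long.MAX_VALUE || v == Long.MIN_VALUE) ? null : value;
        }
        if (value instanceof Double) {
            double v = (Double) value;
            boolean sentinel = v == Double.MAX_VALUE
                    // Assumption: the most negative finite double may be the real sentinel.
                    || v == -Double.MAX_VALUE
                    // Kept from the original check; note this is a tiny positive number.
                    || v == Double.MIN_VALUE;
            return sentinel ? null : value;
        }
        // Strings, nulls and all other types pass through untouched.
        return value;
    }

    public static void main(String[] args) {
        System.out.println(sanitise(Long.MAX_VALUE));
        System.out.println(sanitise(-Double.MAX_VALUE));
        System.out.println(sanitise(42L));
        System.out.println(Double.MIN_VALUE > 0);
    }
}
```

Inside the DoFn, each field would then reduce to rowBuilder.addValue(SentinelCheck.sanitise(row.getValue(f))).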

And here’s the full callstack from the setRowSchema issue detailed above:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project dataflow-example: An exception occured while executing the Java class. java.lang.IllegalStateException
    at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:306)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
Caused by: org.apache.maven.plugin.MojoExecutionException: An exception occured while executing the Java class. java.lang.IllegalStateException
    at org.codehaus.mojo.exec.ExecJavaMojo.execute (ExecJavaMojo.java:311)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
    at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:301)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:211)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:165)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:157)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:121)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:127)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:294)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:960)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:293)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:196)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:282)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:225)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:406)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:347)
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:373)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:341)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
    at com.example.dataflow.Pipeline.main (Pipeline.java:284)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.IllegalStateException
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState (Preconditions.java:491)
    at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate (RowCoderGenerator.java:314)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode (Unknown Source)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$t7ZQOyQd.encode (Unknown Source)
    at org.apache.beam.sdk.schemas.SchemaCoder.encode (SchemaCoder.java:124)
    at org.apache.beam.sdk.coders.Coder.encode (Coder.java:136)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:85)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:144)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init> (MutationDetectors.java:118)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder (MutationDetectors.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add (ImmutabilityCheckingBundleFactory.java:115)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output (ParDoEvaluator.java:305)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue (SimpleDoFnRunner.java:268)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900 (SimpleDoFnRunner.java:84)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:416)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output (SimpleDoFnRunner.java:404)
    at com.example.dataflow.Pipeline$HandleNullValues.processElement (CustomFunctions.java:310)

Cheers!


Answer

For anyone wanting to know how I fixed this, there are several updates:

  1. Apache Beam v2.20+ introduced breaking changes to schema and coder inference, so setRowSchema(x) must now be called on the output immediately after applying a ParDo that emits Beam Rows
  2. The HandleNullValues function above is no longer needed: Apache Beam v2.29.0 fixed the erroneous behaviour, so empty aggregates now return NULL
  3. I did not have to override a coder myself; checking getSchema at each stage of the pipeline, and setting the schema wherever it was missing, was all that I needed
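
The first and third points can be sketched as follows. This is a minimal sketch, not the actual pipeline from the question: the class name RowSchemaSketch, the PassThrough DoFn and the sample row are hypothetical, Create stands in for the SqlTransform output, and it assumes the DoFn emits rows with exactly the schema it receives, so the upstream schema can be reused instead of being rebuilt by hand. It needs beam-sdks-java-core on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class RowSchemaSketch {

    // Same simplified schema as in the question.
    public static final Schema AGGREGATES_SCHEMA = Schema.builder()
            .addNullableField("userId", FieldType.STRING)
            .addNullableField("sessionId", FieldType.STRING)
            .addNullableField("experimentsPerDay", FieldType.INT64)
            .build();

    /** Hypothetical Row-to-Row DoFn that leaves the schema unchanged. */
    public static class PassThrough extends DoFn<Row, Row> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element());
        }
    }

    public static PCollection<Row> build(Pipeline pipeline) {
        Row sample = Row.withSchema(AGGREGATES_SCHEMA)
                .addValues("user-1", "session-1", 3L)
                .build();

        // Stand-in for the SqlTransform output, which already carries a schema.
        PCollection<Row> input =
                pipeline.apply(Create.of(sample).withRowSchema(AGGREGATES_SCHEMA));

        // No setCoder call: attach the schema directly to the ParDo output.
        // Reusing input.getSchema() is only valid because PassThrough keeps the fields as-is.
        return input
                .apply(ParDo.of(new PassThrough()))
                .setRowSchema(input.getSchema());
    }
}
```

If the DoFn adds, drops or retypes fields, the schema passed to setRowSchema has to describe the output rows instead of the input.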

I’d thoroughly recommend signing up to the Apache Beam mailing list if you need help; they’ve been great.

User contributions licensed under: CC BY-SA