Skip to content
Advertisement

Issue with Spark Big Query Connector with Java

Getting Below issue with the Spark Big Query connector in Dataproc cluster with below configuraton. Image: 1.5.21-debian10 Spark Version: 2.4.7 Scala Version: 2.12.10

This is working fine from local but failing when I deploy this in dataproc cluster.Can someone suggest some pointers for this issue?

20/11/05 11:30:42 INFO com.google.cloud.spark.bigquery.BigQueryUtilScala: BigQuery client project id is [<PROJECT_ID}], derived from the parentProject option
20/11/05 11:30:44 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Querying table <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>, parameters sent from Spark: requiredColumns=[postal_code,src_id,fname,mob,PARTY_ID,tenant_id,city,lname,country,ssn], filters=[]
20/11/05 11:30:44 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Going to read from <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME> columns=[postal_code, src_id, fname, mob, PARTY_ID, tenant_id, city, lname, country, ssn], filter=''
20/11/05 11:30:45 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Created read session for table '<PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>': projects/<PROJECT_ID>/locations/us/sessions/<SESSION_ID>
20/11/05 11:30:50 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, executor 2): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.prepareLoadNextBatch(ArrowReader.java:211)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:101)
    at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
    at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:858)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

20/11/05 11:30:55 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, executor 1): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.prepareLoadNextBatch(ArrowReader.java:211)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:101)
    at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
    at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:858)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1926)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1914)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1913)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1913)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:948)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2147)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2096)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2085)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
    at com.gcp.poc.SparkBigQueryConnector.main(SparkBigQueryConnector.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.prepareLoadNextBatch(ArrowReader.java:211)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:101)
    at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
    at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:858)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

pom.xml:

  <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-shared-dependencies</artifactId>
        <version>0.13.0</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-bigquery</artifactId>
        <version>1.116.10</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava-jdk5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>failureaccess</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>listenablefuture</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.google.cloud.spark</groupId>
        <artifactId>spark-bigquery_2.12</artifactId>
        <version>0.17.3</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava-jdk5</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>failureaccess</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>listenablefuture</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>30.0-jre</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-storage</artifactId>
        <version>1.113.1</version>
        <exclusions>
            <exclusion>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
            <exclusion>
                <artifactId>failureaccess</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
            <exclusion>
                <artifactId>listenablefuture</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-spark</artifactId>
        <version>2.0.2.3.1.0.0-78</version>
    </dependency>
    <dependency>
        <groupId>com.google.cloud.bigtable</groupId>
        <artifactId>bigtable-hbase-1.x-hadoop</artifactId>
        <version>1.16.0</version>
        <exclusions>
            <exclusion>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
            <exclusion>
                <artifactId>failureaccess</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
            <exclusion>
                <artifactId>listenablefuture</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
        </exclusions>
    </dependency>

Here is the sample Code:

public static void main(String[] args) {
        SparkSession session = SparkSession.builder().getOrCreate();
        Dataset<Row> readDS = session.read().format("bigquery")
                .option("table", "<PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>")
                .option("project", projectId)
                .option("parentProject", projectId)
                .load();
        
        readDS.show(1,false);
    }

Advertisement

Answer

Can you please replace the Spark BigQuery connector to the shaded one?

<dependency>
  <groupId>com.google.cloud.spark</groupId>
  <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
  <version>0.17.3</version>
</dependency>
Advertisement