I am getting the below issue with the Spark BigQuery connector on a Dataproc cluster with the following configuration:

Image: 1.5.21-debian10
Spark version: 2.4.7
Scala version: 2.12.10

This works fine locally but fails when I deploy it to the Dataproc cluster. Can someone suggest some pointers for this issue?
20/11/05 11:30:42 INFO com.google.cloud.spark.bigquery.BigQueryUtilScala: BigQuery client project id is [<PROJECT_ID>], derived from the parentProject option
20/11/05 11:30:44 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Querying table <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>, parameters sent from Spark: requiredColumns=[postal_code,src_id,fname,mob,PARTY_ID,tenant_id,city,lname,country,ssn], filters=[]
20/11/05 11:30:44 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Going to read from <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME> columns=[postal_code, src_id, fname, mob, PARTY_ID, tenant_id, city, lname, country, ssn], filter=''
20/11/05 11:30:45 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: Created read session for table '<PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>': projects/<PROJECT_ID>/locations/us/sessions/<SESSION_ID>
20/11/05 11:30:50 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, executor 2): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.prepareLoadNextBatch(ArrowReader.java:211)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:101)
    at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
    at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:858)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/11/05 11:30:55 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, executor 1): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.prepareLoadNextBatch(ArrowReader.java:211)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:101)
    at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
    at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:858)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1926)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1914)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1913)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1913)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:948)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2147)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2096)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2085)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
    at com.gcp.poc.SparkBigQueryConnector.main(SparkBigQueryConnector.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.prepareLoadNextBatch(ArrowReader.java:211)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:101)
    at com.google.cloud.spark.bigquery.ArrowReaderIterator.hasNext(ArrowBinaryIterator.java:116)
    at com.google.cloud.spark.bigquery.ArrowBinaryIterator.hasNext(ArrowBinaryIterator.java:66)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:858)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
pom.xml:
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-shared-dependencies</artifactId>
    <version>0.13.0</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquery</artifactId>
    <version>1.116.10</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava-jdk5</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>failureaccess</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>listenablefuture</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.google.cloud.spark</groupId>
    <artifactId>spark-bigquery_2.12</artifactId>
    <version>0.17.3</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava-jdk5</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>failureaccess</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>listenablefuture</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.10</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.0-jre</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-storage</artifactId>
    <version>1.113.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>failureaccess</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>listenablefuture</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>2.0.2.3.1.0.0-78</version>
</dependency>
<dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-1.x-hadoop</artifactId>
    <version>1.16.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>failureaccess</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>listenablefuture</artifactId>
        </exclusion>
    </exclusions>
</dependency>
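Note that the Guava exclusions above only control what Maven resolves at build time; on the cluster, the jars shipped with the Dataproc image are still on the runtime classpath. If the job is packaged as an uber-jar, a common complementary measure is to relocate Guava with the Maven Shade plugin so the bundled copy cannot clash with the cluster's. A minimal sketch (this plugin section is not in the original pom, and the shaded package name is arbitrary):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <relocations>
                            <!-- Move the bundled Guava into a private package so it
                                 cannot collide with the Guava version provided by
                                 the Dataproc image at runtime -->
                            <relocation>
                                <pattern>com.google.common</pattern>
                                <shadedPattern>repackaged.com.google.common</shadedPattern>
                            </relocation>
                        </relocations>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>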
Here is the sample code:

public static void main(String[] args) {
    SparkSession session = SparkSession.builder().getOrCreate();
    String projectId = "<PROJECT_ID>"; // declared here for completeness; value redacted
    Dataset<Row> readDS = session.read().format("bigquery")
            .option("table", "<PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>")
            .option("project", projectId)
            .option("parentProject", projectId)
            .load();
    readDS.show(1, false);
}
Answer
Please replace the Spark BigQuery connector with the shaded artifact, spark-bigquery-with-dependencies_2.12. The java.lang.IllegalArgumentException thrown from ByteBuffer.allocate inside ArrowStreamReader is typically a symptom of an Arrow version clash: the unshaded spark-bigquery artifact picks up whatever Arrow classes are on the cluster classpath, while the shaded artifact bundles and relocates its own Arrow (and Guava), so it is insulated from what the Dataproc image provides. That also explains why the same job works locally but fails on the cluster.
<dependency>
    <groupId>com.google.cloud.spark</groupId>
    <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
    <version>0.17.3</version>
</dependency>
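If the Arrow error were to persist even with the shaded connector, the connector's readDataFormat option is also worth knowing about: the stack trace shows the Arrow read path (ArrowBinaryIterator), and the 0.17.x connector can instead request Avro from the BigQuery Storage API, which bypasses the Arrow deserializer entirely. A minimal sketch of the same read using Avro (a diagnostic workaround, not a fix for the underlying classpath clash; placeholders as in the question):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AvroReadCheck {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().getOrCreate();
        String projectId = "<PROJECT_ID>"; // placeholder, as in the question
        Dataset<Row> readDS = session.read().format("bigquery")
                .option("table", "<PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>")
                .option("parentProject", projectId)
                // Request Avro instead of Arrow from the Storage API,
                // sidestepping the ArrowStreamReader that fails above
                .option("readDataFormat", "AVRO")
                .load();
        readDS.show(1, false);
    }
}

On Dataproc you can also leave the connector out of your application jar entirely and supply it at submit time (for example via the --jars flag of gcloud dataproc jobs submit spark), which avoids bundling conflicts altogether.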