Spark 2.0.0 : Read from Cassandra in Cluster mode

Tags: , , ,



I have some troubles running a Spark Application that reads data from Cassandra in Spark 2.0.0.

My code work as follow :

DataFrameReader readerCassandra = SparkContextUtil.getInstance().read() 
                    .format("org.apache.spark.sql.cassandra")
                    .option("spark.cassandra.connection.host", [DATABASE_IP])
                    .option("spark.cassandra.connection.port", [DATABASE_PORT]);

final Map<String,String> map = new HashMap<String,String>();

map.put("table", "MyTable");
map.put("keyspace", "MyKeyspace");

public final  StructType schema = DataTypes.createStructType(
        new StructField[] { DataTypes.createStructField("id", DataTypes.StringType, true),
            DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
            DataTypes.createStructField("value", DataTypes.DoubleType, true)
        });

final Dataset<Row> dataset = readerCassandra.schema(schema).options(map).load(); 
dataset.show(false);

I want to run this code in a cluster. My cluster use spark-2.0.2-bin-hadoop2.7 (No spark-2.0.0 available at http://spark.apache.org/downloads.html).

At first, I submit it in Client mode with the following script :

#!/bin/bash

sparkMaster=local[*]
mainClass=package.MainClass

jar=/path/to/myJar-with-dependencies.jar

driverPort=7079
blockPort=7082

deployMode=client

$SPARK_HOME/bin/spark-submit 
  --conf "spark.driver.port=${driverPort}"
  --conf "spark.blockManager.port=${blockPort}"
  --class $mainClass 
  --master $sparkMaster 
  --deploy-mode $deployMode 
  --jars /path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar 
  $jar

When I do that, everything work well. But now, I want to run my application in cluster mode.

So I modify a bit my submit script by setting sparkMaster with my master IP, and deployMode to ‘cluster’.

When I submit my application, I get almost instantly the following error in my driver logs :

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        ...

Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        ...

Note :

  • I still have the error with a cluster of only one Worker on the same machine as my Master .
  • At first, I was using Spark 2.3.1, and I had no trouble running my code in Cluster mode (using spark-cassandra-connector_2.11-2.3.1.jar in --jars).
  • I tried multiples jars in --jars like : spark-cassandra-connector_2.11-2.0.0.jar, spark-cassandra-connector_2.11-2.0.2.jar, spark-cassandra-connector_2.11-2.3.1.jar, spark-cassandra-connector-java_2.11-1.5.1.jar, but none of them worked.
  • Some other jars are set in the --jars param and are taken into account

Answer

You may need to specify path as file:///path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar instead – in this case, it will be distributed to executors via driver’s HTTP server. Otherwise it expects that file is already copied by you to all machines to avoid copying by process itself. See Spark documentation for details

I would rather recommend to just create uberjar with all dependencies (except Spark’s), and submit it – it would be less pain with such things.



Source: stackoverflow