I am having trouble running a Spark application that reads data from Cassandra in Spark 2.0.0.
My code works as follows:
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Build a reader for the Cassandra data source
DataFrameReader readerCassandra = SparkContextUtil.getInstance().read()
    .format("org.apache.spark.sql.cassandra")
    .option("spark.cassandra.connection.host", [DATABASE_IP])
    .option("spark.cassandra.connection.port", [DATABASE_PORT]);

// Table and keyspace to read from
final Map<String, String> map = new HashMap<String, String>();
map.put("table", "MyTable");
map.put("keyspace", "MyKeyspace");

final StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("id", DataTypes.StringType, true),
    DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
    DataTypes.createStructField("value", DataTypes.DoubleType, true)
});

final Dataset<Row> dataset = readerCassandra.schema(schema).options(map).load();
dataset.show(false);
I want to run this code on a cluster. My cluster uses spark-2.0.2-bin-hadoop2.7 (no spark-2.0.0 build is available at http://spark.apache.org/downloads.html).
At first, I submit it in client mode with the following script:
#!/bin/bash

sparkMaster=local[*]
mainClass=package.MainClass
jar=/path/to/myJar-with-dependencies.jar
driverPort=7079
blockPort=7082
deployMode=client

$SPARK_HOME/bin/spark-submit \
  --conf "spark.driver.port=${driverPort}" \
  --conf "spark.blockManager.port=${blockPort}" \
  --class $mainClass \
  --master $sparkMaster \
  --deploy-mode $deployMode \
  --jars /path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar \
  $jar
When I do that, everything works well. But now I want to run my application in cluster mode.
So I modify my submit script a bit, setting sparkMaster to my master's IP and deployMode to 'cluster'.
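For reference, the relevant lines of the modified script would look like this (spark://[MASTER_IP]:7077 is a placeholder; 7077 is only the default port of a standalone master):

sparkMaster=spark://[MASTER_IP]:7077  # placeholder for the actual master URL
deployMode=cluster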
When I submit my application, I almost instantly get the following error in my driver logs:
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
    at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
    ...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    ...
Notes:
- I still have the error with a cluster of only one worker running on the same machine as my master.
- At first, I was using Spark 2.3.1, and I had no trouble running my code in cluster mode (using spark-cassandra-connector_2.11-2.3.1.jar in --jars).
- I tried multiple jars in --jars, such as spark-cassandra-connector_2.11-2.0.0.jar, spark-cassandra-connector_2.11-2.0.2.jar, spark-cassandra-connector_2.11-2.3.1.jar, and spark-cassandra-connector-java_2.11-1.5.1.jar, but none of them worked.
- Some other jars are set in the --jars param and are taken into account.
Answer
You may need to specify the path as file:///path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar instead. In that case, the jar will be distributed to the executors via the driver's HTTP server. Without the scheme, Spark expects that you have already copied the file to the same path on all machines, so the process does not have to copy it itself. See the Spark documentation on advanced dependency management for details.
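As a sketch, only the --jars value in the submit script changes (reusing the variables from the script in the question):

$SPARK_HOME/bin/spark-submit \
  --conf "spark.driver.port=${driverPort}" \
  --conf "spark.blockManager.port=${blockPort}" \
  --class $mainClass \
  --master $sparkMaster \
  --deploy-mode $deployMode \
  --jars file:///path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar \
  $jar

An alternative worth trying is --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2, which makes spark-submit resolve the connector and its transitive dependencies from Maven Central and distribute them for you.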
I would rather recommend just creating an uberjar with all dependencies (except Spark's own) and submitting that; it is much less painful for problems like this.
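With an uberjar, the --jars flag is not needed at all. A minimal sketch, reusing the placeholder paths from the question and assuming myJar-with-dependencies.jar now bundles the Cassandra connector (with Spark itself marked as a provided dependency):

# [MASTER_IP] is a placeholder; 7077 is the default standalone master port
$SPARK_HOME/bin/spark-submit \
  --class package.MainClass \
  --master spark://[MASTER_IP]:7077 \
  --deploy-mode cluster \
  /path/to/myJar-with-dependencies.jar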