I have a Spring Boot Java application, myapp.jar, that registers a Spark SQL UDF (user-defined function).
SparkConfiguration.java
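```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

// sparkMaster, sparkJars and log are members of the enclosing configuration class (not shown)
public SparkSession sparkSession() {
    SparkSession session = SparkSession
            .builder()
            .appName("uniloader-spark-master")
            .master(sparkMaster)
            // jars added to the driver and executor classpaths (here: the path to myJar.jar)
            .config("spark.jars", sparkJars)
            .getOrCreate();
    log.info("Spark jars for control: " + sparkJars);
    // register the two-argument Java UDF under the SQL name "to_integer"
    session.sqlContext().udf().register("to_integer", new ToIntegerUdf(), DataTypes.IntegerType);
    return session;
}
```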
ToIntegerUdf.java
```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.regex.Pattern;

import org.apache.spark.sql.api.java.UDF2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// to_integer(value, delimiter): replaces the decimal delimiter with '.', then rounds half-up to an int
public class ToIntegerUdf implements UDF2<Object, Object, Integer> {

    private static final long serialVersionUID = 165436543635L;
    private static final Logger log = LoggerFactory.getLogger(ToIntegerUdf.class);

    @Override
    public Integer call(Object valueObj, Object delimiterObj) {
        try {
            log.info("value = " + valueObj);
            log.info("delimiter = " + delimiterObj);
            if (valueObj == null || delimiterObj == null) {
                return null;
            }
            String value = String.valueOf(valueObj);
            String delimiter = (String) delimiterObj;
            // Pattern.quote makes the delimiter match literally instead of as a regex
            String doubleAsString = value.replaceAll(Pattern.quote(delimiter), ".");
            return new BigDecimal(doubleAsString).setScale(0, RoundingMode.HALF_UP).intValue();
        } catch (Exception e) {
            log.error("Error while to_integer transformation for " + valueObj + ". Details:", e);
            return null;
        }
    }
}
```
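To rule out the UDF body itself before looking at packaging, the conversion can be run with only the JDK. The following is a minimal sketch: a Spark-free copy of the logic from `ToIntegerUdf.call`, placed in a hypothetical `ToIntegerLogic` class (not part of the original project), with the same null-on-failure behaviour.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.regex.Pattern;

// Hypothetical helper: same steps as ToIntegerUdf.call, but without Spark or logging.
public class ToIntegerLogic {

    // Replace the decimal delimiter with '.', parse, round half-up.
    // Any runtime failure (non-String delimiter, unparsable number) yields null, as in the UDF.
    static Integer toInteger(Object valueObj, Object delimiterObj) {
        if (valueObj == null || delimiterObj == null) {
            return null;
        }
        try {
            String value = String.valueOf(valueObj);
            String delimiter = (String) delimiterObj;
            // Pattern.quote: "." or "," is matched literally, not as a regex
            String doubleAsString = value.replaceAll(Pattern.quote(delimiter), ".");
            return new BigDecimal(doubleAsString).setScale(0, RoundingMode.HALF_UP).intValue();
        } catch (RuntimeException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(toInteger("12,5", ","));  // 13
        System.out.println(toInteger("12,4", ","));  // 12
        System.out.println(toInteger("-2,5", ",")); // -3 (HALF_UP rounds ties away from zero)
        System.out.println(toInteger("abc", ","));   // null
    }
}
```

If these cases behave as expected locally, the failure on the cluster is unlikely to be in the conversion code.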
sparkJars contains the path to myJar.jar.
The application is built with Maven. The Spark library version is 3.0.2 and the Scala version is 2.12.10.
When I run the application on Spark Standalone 3.0.2, I get this error:
```
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2032)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2235)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
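The exception says a `java.lang.invoke.SerializedLambda` could not be assigned to `ScalaUDF.f`. Java serialization writes a serializable lambda (Scala 2.12 closures are compiled this way too) as a `SerializedLambda` that records its capturing class and captured arguments, and on reading, `SerializedLambda.readResolve` is supposed to turn it back into a function. One plausible reading of the trace, offered here as an assumption rather than a confirmed diagnosis, is that this conversion never happened on the executor because something the closure captures (such as the `ToIntegerUdf` instance) could not be loaded there. The plain-Java sketch that follows shows what a serialized lambda carries; `LambdaDemo`, `SerFunction`, `adder`, `describe` and `roundTrip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

public class LambdaDemo {

    // A Function that is also Serializable, so the compiler emits a serializable lambda.
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    // Lambda capturing 'offset'; the captured value travels inside the SerializedLambda.
    static SerFunction<Integer, Integer> adder(int offset) {
        return x -> x + offset;
    }

    // Invokes the lambda's synthetic writeReplace method, which ObjectOutputStream
    // uses to substitute a SerializedLambda for the lambda object.
    static SerializedLambda describe(Serializable lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return (SerializedLambda) writeReplace.invoke(lambda);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("not a serializable lambda", e);
        }
    }

    // Plain Java serialization round trip (Spark's JavaSerializer is built on the same streams).
    // On reading, SerializedLambda.readResolve rebuilds the lambda through its capturing class.
    static Object roundTrip(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        SerFunction<Integer, Integer> addTen = adder(10);
        SerializedLambda sl = describe(addTen);
        System.out.println("capturing class: " + sl.getCapturingClass());
        System.out.println("captured args: " + sl.getCapturedArgCount());
        SerFunction<Integer, Integer> copy = (SerFunction<Integer, Integer>) roundTrip(addTen);
        System.out.println("copy.apply(5) = " + copy.apply(5));
    }
}
```

Here every class involved is on the same classpath, so the round trip succeeds; on a Spark executor, the classes referenced by the serialized closure have to be loadable from the executor's classpath as well.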
In the Spark worker log I can see that the worker fetches myJar.jar:
21/03/23 19:33:24 INFO Executor: Fetching spark://demo.phoenixit.ru:39597/jars/myJar.jar with timestamp 1616517199949
I think the problem is either with the dependencies or with how the application is submitted, but I have no idea how to fix it.
Answer
I’ve fixed it.
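The problem was the spring-boot-maven-plugin: when the jar is built with this plugin, Spark Standalone can’t get the UDF classes. The plugin repackages the jar into Spring Boot’s executable layout, with the application classes under BOOT-INF/classes/ and the dependencies under BOOT-INF/lib/. A standard class loader, such as the one the executors use for the jars listed in spark.jars, only finds classes at the jar root, which is most likely why the executors could not deserialize the UDF.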
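When I replaced it with the maven-shade-plugin, which builds an uber-jar with the application classes and the unpacked dependencies at the jar root, everything worked correctly.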
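A quick way to see the difference between the two builds is to look inside the jar and check where the UDF class ended up. The following is a minimal sketch using only `java.util.jar`; the class name `JarLayoutCheck`, the method `locate`, and the usage arguments (`target/myJar.jar`, `com.example.ToIntegerUdf`) are hypothetical, since the original post does not show the package name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

public class JarLayoutCheck {

    // Reports where a class file sits inside a jar:
    //   "root"             - visible when the jar is put on a plain classpath (e.g. a shaded jar)
    //   "BOOT-INF/classes" - Spring Boot repackaged layout, not visible to a standard class loader
    //   "missing"          - not in the jar at all
    static String locate(String jarPath, String className) {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            if (jar.getEntry(entry) != null) {
                return "root";
            }
            if (jar.getEntry("BOOT-INF/classes/" + entry) != null) {
                return "BOOT-INF/classes";
            }
            return "missing";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java JarLayoutCheck <path-to-jar> <fully.qualified.ClassName>");
            System.exit(1);
        }
        System.out.println(args[1] + " -> " + locate(args[0], args[1]));
    }
}
```

For example, `java JarLayoutCheck target/myJar.jar com.example.ToIntegerUdf` would be expected to report `BOOT-INF/classes` for a spring-boot-maven-plugin build and `root` for a maven-shade-plugin build.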