So first, I want to say that the only thing I have seen that addresses this issue is here: Spark 1.6.1 SASL. However, even after adding the configuration for Spark and YARN authentication, it is still not working. Below is my Spark configuration, using spark-submit on a YARN cluster on Amazon’s EMR:
SparkConf sparkConf = new SparkConf().setAppName("secure-test");
sparkConf.set("spark.authenticate.enableSaslEncryption", "true");
sparkConf.set("spark.network.sasl.serverAlwaysEncrypt", "true");
sparkConf.set("spark.authenticate", "true");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator");
try {
    sparkConf.registerKryoClasses(new Class<?>[]{
            Class.forName("org.apache.hadoop.io.LongWritable"),
            Class.forName("org.apache.hadoop.io.Text")
    });
} catch (Exception e) {
    // ignore if the Hadoop writable classes are not on the classpath
}
sparkContext = new JavaSparkContext(sparkConf);
sparkContext.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sparkContext.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "true");
sparkContext.hadoopConfiguration().set("spark.authenticate", "true");
Note that I set spark.authenticate on the sparkContext’s Hadoop configuration in code instead of in core-site.xml (which I am assuming I can do, since other settings applied this way take effect).
Looking here: https://github.com/apache/spark/blob/master/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java it seems like both spark.authenticate settings are necessary. When I run this application, I get the following stack trace.
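For context, my understanding is that the external shuffle service runs inside the YARN NodeManager and reads spark.authenticate from the NodeManager’s own Hadoop/YARN configuration, not from the application’s SparkConf. A rough sketch of what I believe the NodeManager side needs (the aux-service registration is from the Spark-on-YARN docs; the exact file placement is an assumption for your cluster):

<!-- yarn-site.xml on every NodeManager (sketch, not verified on EMR) -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<!-- the shuffle service only enables SASL if this appears in its own configuration -->
<property>
  <name>spark.authenticate</name>
  <value>true</value>
</property>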
17/01/03 22:10:23 INFO storage.BlockManager: Registering executor with local external shuffle service.
17/01/03 22:10:23 ERROR client.TransportClientFactory: Exception while bootstrapping client after 178 ms
java.lang.RuntimeException: java.lang.IllegalArgumentException: Unknown message type: -22
    at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:67)
    at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:71)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
Spark’s docs say:
For Spark on YARN deployments, configuring spark.authenticate to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret.
which seems wrong given the comments in the YARN shuffle service file above. Even after troubleshooting, I am still lost on where I should go to get SASL to work. Am I missing something obvious that is documented somewhere?
Answer
So I finally figured it out. The previous StackOverflow thread was technically correct: I needed to add spark.authenticate to the YARN-side configuration. Maybe it is possible to do this in code, but I couldn’t figure out how, and at a high level that makes sense: the NodeManager’s shuffle service has to see the setting before the application starts, so it belongs in the cluster configuration rather than in the application. I will post my configuration below in case anyone else runs into this issue in the future.
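For anyone not on EMR: my understanding is that the EMR "core-site" classification below simply writes the property into core-site.xml on the cluster nodes, so the non-EMR equivalent would presumably be something like the following (an assumption on my part; I only verified the EMR route), followed by a NodeManager restart so the shuffle service picks it up:

<!-- core-site.xml on the cluster nodes (sketch; I only tested the EMR classification route) -->
<property>
  <name>spark.authenticate</name>
  <value>true</value>
</property>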
First, I used an AWS EMR configurations file (for example, when using the AWS CLI: aws emr create-cluster --configurations file://yourpathhere.json).
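For completeness, a hypothetical full invocation might look like this (the cluster name, release label, and instance settings are placeholders, not from my actual setup):

aws emr create-cluster \
  --name "secure-test" \
  --release-label emr-5.2.0 \
  --applications Name=Spark \
  --instance-type m4.large \
  --instance-count 3 \
  --use-default-roles \
  --configurations file://yourpathhere.json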
Then, I added the following JSON to the file:
[{ "Classification": "spark-defaults", "Properties": { "spark.authenticate": "true", "spark.authenticate.enableSaslEncryption": "true", "spark.network.sasl.serverAlwaysEncrypt": "true" } }, { "Classification": "core-site", "Properties": { "spark.authenticate": "true" } }]