I want to implement checkpointing in my Spark file-streaming application so that it processes all unprocessed files from Hadoop if the application stops or terminates for any reason. I am following the Spark Streaming programming guide, but I cannot find JavaStreamingContextFactory. Please help me; what should I do?
My code is:
public class StartAppWithCheckPoint {

    public static void main(String[] args) {
        try {
            String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
            String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();

            JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
                @Override
                public JavaStreamingContext create() {
                    SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
                    JavaSparkContext sc = new JavaSparkContext(sparkConf);
                    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(300));
                    JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
                    jssc.checkpoint(checkpointDirectory);
                    return jssc;
                }
            };

            JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
            context.start();
            context.awaitTermination();
            context.close();
            sparkSession.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Answer
You must use checkpointing. For checkpointing to be useful across restarts, combine it with stateful transformations such as updateStateByKey or reduceByKeyAndWindow. There are plenty of examples in the spark-examples module shipped with the prebuilt Spark distribution and in the Spark source on GitHub. For your specific case, see JavaStatefulNetworkWordCount.java.