To interact with our examples, we will use the netcat (nc) tool. In particular, we will open a connection on localhost port 9999, where we will send sentences to be processed by our Spark Streaming application. For each example, we can open a connection using nc as follows:

$> nc -l -p 9999

(alternatively, we can use the bash script "1_openSocket.sh")

Then, we can launch the Spark application from our IDE or by explicitly submitting the jar to Spark with spark-submit.

-----------------------
Network Word Count
-----------------------

We can reuse the word count already developed in Spark in a streaming fashion by changing the execution context of Spark and relying on the StreamingContext:

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

where the Duration argument specifies the micro-batch size (here, one second).

Example:

SparkConf sparkConf = new SparkConf()
    // Spark Streaming needs at least two working threads (local[2]):
    // one to receive the data and one to process it
    .setMaster("local[2]")
    .setAppName("JavaNetworkWordCount");

// Create the context with a 1-second batch size
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Create a DStream that reads lines of text from the socket
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
    args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_ONLY);

// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

// Count each word in the current micro-batch
JavaPairDStream<String, Integer> wordCounts = words
    .mapToPair(s -> new Tuple2<>(s, 1))
    .reduceByKey((i1, i2) -> i1 + i2);

wordCounts.print();

----------------------
Window Operations
----------------------

Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data.

[Figure: a sliding window moving over a DStream, defined by its window length and sliding interval]

Example (WindowBasedWordCount):

We add a sliding window to our word count:

JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
    args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_ONLY);

/* Process incoming data using a sliding window
   (window length = 30 time units, sliding interval = 2 time units) */
JavaDStream<String> linesInWindow = lines.window(
    Durations.seconds(30 * WINDOW_TIME_UNIT_SECS),
    Durations.seconds(2 * WINDOW_TIME_UNIT_SECS));

JavaDStream<String> words = linesInWindow.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

JavaPairDStream<String, Integer> wordCounts = words
    .mapToPair(s -> new Tuple2<>(s, 1))
    .reduceByKey((i1, i2) -> i1 + i2);

----------------------
Window Operations (Second Example)
----------------------

Example (a more efficient version - WindowBasedWordCount2):

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) is a more efficient version of the above: the reduce value of each window is calculated incrementally from the reduce value of the previous window, by reducing the new data that enters the sliding window and "inverse reducing" the old data that leaves it. Note that checkpointing must be enabled to use this operation.

ssc.checkpoint(LOCAL_CHECKPOINT_DIR);

JavaPairDStream<String, Integer> wordCountPairs = ssc
    .socketTextStream(args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_ONLY)
    .flatMap(x -> Arrays.asList(SPACE.split(x)).iterator())
    .mapToPair(s -> new Tuple2<>(s, 1));

// Incremental word count: add the counts entering the window,
// subtract the counts leaving it
JavaPairDStream<String, Integer> wordCounts = wordCountPairs.reduceByKeyAndWindow(
    (i1, i2) -> i1 + i2,   // reduce function
    (i1, i2) -> i1 - i2,   // inverse reduce function
    Durations.seconds(30 * WINDOW_TIME_UNIT_SECS),
    Durations.seconds(2 * WINDOW_TIME_UNIT_SECS));

// Operations
wordCounts.print();
wordCounts.foreachRDD(new SaveAsLocalFile());

Read more: https://spark.apache.org/docs/latest/streaming-programming-guide.html
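
----------------------
Appendix: Complete Listings (sketches)
----------------------

For completeness, here is a self-contained sketch of the full NetworkWordCount program, assembled from the snippets above. The imports, the SPACE pattern, and the start()/awaitTermination() calls are required to run but were omitted in the snippets; for simplicity, this sketch hard-codes the endpoint to localhost 9999 instead of reading it from args.

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public final class JavaNetworkWordCount {

  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf()
        // At least two threads: one to receive data, one to process it
        .setMaster("local[2]")
        .setAppName("JavaNetworkWordCount");

    // Create the context with a 1-second batch size
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    // Endpoint hard-coded instead of args[0]/args[1]
    JavaReceiverInputDStream<String> lines =
        ssc.socketTextStream("localhost", 9999, StorageLevels.MEMORY_ONLY);

    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

    JavaPairDStream<String, Integer> wordCounts = words
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);

    wordCounts.print();

    // Start the computation and block until terminated (e.g. Ctrl+C)
    ssc.start();
    ssc.awaitTermination();
  }
}

With nc and the application both running, sentences typed into the nc terminal are split, counted, and printed for each 1-second micro-batch.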
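
Likewise, a runnable sketch of the incremental windowed word count (WindowBasedWordCount2). The values of WINDOW_TIME_UNIT_SECS and LOCAL_CHECKPOINT_DIR are assumptions for illustration (the course code may define them differently); with WINDOW_TIME_UNIT_SECS = 1, the window is 30 seconds wide and slides every 2 seconds.

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public final class WindowBasedWordCount2 {

  private static final Pattern SPACE = Pattern.compile(" ");
  // Assumed values: window = 30 s, sliding interval = 2 s
  private static final int WINDOW_TIME_UNIT_SECS = 1;
  private static final String LOCAL_CHECKPOINT_DIR = "/tmp/spark-checkpoint";

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("WindowBasedWordCount2");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    // Checkpointing is required by reduceByKeyAndWindow with an inverse function
    ssc.checkpoint(LOCAL_CHECKPOINT_DIR);

    JavaPairDStream<String, Integer> wordCountPairs = ssc
        .socketTextStream("localhost", 9999, StorageLevels.MEMORY_ONLY)
        .flatMap(x -> Arrays.asList(SPACE.split(x)).iterator())
        .mapToPair(s -> new Tuple2<>(s, 1));

    JavaPairDStream<String, Integer> wordCounts = wordCountPairs.reduceByKeyAndWindow(
        (i1, i2) -> i1 + i2,   // add counts entering the window
        (i1, i2) -> i1 - i2,   // subtract counts leaving the window
        Durations.seconds(30 * WINDOW_TIME_UNIT_SECS),
        Durations.seconds(2 * WINDOW_TIME_UNIT_SECS));

    wordCounts.print();

    ssc.start();
    ssc.awaitTermination();
  }
}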
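
Finally, the SaveAsLocalFile function passed to foreachRDD() comes with the course examples and is not shown in this handout. Below is a minimal sketch of what such a VoidFunction might look like; the class name matches the example, but the body and the output path are assumptions.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;

// Sketch: writes each window's counts to a local directory.
// The output path and naming scheme are assumptions, not the course code.
public class SaveAsLocalFile implements VoidFunction<JavaPairRDD<String, Integer>> {
  @Override
  public void call(JavaPairRDD<String, Integer> rdd) {
    if (!rdd.isEmpty()) {
      // One output directory per window, e.g. /tmp/wordcounts/1700000000000
      rdd.saveAsTextFile("/tmp/wordcounts/" + System.currentTimeMillis());
    }
  }
}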