In Spark Streaming, how to detect an empty batch?

Take the example of counting streaming words: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java . Is it possible to print the RDD's word count only when new words arrive on the stream?

2 answers

Here is how I did it. Create an empty RDD that serves as your previous window. Then, in foreachRDD, compute the difference between the current window and the previous one: if the current window contains entries that are not in the previous window, there is something new in the batch. Finally, set the previous window to the current window.

    ...
    var previousWindowRdd = sc.emptyRDD[String]

    dStream.foreachRDD { windowRdd =>
      if (!windowRdd.isEmpty()) processWindow(windowRdd.cache())
    }
    ...

    def processWindow(windowRdd: RDD[String]) = {
      // Elements in the current window that were not in the previous one
      val newInBatch = windowRdd.subtract(previousWindowRdd)
      if (!newInBatch.isEmpty()) processNewBatch(windowRdd)
      previousWindowRdd = windowRdd
    }
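The subtract-based diff above can be illustrated without a Spark cluster. Here is a minimal sketch in plain Java (the class name `NewInBatchDemo`, the helper `newInBatch`, and the sample batches are all made up for illustration) that mimics what `RDD.subtract` computes over two windows:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NewInBatchDemo {
    // Mirrors RDD.subtract: elements present in the current window
    // but absent from the previous one.
    static Set<String> newInBatch(Set<String> previous, Set<String> current) {
        Set<String> fresh = new HashSet<>(current);
        fresh.removeAll(previous);
        return fresh;
    }

    public static void main(String[] args) {
        Set<String> previousWindow = new HashSet<>();
        // Simulated micro-batches arriving on the stream
        List<Set<String>> batches = List.of(
            Set.of("spark", "streaming"),
            Set.of("spark"),
            Set.of("spark", "kafka"));
        for (Set<String> batch : batches) {
            Set<String> fresh = newInBatch(previousWindow, batch);
            if (!fresh.isEmpty()) {
                System.out.println("new in batch: " + fresh);
            }
            previousWindow = batch;
        }
    }
}
```

In this simulated run, the second batch contains nothing that the first did not, so it produces no output, which is exactly the behavior the answer's `processNewBatch` gating achieves.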

This is how I avoid empty batches and write each non-empty batch, repartitioned to a single file, into its own timestamped directory.

    import java.time.format.DateTimeFormatter
    import java.time.LocalDateTime

    messageRecBased.foreachRDD { rdd =>
      // repartition returns a new RDD; chain it instead of discarding the result
      val eachRdd = rdd.repartition(1).map(record => record.value)
      if (!eachRdd.isEmpty())
        eachRdd.saveAsTextFile(
          "hdfs/location/" + DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now) + "/")
    }
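The per-batch directory naming used above can be checked in isolation. A small sketch in Java (the class name `BatchDirName` and the helper `dirFor` are illustrative, not from the answer; only the `yyyyMMddHHmmss` pattern and the `hdfs/location/` base come from the snippet):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class BatchDirName {
    // Builds a per-batch output directory name the way the answer does:
    // a base path plus a yyyyMMddHHmmss timestamp and a trailing slash.
    static String dirFor(String base, LocalDateTime now) {
        return base + DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(now) + "/";
    }

    public static void main(String[] args) {
        System.out.println(dirFor("hdfs/location/", LocalDateTime.now()));
    }
}
```

Because the timestamp has second granularity, two batches completing within the same second would target the same directory; a streaming batch interval of at least one second avoids that collision.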

Source: https://habr.com/ru/post/1215633/

