Here is how I did it. Create an empty RDD to hold the previous window. Then, in `foreachRDD`, compute the difference between the current window and the previous one with `subtract`. If the current window contains entries that were not in the previous window, something new has arrived in the batch. Finally, set the previous window to the current window.
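```scala
import org.apache.spark.rdd.RDD

...
// Holds the last non-empty window seen; starts out empty
var previousWindowRdd = sc.emptyRDD[String]

dStream.foreachRDD { windowRdd =>
  // Cache the window because it is used by subtract, isEmpty and processNewBatch
  if (!windowRdd.isEmpty) processWindow(windowRdd.cache())
}
...

def processWindow(windowRdd: RDD[String]): Unit = {
  // Elements of the current window that do not appear in the previous window
  val newInBatch = windowRdd.subtract(previousWindowRdd)
  if (!newInBatch.isEmpty()) processNewBatch(windowRdd)
  // Release the old cached window before replacing it, so cached windows do not pile up
  previousWindowRdd.unpersist()
  previousWindowRdd = windowRdd
}
```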
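To see the previous/current comparison in isolation, here is a Spark-free sketch. It assumes plain `Seq[String]` windows stand in for `RDD[String]`, and the names `WindowDiffSketch`, `newEntries` and `detectNew` are hypothetical. `newEntries` mimics `RDD.subtract` by keeping every element of the current window that does not occur anywhere in the previous one. Unlike the code above, which hands the whole window to `processNewBatch`, this sketch only collects the new entries so the result is easy to inspect.

```scala
object WindowDiffSketch {
  // Hypothetical stand-in for RDD.subtract: keep every element of `current`
  // that does not appear anywhere in `previous` (duplicates in `current` are kept).
  def newEntries(current: Seq[String], previous: Seq[String]): Seq[String] = {
    val seen = previous.toSet
    current.filterNot(seen)
  }

  // Walks the windows in order, mirroring processWindow: skip empty windows,
  // record what is new, then remember the current window as the previous one.
  def detectNew(windows: Seq[Seq[String]]): Seq[Seq[String]] = {
    var previous = Seq.empty[String]
    val reported = Seq.newBuilder[Seq[String]]
    for (current <- windows if current.nonEmpty) {
      val fresh = newEntries(current, previous)
      if (fresh.nonEmpty) reported += fresh
      previous = current
    }
    reported.result()
  }

  def main(args: Array[String]): Unit = {
    val windows = Seq(Seq("a", "b"), Seq("a", "b"), Seq.empty, Seq("b", "c"))
    // Prints "new: a,b" and then "new: c"
    detectNew(windows).foreach(fresh => println(s"new: ${fresh.mkString(",")}"))
  }
}
```

The second window repeats the first, so nothing is reported; the empty window is skipped without overwriting the previous window, which is why `c` is still detected as the only new entry in the last one.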