I'm using Spark 2.1.0-SNAPSHOT here (built today), but I believe this has not changed between 2.0 and now.
$ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.
In Spark Structured Streaming, your streaming application is just a trick for applying the same physical query plan to your input sources.
Please note that the physical query plan is what makes up your Dataset (and the more I use Spark SQL, the less difference I see between queries and Datasets - they are simply interchangeable these days).
When you describe a structured query (regardless of whether it is a one-off batch query or a streaming query), it goes through four stages: parsing, analysis, optimization and, finally, producing the physical plan. You can review the result of each stage using the explain(extended = true) method.
scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]
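The definition of input is not shown above, so the following is only a sketch of how such a streaming Dataset could be created, reconstructed from what the plan prints (the json format, the three non-nullable fields and the input-json path). The app name and the local master are assumptions for illustration, and the directory is created up front because the file source may reject a path that does not exist.

```scala
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

// In spark-shell a session already exists and getOrCreate() simply returns it.
val spark = SparkSession.builder()
  .master("local[*]")          // assumption: local run
  .appName("explain-demo")     // hypothetical name
  .getOrCreate()

// Make sure the input directory exists before the stream is defined.
Files.createDirectories(Paths.get("input-json"))

// Schema read off the plan output: id, name, score, all declared non-nullable.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("score", DoubleType, nullable = false)))

// Only describes the streaming query; no data is read at this point.
val input = spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")

// Prints the parsed, analyzed and optimized logical plans and the physical plan.
input.explain(extended = true)
```

Nothing is started here: explain only shows the plans for the query as described so far, and the object hash and the column ids (such as id#15L) will differ from run to run.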
The stages are lazy and executed only once.
Once you have the physical plan, the stages are not executed again. Your Dataset pipeline has already been computed, and the only missing piece is the data that will flow through the pipe.
That is why you see "peace hello" only once - when the streaming query plan was "executed" to produce the physical plan. It was executed once and optimized for processing the source Dataset (and only the Dataset, so any side effects have already been triggered).
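To make the distinction concrete, here is a minimal sketch, not the code from the question: describeQuery, the described counter, the sample record and the console sink are all hypothetical names and choices made for illustration. Code that merely builds the Dataset runs once on the driver, while the function passed to map becomes part of the plan and is applied to records as micro-batches are processed.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[*]")            // assumption: local run
  .appName("described-once")     // hypothetical name
  .getOrCreate()
import spark.implicits._

// Hypothetical input: a single JSON record in the directory the stream reads.
val dir = Files.createDirectories(Paths.get("input-json"))
Files.write(dir.resolve("sample.json"),
  """{"id":1,"name":"alice","score":0.5}""".getBytes(UTF_8))

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("score", DoubleType, nullable = false)))

// Counts how often the query description itself is evaluated (driver side).
var described = 0

def describeQuery(): Dataset[String] = {
  described += 1
  println("describing the query")   // side effect: happens once, right here
  spark.readStream
    .schema(schema)
    .json("input-json")
    // This function is part of the plan and is applied to the streamed records.
    .map(row => row.getAs[String]("name").toUpperCase)
}

val names = describeQuery()

// Start the query, let it process what is currently available, then stop it.
val query = names.writeStream.format("console").start()
query.processAllAvailable()
query.stop()

println(s"query described $described time(s)")
```

However many batches the query processes, described stays at 1, because only the plan is reused for new data. Logic that has to run for the data itself belongs inside the Dataset operations.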
Interesting case. Thanks a lot for bringing it up here!