Why does a transformation execute its side effect (println) only once in Structured Streaming?

Why does the query below print every batch to the console, but print "hello world" only once?

    import org.apache.spark.sql.types._

    val schema = StructType(
      StructField("id", LongType, nullable = false) ::
      StructField("name", StringType, nullable = false) ::
      StructField("score", DoubleType, nullable = false) :: Nil)

    val in: DataFrame = sparkSession.readStream
      .schema(schema)
      .format("csv")
      .option("header", false)
      .option("maxFilesPerTrigger", 1)
      .option("delimiter", ";")
      .load("s3://xxxxxxxx")

    val input: DataFrame = in.select("*")
      .transform { ds =>
        println("hello world") // <-- Why is this printed out once?
        ds
      }

    import org.apache.spark.sql.streaming.StreamingQuery
    val query: StreamingQuery = input.writeStream
      .format("console")
      .start
1 answer

This is Spark 2.1.0-SNAPSHOT (built today), but I believe this has not changed between 2.0 and now.

    $ ./bin/spark-submit --version
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
          /_/

    Branch master
    Compiled by user jacek on 2016-09-30T07:08:39Z
    Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
    Url https://github.com/apache/spark.git
    Type --help for more information.

In Spark Structured Streaming, your streaming application is just a mechanism for repeatedly applying the same physical query plan to your input sources.

Please note that the physical query plan is what defines your Dataset (and the more I use Spark SQL, the less difference I see between queries and Datasets - they are simply interchangeable these days).
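One way to see that a query and a Dataset really are the same thing: every Dataset carries its plans through queryExecution. A minimal sketch, assuming the input Dataset from the question is in scope (the later phases are what explain(extended = true) prints below):

    // The Dataset is just a handle on a query plan; the analysis phases
    // hang off of queryExecution.
    val qe = input.queryExecution
    println(qe.logical)   // parsed logical plan
    println(qe.analyzed)  // analyzed logical plan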

When you describe a structured query (regardless of whether it is a batch or a streaming one), it goes through four stages: parsing, analysis, optimization and, finally, producing a physical plan. You can inspect them using the explain(extended = true) method.

    scala> input.explain(extended = true)
    == Parsed Logical Plan ==
    StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

    == Analyzed Logical Plan ==
    id: bigint, name: string, score: double
    StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

    == Optimized Logical Plan ==
    StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

    == Physical Plan ==
    StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]

These stages are lazy and executed only once.

Once you have the physical plan, the stages will not be executed again. Your Dataset pipeline has already been computed, and the only missing piece is the data flowing through the pipe.
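A quick way to convince yourself of this, sketched here with a plain batch Dataset rather than a streaming one: a side effect inside transform fires while the pipeline is being built, whereas a side effect inside map becomes part of the plan and fires on every execution.

    import sparkSession.implicits._

    // transform is applied eagerly, once, while the Dataset is being defined;
    // map becomes an operator in the plan and runs per element, per action
    // (in local mode the per-element printlns land on the same console).
    val ds = sparkSession.range(3)
      .transform { d => println("built once"); d }       // printed immediately, once
      .map { n => println(s"processing $n"); n.toLong }  // printed at execution time

    ds.collect()  // prints "processing 0", "processing 1", "processing 2"
    ds.collect()  // prints the three "processing" lines again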

This is why you see "hello world" only once - it was printed when the streaming query plan was "executed" to create the physical plan. That happened exactly once, while the plan was optimized to work on the Dataset source (and only the Dataset, so any side effects had already been triggered by then).
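If the goal was a side effect per record rather than per query, the println has to move inside the plan, e.g. into a map function. A hedged sketch, reusing in and the schema from the question (encoding the three columns as a tuple is just one way to get a typed Dataset):

    import sparkSession.implicits._

    // The function given to map is part of the physical plan, so it runs for
    // every row of every trigger (the printlns happen on the executors; in
    // local mode they show up on the driver console).
    val perRecord = in
      .as[(Long, String, Double)]                    // id, name, score
      .map { r => println(s"hello world: $r"); r }

    val perRecordQuery = perRecord.writeStream
      .format("console")
      .start

Newer Spark versions (2.4 and later) also offer writeStream.foreachBatch as a per-micro-batch hook for this kind of thing, but that postdates this answer.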

Interesting case. Thanks a lot for bringing it up here!

