Spark SQL: Why two jobs for one query?

Experiment

I tried the following snippet on Spark 1.6.1 .

 val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files soDF.registerTempTable("so") sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/") 

Physical Plan :

 == Physical Plan == Sort [cnt#59L ASC], true, 0 +- ConvertToUnsafe +- Exchange rangepartitioning(cnt#59L ASC,200), None +- ConvertToSafe +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L]) +- TungstenExchange hashpartitioning(dpHour#38,200), None +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L]) +- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder 

For this request, I received two jobs: Job 9 and Job 10 enter image description here

For Job 9 DAG :

enter image description here

For Job 10 DAG :

enter image description here

Observations

  • There are apparently two jobs for one request.
  • Stage-16 (labeled Stage-14 in Job 9 ) is skipped in Job 10 .
  • Stage-15 last RDD[48] , coincides with Stage-17 last RDD[49] . How? I saw in magazines that after Stage-15 execution, RDD[48] registered as RDD[49]
  • Stage-17 shown in driver-logs but has not been executed in Executors . driver-logs showed the driver-logs the task, but when I looked at the Yarn container logs, there was no evidence of getting any tasks from Stage-17 .

Logs supporting these observations ( driver-logs , I lost executor logs due to a later crash). It can be seen that before the start of Stage-17 , RDD[49] registered RDD[49] :

 16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200) 16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200) 16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool 16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s 16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s 16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter 16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26 16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26) 16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions 16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26) 16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17) 16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17) 16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents 16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB) 16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB) 16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB) 16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006 16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26) 16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks 16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes) 16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes) 16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes) 16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes) 16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes) 16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes) 

Questions

  • Why two jobs ? What is the intention here by breaking the DAG into two jobs ?
  • Job 10 DAG to complete the request looks complete . Is there anything specific Job 9 ?
  • Why is Stage-17 not missing? It seems like dummy tasks created, do they have any goals.
  • Later I tried another simpler query. Suddenly 3 jobs were created.

    sqlContext.sql ("select dpHour from this dphour order"). write.parquet ("/ out2 /")

+7
unsafe apache-spark apache-spark-sql parquet
source share
1 answer

When you use the high-level data / dataset APIs, you leave it to Spark to determine the execution plan, including the task / step. They depend on many factors, such as the execution of parallelism, cached / persistent data structures, etc. In future versions of Spark, as the complexity of the optimizer increases, you can see even more tasks for the query, for example, some data sources for parameterizing cost-based optimization.

For example, I often, but not always, saw how a record generates individual jobs from processing, which includes shuffling.

On the bottom line, if you use a high-level API, if you do not need to do extremely detailed optimization with huge amounts of data, it rarely pays to delve into a specific fragment. Startup costs are extremely low compared to processing / output.

If, on the other hand, you are interested in learning about the internal components of Spark, read the optimizer code and include it on the Spark developers mailing list.

+4
source share

All Articles