Does spark.sql.autoBroadcastJoinThreshold work for joins using the Dataset join operator?

I would like to know whether the spark.sql.autoBroadcastJoinThreshold property can be useful for broadcasting a smaller table to all worker nodes (when performing a join), even when the join uses the Dataset API join operator instead of Spark SQL.

If my larger table is 250 GB and the smaller one is 20 GB, do I need to set spark.sql.autoBroadcastJoinThreshold = 21 GB (or so) in order to send the entire table / Dataset to all worker nodes?

Examples:

  • Dataset API

     val result = rawBigger.as("b").join( broadcast(smaller).as("s"), rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID), "left_outer" ) 
  • SQL

     select * from rawBigger_table b, smaller_table s where b.campaign_id = s.campaign_id; 
2 answers

First of all, spark.sql.autoBroadcastJoinThreshold and the broadcast hint are separate mechanisms. Even if autoBroadcastJoinThreshold is disabled, an explicit broadcast hint will take precedence. With default settings:

 spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
 // String = 10485760
 val df1 = spark.range(100)
 val df2 = spark.range(100)

Spark will use autoBroadcastJoinThreshold and automatically broadcast data:

 df1.join(df2, Seq("id")).explain

 == Physical Plan ==
 *Project [id#0L]
 +- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
    :- *Range (0, 100, step=1, splits=Some(8))
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
       +- *Range (0, 100, step=1, splits=Some(8))

When we turn off automatic broadcasting, Spark will use the standard SortMergeJoin:

 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
 df1.join(df2, Seq("id")).explain

 == Physical Plan ==
 *Project [id#0L]
 +- *SortMergeJoin [id#0L], [id#3L], Inner
    :- *Sort [id#0L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(id#0L, 200)
    :     +- *Range (0, 100, step=1, splits=Some(8))
    +- *Sort [id#3L ASC NULLS FIRST], false, 0
       +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)

We can still force a BroadcastHashJoin with the broadcast hint:

 df1.join(broadcast(df2), Seq("id")).explain

 == Physical Plan ==
 *Project [id#0L]
 +- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
    :- *Range (0, 100, step=1, splits=Some(8))
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
       +- *Range (0, 100, step=1, splits=Some(8))

SQL has its own hint format (similar to the one used in Hive):

 df1.createOrReplaceTempView("df1")
 df2.createOrReplaceTempView("df2")
 spark.sql(
   "SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
 ).explain

 == Physical Plan ==
 *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
 :- *Range (0, 100, step=1, splits=8)
 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
    +- *Range (0, 100, step=1, splits=8)

So, to answer your question: autoBroadcastJoinThreshold is applicable when working with the Dataset API, but it is not relevant when using explicit broadcast hints.
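If you would rather check this programmatically than read explain output, something along these lines should work. It is only a sketch: the object and method names (BroadcastHintCheck, usesBroadcastHashJoin) are made up for illustration, and it assumes Spark SQL is on the classpath and that a local master is acceptable.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical helper, not part of Spark: reports whether the physical plan
// of a small Dataset join mentions BroadcastHashJoin.
object BroadcastHintCheck {
  def usesBroadcastHashJoin(spark: SparkSession, withHint: Boolean): Boolean = {
    val df1 = spark.range(100)
    val df2 = spark.range(100)
    // Apply the explicit broadcast hint to the right side only when asked to.
    val right = if (withHint) broadcast(df2) else df2
    df1.join(right, Seq("id"))
      .queryExecution.executedPlan.toString
      .contains("BroadcastHashJoin")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run, for illustration only
      .appName("broadcast-hint-check")
      // Turn automatic broadcasting off so that only the hint can trigger it.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    println(s"without hint: ${usesBroadcastHashJoin(spark, withHint = false)}")
    println(s"with hint:    ${usesBroadcastHashJoin(spark, withHint = true)}")

    spark.stop()
  }
}
```

The threshold is disabled up front so that any broadcast in the plan can only come from the hint.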

In addition, broadcasting large objects is unlikely to provide any performance boost, and in practice it will often degrade performance and cause stability issues. Remember that the broadcast object has to be first fetched to the driver, then sent to each worker, and finally loaded into memory.
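To put rough numbers on that, here is a back-of-the-envelope sketch for the 20 GB table from the question. The executor count of 50 is purely hypothetical, and the estimate ignores the fact that the in-memory representation can be larger than the source data.

```scala
// Rough estimate only; BroadcastFootprint and its members are illustrative names.
object BroadcastFootprint {
  val GiB: Long = 1024L * 1024L * 1024L

  // Every executor ends up holding its own full copy of the broadcast table.
  def clusterWideBytes(tableBytes: Long, executors: Int): Long =
    tableBytes * executors

  def main(args: Array[String]): Unit = {
    val smaller = 20L * GiB // the smaller table from the question
    val executors = 50      // hypothetical cluster size
    val total = clusterWideBytes(smaller, executors)
    println(s"per executor: ${smaller / GiB} GiB, cluster-wide: ${total / GiB} GiB")
    // prints "per executor: 20 GiB, cluster-wide: 1000 GiB"
  }
}
```

On top of that, the driver has to hold the whole table as well before it can be distributed.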


Just to share more details (from the code) on top of the great answer from @user6910411.

Quoting the source code (formatting mine):

spark.sql.autoBroadcastJoinThreshold configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.

By setting this value to -1, broadcasting can be disabled.

Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run, and file-based data source tables where the statistics are computed directly on the data files.

spark.sql.autoBroadcastJoinThreshold is 10M by default (i.e. 10L * 1024 * 1024), and Spark will check which join to use (see the JoinSelection execution planning strategy).
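Since the property is expressed in bytes, it may help to see the numbers written out. This is plain arithmetic with no Spark involved; ThresholdBytes and gib are illustrative names, and "21 GB" from the question is read here as 21 GiB.

```scala
object ThresholdBytes {
  // Default value quoted above: 10L * 1024 * 1024 bytes.
  val defaultThreshold: Long = 10L * 1024 * 1024

  // Convert a number of GiB to bytes.
  def gib(n: Long): Long = n * 1024L * 1024L * 1024L

  def main(args: Array[String]): Unit = {
    println(defaultThreshold) // 10485760
    println(gib(20))          // 21474836480, the smaller table from the question
    // The value one would have to configure to auto-broadcast a 20 GiB table:
    println(s"spark.sql.autoBroadcastJoinThreshold=${gib(21)}")
  }
}
```

The smaller table from the question is 2048 times larger than the default threshold, which gives a sense of how far outside the intended range a 20 GB broadcast is.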

There are 6 different join selections, among which is broadcasting (using the BroadcastHashJoinExec or BroadcastNestedLoopJoinExec physical operators).

BroadcastHashJoinExec will be picked when there are joining keys and one of the following holds:

  • The join is one of CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI and the right side of the join can be broadcast, i.e. its size is smaller than spark.sql.autoBroadcastJoinThreshold
  • The join is one of CROSS, INNER and RIGHT OUTER and the left side of the join can be broadcast, i.e. its size is smaller than spark.sql.autoBroadcastJoinThreshold

BroadcastNestedLoopJoinExec will be picked when there are no joining keys and one of the above BroadcastHashJoinExec conditions holds.
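The conditions above can be modelled as a small pure function. To be clear, this is a simplified sketch of the rules as described here, not Spark's actual JoinSelection code: all names are made up, hints and the other join strategies are ignored, and treating a size equal to the threshold as broadcastable is an assumption.

```scala
// Simplified model of the rules described above; not Spark's implementation.
object JoinSelectionSketch {
  sealed trait JoinType
  case object Cross extends JoinType
  case object Inner extends JoinType
  case object LeftAnti extends JoinType
  case object LeftOuter extends JoinType
  case object LeftSemi extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Assumption: "fits" means 0 <= size <= threshold, so a threshold of -1 never matches.
  def canBroadcast(sizeInBytes: Long, threshold: Long): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= threshold

  // Join types for which the right side may be the broadcast side.
  def canBuildRight(jt: JoinType): Boolean = jt match {
    case Cross | Inner | LeftAnti | LeftOuter | LeftSemi => true
    case _ => false
  }

  // Join types for which the left side may be the broadcast side.
  def canBuildLeft(jt: JoinType): Boolean = jt match {
    case Cross | Inner | RightOuter => true
    case _ => false
  }

  // Returns the broadcast operator the rules would pick, or None if neither applies.
  def pickBroadcastOperator(jt: JoinType, hasJoinKeys: Boolean,
                            leftBytes: Long, rightBytes: Long,
                            threshold: Long): Option[String] = {
    val someSideFits =
      (canBuildRight(jt) && canBroadcast(rightBytes, threshold)) ||
      (canBuildLeft(jt) && canBroadcast(leftBytes, threshold))
    if (!someSideFits) None
    else if (hasJoinKeys) Some("BroadcastHashJoinExec")
    else Some("BroadcastNestedLoopJoinExec")
  }

  def main(args: Array[String]): Unit = {
    val default = 10L * 1024 * 1024
    val gib = 1024L * 1024 * 1024
    // The left_outer join from the question: 250 GiB left side, 20 GiB right side.
    println(pickBroadcastOperator(LeftOuter, hasJoinKeys = true, 250 * gib, 20 * gib, default))
    // Same join with a 5 MiB right side.
    println(pickBroadcastOperator(LeftOuter, hasJoinKeys = true, 250 * gib, 5L * 1024 * 1024, default))
  }
}
```

Under this model the join from the question is not auto-broadcast with the default threshold, while the same join with a 5 MiB right side is.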

In other words, Spark will automatically select the right join, including BroadcastHashJoinExec, based on the spark.sql.autoBroadcastJoinThreshold property (among other requirements) but also on the join type.
