Let's say you have to join two tables A and B on A.id = B.id, and table A is skewed on id = 1.
i.e. select A.id from A join B on A.id = B.id
There are two main approaches to solving the skew problem:
Approach 1:
Break the query / data set into two parts - one containing only the skewed key, and the other containing the non-skewed data. In the above example, the query becomes -
1. select A.id from A join B on A.id = B.id where A.id <> 1;
2. select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;
The first query will not have any skew, so all tasks of the ResultStage will finish at roughly the same time.
If we assume that B has only a few rows with B.id = 1, they will fit into memory, so the second query will be converted to a broadcast join. This is also called a map join in Hive.
Link: https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization
The partial results of the two queries can then be unioned to produce the final result.
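A minimal sketch of the split in plain Python (illustrative only; the tables, data, and `join_ids` helper are made up here - in practice these would be two Spark SQL queries whose results are unioned):

```python
# Simulate the split-query approach in plain Python.
A = [1, 1, 1, 1, 2, 3]   # table A, skewed on id = 1
B = [1, 2, 3, 4]         # table B

def join_ids(a, b):
    """Inner join on id, returning the matched A.id values."""
    b_set = set(b)
    return [x for x in a if x in b_set]

# Query 1: join everything except the skewed key (no skew, even tasks).
part1 = join_ids([x for x in A if x != 1], B)
# Query 2: join only the skewed key; the B side is tiny, so in Spark
# this would become a broadcast (map) join.
part2 = join_ids([x for x in A if x == 1], [x for x in B if x == 1])

# The union of the two partial results equals the original join.
combined = part1 + part2
assert sorted(combined) == sorted(join_ids(A, B))
```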
Approach 2:
As also mentioned by LeMuBei above, the second approach randomizes the join key by adding an extra (salt) column. Steps:
1. Add a column to the larger table (A), say skewLeft, and fill it with random numbers from 0 to N-1 for all rows.
2. Add a column to the smaller table (B), say skewRight, and replicate the smaller table N times, so that the values in the new skewRight column range from 0 to N-1 for each copy of the source rows. You can use the explode SQL / Dataset operator for this.
3. After steps 1 and 2, join the two datasets / tables with the updated join condition -
*A.id = B.id && A.skewLeft = B.skewRight*
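The salting steps above can be sketched in plain Python (illustrative; in Spark you would add these columns with rand() and explode, and N = 3 here is just a hypothetical fan-out):

```python
import random

N = 3  # salt fan-out (hypothetical tuning knob)

A = [(1, 'a'), (1, 'b'), (1, 'c'), (2, 'd')]   # large, skewed table: (id, payload)
B = [(1, 'X'), (2, 'Y')]                       # small table: (id, payload)

# Step 1: salt the large table with a random number in [0, N).
A_salted = [(id_, random.randrange(N), pa) for id_, pa in A]

# Step 2: replicate the small table N times, one copy per salt value.
B_salted = [(id_, salt, pb) for id_, pb in B for salt in range(N)]

# Step 3: join on (id, salt). The skewed key is now spread across N
# partitions, but the result equals the original join on id alone.
joined = [(ia, pa, pb)
          for ia, sa, pa in A_salted
          for ib, sb, pb in B_salted
          if ia == ib and sa == sb]

plain = [(ia, pa, pb) for ia, pa in A for ib, pb in B if ia == ib]
assert sorted(joined) == sorted(plain)
```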
Link: https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
prakharjain