Skewed dataset join in Spark?

I am joining two large datasets using Spark RDD. One dataset is very skewed, so a few of the executor tasks take a long time to finish the job. How can I solve this scenario?

+15
join apache-spark
4 answers

Pretty good article on how to do this: https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

Short version:

  • Salt the large RDD by adding a random element to each key and building a new join key from it
  • Add the same random element to the small RDD, using explode / flatMap to multiply its records, and build the matching new join key (a sketch follows below)
  • Join the RDDs on the new join key, which will now be well distributed thanks to the random salting
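A minimal sketch of that recipe with the Spark 2.x Java RDD API; the RDD names, the String value types and the SALT_BUCKETS constant are illustrative assumptions, not something from the article:

    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    final int SALT_BUCKETS = 100; // assumed number of salt values

    // 1. Salt the large RDD: append a random bucket id to every key.
    JavaPairRDD<String, String> saltedLarge = largeRdd.mapToPair(kv ->
            new Tuple2<>(kv._1() + "_" + ThreadLocalRandom.current().nextInt(SALT_BUCKETS), kv._2()));

    // 2. Explode the small RDD: emit one copy of every record per salt bucket.
    JavaPairRDD<String, String> explodedSmall = smallRdd.flatMapToPair(kv -> {
        List<Tuple2<String, String>> copies = new ArrayList<>(SALT_BUCKETS);
        for (int salt = 0; salt < SALT_BUCKETS; salt++) {
            copies.add(new Tuple2<>(kv._1() + "_" + salt, kv._2()));
        }
        return copies.iterator();
    });

    // 3. Join on the salted key, then strip the salt off the key again.
    JavaPairRDD<String, Tuple2<String, String>> joined = saltedLarge.join(explodedSmall)
            .mapToPair(kv -> new Tuple2<>(kv._1().substring(0, kv._1().lastIndexOf('_')), kv._2()));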
+17

Depending on the specific type of skew you are experiencing, there may be different ways to solve it. Main idea:

  • Modify your join column or create a new join column that is not skewed but which still retains adequate join information
  • Join on this non-skewed column, so the resulting partitions will not be skewed
  • After the join, you can update the join column back to your preferred format, or drop it if you created a new column

The article "Fighting Skew in Sparks" mentioned in LiMuBei's answer is a good technique if distorted data is involved in the merge. In my case, the skew was caused by a very large number of null values ​​in the join column. Zero values ​​did not participate in the join, but since the Spark sections in the join column, the sections after the join were very distorted, because there was one giant section containing all the zeros.

I solved this by adding a new column which changed all the null values to a well-distributed temporary value, such as "NULL_VALUE_X", where X is replaced by a random number between 1 and 10000, e.g. (in Java):

    // Before the join, create a join column with well-distributed temporary values for null swids.
    // This column will be dropped after the join. We need to do this so the post-join partitions
    // will be well-distributed, and not have a giant partition with all null swids.
    String swidWithDistributedNulls = "swid_with_distributed_nulls";
    int numNullValues = 10000; // Just use a number that will always be bigger than number of partitions
    Column swidWithDistributedNullsCol =
            when(csDataset.col(CS_COL_SWID).isNull(),
                    functions.concat(
                            functions.lit("NULL_SWID_"),
                            functions.round(functions.rand().multiply(numNullValues))))
            .otherwise(csDataset.col(CS_COL_SWID));
    csDataset = csDataset.withColumn(swidWithDistributedNulls, swidWithDistributedNullsCol);

Then join on this new column, and after the join:

    outputDataset = outputDataset.drop(swidWithDistributedNulls); // drop the temporary column by name
+8

Let's say you have to join two tables A and B on A.id = B.id. Suppose table A has skew on id = 1.

i.e. select A.id from A join B on A.id = B.id

There are two basic approaches to solving the skew problem:

Approach 1:

Break the query / dataset into two parts: one containing only the skewed data and the other containing the non-skewed data. In the above example, the query becomes:

  1. select A.id from A join B on A.id = B.id where A.id <> 1;
  2. select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;

The first query will not have any skew, so all the tasks of the ResultStage will finish at roughly the same time.

If we assume that B has only a few rows with B.id = 1, then it will fit into memory, so the second query will be converted to a broadcast join. This is also called a map-side join in Hive.

Link: https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization

The partial results of the two queries can then be combined to produce the final results.
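A minimal sketch of this approach with the Spark 2.x Java Dataset API, assuming the two tables are available as Datasets named dfA and dfB with an id column (those names, and the explicit broadcast() call, are assumptions on my part):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import static org.apache.spark.sql.functions.broadcast;

    // Query 1: everything except the skewed key, joined the normal (shuffled) way.
    Dataset<Row> aRest = dfA.filter("id <> 1");
    Dataset<Row> nonSkewed = aRest
            .join(dfB, aRest.col("id").equalTo(dfB.col("id")))
            .select(aRest.col("id"));

    // Query 2: only the skewed key; the filtered B side is small, so broadcast it.
    Dataset<Row> aSkew = dfA.filter("id = 1");
    Dataset<Row> bSkew = dfB.filter("id = 1");
    Dataset<Row> skewed = aSkew
            .join(broadcast(bSkew), aSkew.col("id").equalTo(bSkew.col("id")))
            .select(aSkew.col("id"));

    // Combine the partial results into the final result.
    Dataset<Row> result = nonSkewed.union(skewed);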

Approach 2:

Also mentioned by LiMuBei above, the second approach tries to randomize the join key by adding an extra column. Steps:

  1. Add a column to the larger table (A), say skewLeft, and populate it with random numbers between 0 and N-1 for all rows.

  2. Add a column to the smaller table (B), say skewRight. Replicate the smaller table N times, so that the values in the new skewRight column range from 0 to N-1 for each copy of the original data. You can use the explode sql/dataset operator for this (see the sketch after the reference link below).

After 1 and 2, join the two datasets / tables with the join condition updated to:

  *A.id = B.id && A.skewLeft = B.skewRight* 

Link: https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
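A minimal Java Dataset sketch of steps 1 and 2 above, assuming Datasets dfA (the large, skewed side) and dfB (the small side) joined on an id column; the dfA/dfB names and N = 16 are illustrative:

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import static org.apache.spark.sql.functions.*;

    int n = 16; // assumed N

    // Step 1: skewLeft = random integer between 0 and N-1 on every row of the large table.
    Dataset<Row> saltedA = dfA.withColumn("skewLeft", floor(rand().multiply(n)).cast("int"));

    // Step 2: replicate the small table N times via explode; skewRight runs from 0 to N-1.
    Column[] buckets = new Column[n];
    for (int i = 0; i < n; i++) {
        buckets[i] = lit(i);
    }
    Dataset<Row> explodedB = dfB.withColumn("skewRight", explode(array(buckets)));

    // Join on the original key plus the skew columns, then drop the helper columns.
    Dataset<Row> joined = saltedA
            .join(explodedB, saltedA.col("id").equalTo(explodedB.col("id"))
                    .and(saltedA.col("skewLeft").equalTo(explodedB.col("skewRight"))))
            .drop("skewLeft")
            .drop("skewRight");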

+5

You could try to repartition the "skewed" RDD into more partitions, or try to increase spark.sql.shuffle.partitions (200 by default).

In your case, I would try to set the number of partitions much higher than the number of executors.
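For example (the spark session, the skewedRdd name and the value 2000 are just placeholders):

    // Raise the shuffle parallelism well above the executor count, and/or repartition
    // the skewed RDD itself.
    spark.conf().set("spark.sql.shuffle.partitions", "2000");
    JavaPairRDD<String, String> morePartitions = skewedRdd.repartition(2000);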

-1
