Spark RDD - Avoid Shuffling - Does Partitioning Help Handle Huge Files?

I have an application with about 10 flat files, each of which contains more than 200 MMM+ entries. The business logic involves joining all of them in sequence.

My environment: 1 master, 3 slaves (for testing, I assigned 1 GB of memory to each node).

Most of the code just does the following for each join:

 RDD1 = sc.textFile(file1).mapToPair(..)
 RDD2 = sc.textFile(file2).mapToPair(..)
 join = RDD1.join(RDD2).map(peopleObject)

Any suggestions for tuning, such as repartition, parallelize, ..? If so, are there any best practices for choosing a good number of partitions?

With the current configuration, the job takes more than an hour, and I see a shuffle write of more than 3 GB for almost every file.

2 answers

In practice, with large datasets (5 of them, 100 GB+ each), I have seen that joins work best when you co-partition all the RDDs involved in a series of joins before starting to join them.

 RDD1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(2048))
 RDD2 = sc.textFile(file2).mapToPair(..).partitionBy(new HashPartitioner(2048))
 . . .
 RDDN = sc.textFile(fileN).mapToPair(..).partitionBy(new HashPartitioner(2048))

 // start joins
 RDD1.join(RDD2)...join(RDDN)


Note: I refer to this type of join as a tree join (each RDD is used only once). The rationale is illustrated in the following figure taken from the Spark UI:

[Figure: screenshot from the Spark UI; image not available]
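To make the co-partitioning idea concrete, here is a minimal Java sketch that does not use Spark at all. It assumes the same assignment rule that Spark's HashPartitioner applies (the key's hashCode modulo the partition count, made non-negative). The class name CoPartitionSketch and the sample keys are hypothetical. Because both datasets use the same rule and the same partition count, a given key lands at the same partition index in each, so matching records can be joined partition by partition.

```java
import java.util.List;

public class CoPartitionSketch {
    // Assumed to mirror HashPartitioner: non-negative modulo of the key's hashCode.
    public static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        int numPartitions = 2048; // same count as in the answer above
        // Hypothetical join keys extracted from two different files.
        List<String> keysInFile1 = List.of("alice", "bob", "carol");
        List<String> keysInFile2 = List.of("carol", "alice", "dave");

        for (String key : keysInFile1) {
            if (keysInFile2.contains(key)) {
                // The index depends only on the key and the partition count,
                // so it is identical for both datasets.
                System.out.println(key + " -> partition "
                        + partitionFor(key, numPartitions) + " in both datasets");
            }
        }
    }
}
```

If the two sides used different partition counts, the same key could map to different indices, and at least one side would have to be shuffled again before the join.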


If we always join one RDD (say rdd1) with all the others, the idea is to partition that RDD once and then cache it.

Here is an implementation in pseudo-Scala (it can easily be converted to Python or Java):

 val rdd1 = sc.textFile(file1).mapToPair(..).partitionBy(new HashPartitioner(200)).cache() 

At this point, rdd1 is hash-partitioned into 200 partitions. The first time it is evaluated, it will be cached.

Now let's read two more RDDs and join them with rdd1.

 val rdd2 = sc.textFile(file2).mapToPair(..)
 val join1 = rdd1.join(rdd2).map(peopleObject)
 val rdd3 = sc.textFile(file3).mapToPair(..)
 val join2 = rdd1.join(rdd3).map(peopleObject)

Note that for the remaining RDDs, we neither partition nor cache them.

Spark will see that rdd1 is already hash-partitioned, and it will use the same partitioner for all the other joins. Thus, rdd2 and rdd3 will shuffle their keys to the same locations where the rdd1 keys already reside.

To make this clearer, suppose we do not partition at all and use the same code as in the question: each time we do a join, both RDDs will be shuffled. This means that if we join N RDDs to rdd1, the non-partitioned version will shuffle rdd1 N times. The partitioned approach will shuffle rdd1 only once.
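The shuffle counts above can be turned into a rough back-of-the-envelope model. The Java sketch below is only an illustration: the class name ShuffleCountSketch, the record count, and the number of joins are hypothetical, and it assumes the cached copy of rdd1 stays fully available for every join (which may not hold on nodes with very little memory).

```java
public class ShuffleCountSketch {
    // Number of rdd1 records sent through a shuffle when rdd1 is joined nJoins times.
    // Model only: the non-partitioned version reshuffles rdd1 on every join, while
    // the pre-partitioned and cached version shuffles it once, in partitionBy.
    public static long rdd1RecordsShuffled(long rdd1Records, int nJoins,
                                           boolean prePartitionedAndCached) {
        if (nJoins <= 0) {
            return 0; // nothing is joined, so this model counts no shuffle
        }
        return prePartitionedAndCached ? rdd1Records : rdd1Records * nJoins;
    }

    public static void main(String[] args) {
        long rdd1Records = 1_000_000L; // hypothetical size of rdd1
        int nJoins = 9;                // hypothetical: rdd1 joined with 9 other RDDs

        System.out.println("non-partitioned: "
                + rdd1RecordsShuffled(rdd1Records, nJoins, false) + " records shuffled");
        System.out.println("partitioned and cached: "
                + rdd1RecordsShuffled(rdd1Records, nJoins, true) + " records shuffled");
    }
}
```

In both versions the other RDDs (rdd2, rdd3, and so on) are still shuffled once each; the saving comes entirely from not moving rdd1 again.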
