How to sort an RDD in Scala Spark?

Reading the documentation for Spark's sortByKey method:

sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. 

Is it possible to return only the top N results? That is, instead of returning all the results, simply return the top ten. I could convert the sorted collection to an Array and use the take method, but since that is an O(N) operation, is there a more efficient way?
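For illustration, here is roughly what I mean, assuming a SparkContext sc and a toy pair RDD (the data is made up):

    // Toy data purely for illustration.
    val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

    // Sort by key, then pull only the first ten rows back to the driver.
    val firstTen = pairs.sortByKey().take(10)
    val lastTen  = pairs.sortByKey(ascending = false).take(10)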

3 Answers

Most likely, you have already studied the source code:

    class OrderedRDDFunctions {
      // <snip>
      def sortByKey(ascending: Boolean = true,
                    numPartitions: Int = self.partitions.size): RDD[P] = {
        val part = new RangePartitioner(numPartitions, self, ascending)
        val shuffled = new ShuffledRDD[K, V, P](self, part)
        shuffled.mapPartitions(iter => {
          val buf = iter.toArray
          if (ascending) {
            buf.sortWith((x, y) => x._1 < y._1).iterator
          } else {
            buf.sortWith((x, y) => x._1 > y._1).iterator
          }
        }, preservesPartitioning = true)
      }

And, as you say, the data must go through the shuffle stage, as can be seen in the snippet.

However, your concern about the subsequent take(K) may not be quite accurate. That operation does NOT scan all N elements:

    /**
     * Take the first num elements of the RDD. It works by first scanning one partition, and use the
     * results from that partition to estimate the number of additional partitions needed to satisfy
     * the limit.
     */
    def take(num: Int): Array[T] = {

So it would seem:

    O(myRdd.take(K)) < O(myRdd.sortByKey()) ~= O(myRdd.sortByKey().take(k)) (at least for small K) < O(myRdd.sortByKey().collect())
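In code, those three cost classes look like this (a sketch; the example data and variable names are assumed):

    // Assumed example data: a million (K, V) pairs.
    val myRdd = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))

    myRdd.take(10)                 // cheapest: no shuffle at all
    myRdd.sortByKey().take(10)     // full shuffle, but only 10 rows reach the driver
    myRdd.sortByKey().collect()    // full shuffle AND the whole dataset on the driver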


If you only need the top 10, use rdd.top(10). It avoids sorting the full dataset, so it is faster.

rdd.top makes one parallel pass through the data, collecting the top N in each partition in a heap, then merges the heaps. It is an O(rdd.count) operation. Sorting would be O(rdd.count log rdd.count) and would incur a lot of data transfer: it does a shuffle, so all of the data would be transmitted over the network.
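A sketch of this in the question's Scala (the pair RDD is assumed; for (K, V) pairs you pass an explicit Ordering to say what "top" means):

    import org.apache.spark.rdd.RDD

    // Assumed example data.
    val scores: RDD[(String, Int)] = sc.parallelize(Seq(("a", 5), ("b", 9), ("c", 1)))

    // Top 10 pairs by value, with no full sort: each partition keeps a small
    // bounded heap, and the per-partition winners are merged on the driver.
    val top10: Array[(String, Int)] = scores.top(10)(Ordering.by(_._2))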


Another option, at least as of PySpark 1.2.0, is to use takeOrdered.

In ascending order:

 rdd.takeOrdered(10) 

In descending order:

 rdd.takeOrdered(10, lambda x: -x) 

Top k values for (k, v) pairs:

    rdd.takeOrdered(10, lambda kv: -kv[1])
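For the Scala API the question asks about, the equivalents would be along these lines (a sketch; nums and pairs are assumed RDDs):

    // Assuming nums: RDD[Int] and pairs: RDD[(String, Int)].

    nums.takeOrdered(10)                          // ascending
    nums.takeOrdered(10)(Ordering[Int].reverse)   // descending

    // Top 10 pairs by value:
    pairs.takeOrdered(10)(Ordering.by[(String, Int), Int](_._2).reverse)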


