Let me simplify your question a bit. (Actually a lot.) We have an RDD[(Int, String)], and we want to find the 10 most common Strings for each Int (all of which are in the range 0 - 100).
Instead of sorting, as in your example, it is more efficient to use Spark's built-in RDD.top(n) method. It runs in time linear in the size of the data and moves much less data around than a sort.
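As a quick sketch of that building block (the data and the sc SparkContext here are made up for illustration):

// Made-up example data; sc is an existing SparkContext.
val wordCounts = sc.parallelize(Seq(5L -> "spark", 3L -> "scala", 9L -> "rdd"))
// top keeps a bounded-size priority queue per partition instead of sorting everything,
// and returns the largest elements in descending order of the tuple ordering.
val top2: Array[(Long, String)] = wordCounts.top(2) // Array((9,"rdd"), (5,"spark"))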
Consider the top implementation in RDD.scala. You want to do the same, but with one priority queue (heap) per Int key. The code gets pretty complicated:
import org.apache.spark.rdd.RDD
import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]
  // Get the counts of each (column, string) pair.
  val counts: RDD[((Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
  // In each partition create a column -> heap map.
  val perPartition: RDD[collection.Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps = collection.mutable.Map[Int, Heap]()
      for (((k, v), count) <- items) {
        heaps.getOrElseUpdate(k, new Heap(n)) += count -> v
      }
      Iterator.single(heaps)
    }
  // Merge the per-partition heap maps into one.
  val merged: collection.Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps = collection.mutable.Map[Int, Heap]()
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq; cv <- heap) {
        heaps.getOrElseUpdate(k, new Heap(n)) += cv
      }
      heaps
    }
  // Discard the counts, return just the top strings.
  merged.mapValues(_.map { case (count, value) => value }).toMap
}
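For what it's worth, here is how you might call it (toy data, just to show the shape of the result):

// Toy data; sc is an existing SparkContext.
val rdd = sc.parallelize(Seq(0 -> "a", 0 -> "a", 0 -> "b", 1 -> "c"))
val result: Map[Int, Iterable[String]] = top(10, rdd)
// result(0) holds the (up to 10) most common strings for column 0,
// but the Iterable itself is not sorted by count.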
This is effective but painful, because we need to work with multiple columns at the same time. It would be easier to have one RDD per column and just call rdd.top(10) on each.
Unfortunately, the naive way to split an RDD into N smaller RDDs is to make N passes:
def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache // We will make N passes over this RDD.
  (0 until columns).map { i =>
    together.filter { case (key, value) => key == i }.values
  }
}
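Assuming the column indices really are 0 - 100 as in the simplified question, you would then still count the strings within each column before taking the top 10, along these lines:

// One RDD per column (101 passes over the cached data).
val perColumn: Seq[RDD[String]] = split(rdd, 101)
// For each column, count the strings and keep the 10 with the highest counts.
val top10: Seq[Array[String]] = perColumn.map { column =>
  column.map(_ -> 1L)
    .reduceByKey(_ + _)
    .top(10)(Ordering.by[(String, Long), Long](_._2))
    .map(_._1)
}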
A more efficient solution is to write the data to separate files by key and then load the files back into separate RDDs. This is discussed in Write to multiple outputs by key Spark - one Spark job.
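The idea there is to use a Hadoop MultipleTextOutputFormat so that a single job writes one directory per key. A rough sketch of that approach (the KeyBasedOutput class name and the /tmp/by-column path are just placeholders for this example):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each record into a subdirectory named after its key and drops the key
// from the output line (NullWritable keys are not written by TextOutputFormat).
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name // e.g. 0/part-00000, 1/part-00000, ...
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

// One pass writes everything, one directory per column.
rdd.saveAsHadoopFile("/tmp/by-column", classOf[Int], classOf[String], classOf[KeyBasedOutput])

// Load them back, one RDD per column.
val perColumn: Seq[RDD[String]] = (0 to 100).map(i => sc.textFile(s"/tmp/by-column/$i"))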