Efficient takeByKey implementation for Spark

I have an RDD of type RDD[(k: Int, v: String)]. I want to take up to 1000 tuples for each key k, so that the result contains (k, v) pairs in which no key appears more than 1000 times. Is there a way to do this without paying the performance penalty of calling groupBy first? I cannot find a way to aggregate the values that avoids the full groupBy, which causes my job to fail.

Naive approach:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], n: Int): RDD[(K, V)] = {
    rdd.groupBy(_._1).mapValues(_.take(n)).flatMap(_._2)
}

I am looking for a more efficient approach that avoids groupBy:

def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], n: Int): RDD[(K, V)] = {
    // use reduceByKey, foldByKey, etc.?
    ???
}

Here is the best solution I have come up with so far, but it does not type-check:

def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
      rdd.foldByKey(List[V](), ((acc, elem) => if (acc.length >= n) acc else elem._2 :: acc)).flatMap(t => t._2.map(v => (t._1, v)))
}

Edit: I came up with a slightly better solution that seems to work:

def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], n: Int): RDD[(K, V)] = {
    rdd.mapValues(List(_))
       .reduceByKey((x,y) => if(x.length >= n) x 
                             else if(y.length >= n) y 
                             else (x ++ y).take(n))
       .flatMap(t => t._2.map(v => (t._1, v)))
}

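Your reduceByKey solution works, but it is still inefficient for a few reasons:

  • mapValues(List(_)) creates a temporary List for every record
  • length on a linear Seq such as List is O(N)
  • x ++ y allocates yet another temporary list on every merge

The simplest fix is to replace List with a mutable buffer that has constant-time length. For example: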

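import scala.collection.mutable.ArrayBuffer

// V is the value type of the RDD (String for the RDD in the question)
rdd.aggregateByKey(ArrayBuffer.empty[V])(
  // within a partition: append until the buffer holds n values
  (acc, x) => if (acc.length >= n) acc else acc += x,
  // across partitions: top up the longer buffer from the shorter one
  (acc1, acc2) => {
    val (xs, ys) = if (acc1.length > acc2.length) (acc1, acc2) else (acc2, acc1)
    val toTake = math.min(n - xs.length, ys.length)
    for (i <- 0 until toTake) {
      xs += ys(i)
    }
    xs
  }
)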
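To see how the two functions passed to aggregateByKey cooperate, here is a minimal sketch that runs the same logic on plain Scala collections, with no Spark involved. The object TakeByKeySketch and the simulate helper are hypothetical names invented for this illustration. Each inner Seq stands in for one partition, and real Spark scheduling and shuffling are not modeled.

```scala
import scala.collection.mutable.ArrayBuffer

object TakeByKeySketch {
  // Per-partition step: append a value unless the buffer already holds n.
  def seqOp[V](n: Int)(acc: ArrayBuffer[V], x: V): ArrayBuffer[V] =
    if (acc.length >= n) acc else acc += x

  // Cross-partition step: top up the longer buffer from the shorter one.
  def combOp[V](n: Int)(a: ArrayBuffer[V], b: ArrayBuffer[V]): ArrayBuffer[V] = {
    val (xs, ys) = if (a.length > b.length) (a, b) else (b, a)
    xs ++= ys.take(math.max(0, n - xs.length))
  }

  // Hypothetical stand-in for aggregateByKey (assumption: one inner Seq per partition).
  def simulate[K, V](partitions: Seq[Seq[(K, V)]], n: Int): Map[K, List[V]] = {
    // Fold every partition independently, producing one buffer per key.
    val perPartition: Seq[Map[K, ArrayBuffer[V]]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) =>
        k -> kvs.map(_._2).foldLeft(ArrayBuffer.empty[V])((acc, x) => seqOp(n)(acc, x))
      }
    }
    // Merge the per-partition buffers that belong to the same key.
    perPartition.flatten.groupBy(_._1).map { case (k, bufs) =>
      k -> bufs.map(_._2).reduce((a, b) => combOp(n)(a, b)).toList
    }
  }
}
```

Calling TakeByKeySketch.simulate(Seq(Seq(1 -> "a", 1 -> "b", 2 -> "c"), Seq(1 -> "d", 1 -> "e", 1 -> "f")), 3) returns at most three values for each key. No buffer ever grows past n, which is what keeps the amount of data merged per key bounded.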

As a side note, you can replace:

.flatMap(t => t._2.map(v => (t._1, v)))

with:

.flatMapValues(x => x)  // identity[Seq[V]]

The result is the same, and the code is slightly cleaner.
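Putting the two fragments from this answer together, a complete function might look like the sketch below. The enclosing object name TakeByKey and the ClassTag context bounds are assumptions added so that the snippet compiles on its own against Spark's RDD API. It has not been benchmarked against the versions in the question.

```scala
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

object TakeByKey {
  // Keep at most n values per key without materializing full groups.
  def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], n: Int): RDD[(K, V)] =
    rdd.aggregateByKey(ArrayBuffer.empty[V])(
      // within a partition: append until the buffer holds n values
      (acc, x) => if (acc.length >= n) acc else acc += x,
      // across partitions: top up the longer buffer from the shorter one
      (acc1, acc2) => {
        val (xs, ys) = if (acc1.length > acc2.length) (acc1, acc2) else (acc2, acc1)
        xs ++= ys.take(math.max(0, n - xs.length))
      }
    ).flatMapValues(x => x) // turn (k, buffer) back into individual (k, v) pairs
}
```

Which n values survive for a given key depends on how the records are partitioned, so the output should be treated as an arbitrary sample of up to n values per key rather than the first n in any particular order.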


Another option is the reduceByKey approach:

def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], n: Int): RDD[(K, V)] = {
    rdd.mapValues(List(_))
       .reduceByKey((x,y) => if(x.length >= n) x 
                             else if(y.length >= n) y 
                             else (x ++ y).take(n))
       .flatMap(t => t._2.map(v => (t._1, v)))
}

Unlike groupByKey, this keeps at most n values per key at every merge step.
