I have an RDD of type RDD[(Int, String)], i.e. pairs (k, v). I want to keep at most 1000 tuples for each key k, so that I end up with an RDD of (k, v) pairs in which no key appears more than 1000 times. Is there a way to do this without paying the performance penalty of calling groupBy first? I cannot find a good way to aggregate the values that avoids the full groupBy, which causes my job to fail.
Naive approach:
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
  rdd.groupBy(_._1).mapValues(_.take(n)).flatMap(_._2)
}
I am looking for a more efficient approach that avoids groupBy:
def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
  ??? // implementation wanted
}
Here is the best solution I have come up with so far, but it does not type-check:
def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.foldByKey(List[V](), ((acc, elem) => if (acc.length >= n) acc else elem._2 :: acc)).flatMap(t => t._2.map(v => (t._1, v)))
}
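The foldByKey attempt above fails to compile because foldByKey expects a zero value of the value type V and a function (V, V) => V, whereas here the accumulator is a List[V]. A minimal sketch of one way around that is aggregateByKey, which allows the accumulator type to differ from the value type. This assumes Spark 1.3 or later, where the pair-RDD implicits are in scope automatically; on older versions an extra `import org.apache.spark.SparkContext._` may be needed. Which n values survive for a key is arbitrary, just as with take(n) after groupBy.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Sketch: keep at most n values per key without a full groupBy.
// The accumulator is a List[V], which aggregateByKey permits even though
// the RDD's value type is V.
def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], n: Int): RDD[(K, V)] =
  rdd
    .aggregateByKey(List.empty[V])(
      // seqOp: add a value to the partition-local list unless it is already full
      (acc, v) => if (acc.length >= n) acc else v :: acc,
      // combOp: merge lists coming from different partitions, capped at n
      (a, b) => (a ++ b).take(n)
    )
    // Unpack (k, List(v1, v2, ...)) back into individual (k, v) pairs
    .flatMap { case (k, vs) => vs.map(v => (k, v)) }
```

Because each partition-local list is capped at n before the shuffle, at most n values per key per partition are sent over the network, rather than every value as with groupBy.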
Edit: I came up with a slightly better solution that seems to work:
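// The ClassTag context bounds (scala.reflect.ClassTag) are required by mapValues and reduceByKey.
def takeByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
  rdd.mapValues(List(_))
    .reduceByKey((x,y) => if(x.length >= n) x
                          else if(y.length >= n) y
                          else (x ++ y).take(n))
    .flatMap(t => t._2.map(v => (t._1, v)))
}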