Using partitionBy to split an RDD by key and compute each group efficiently

I implemented a solution that groups an RDD[K, V] by key and computes a result for each group (K, RDD[V]) using partitionBy and a custom Partitioner. However, I am not sure it is really efficient, and I would like your point of view.

Here is an example: given a list of [K: Int, V: Int], compute the mean of V for each group K, knowing that the computation must be distributed and that the lists of V values can be very large. This should give:

 List[K, V] => (K, mean(V)) 

Simple Partitioner Class:

    class MyPartitioner(maxKey: Int) extends Partitioner {
      def numPartitions = maxKey
      def getPartition(key: Any): Int = key match {
        case i: Int if i < maxKey => i
      }
    }

Main code:

    val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))
    val rdd = sc.parallelize(l)
    val p = rdd.partitionBy(new MyPartitioner(4)).cache()

    p.foreachPartition(x => {
      try {
        val r = sc.parallelize(x.toList)
        val id = r.first()          // get the K of this partition
        val v = r.map(x => x._2)
        println(id._1 + "->" + mean(v))
      } catch {
        case e: UnsupportedOperationException => 0
      }
    })

Output:

1->13, 2->4, 3->7

My questions:

  • What really happens when partitionBy is called? (Sorry, I did not find much documentation on it.)
  • Is it really useful to split the work by partitions like this, knowing that in my production case there will not be many keys (around 50 in my sample) but very many values per key (around 1 million in my sample)?
  • What is the cost of parallelize(x.toList)? Is it reasonable to do it here? (I need an RDD as input to mean().)
  • How would you do it yourself?


apache-spark rdd
1 answer

Your code should not work. You cannot pass the SparkContext to the executors (it is not Serializable), and I do not see why you would need it there anyway.
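For what it is worth, here is a minimal sketch (not part of the original answer) of how the per-partition mean could be computed without a nested RDD, reusing the partitioned RDD p from the question; plain Scala collection operations on the iterator are enough:

    p.foreachPartition { iter =>
      val values = iter.toList                               // all (K, V) pairs in this partition
      if (values.nonEmpty) {                                  // partition 0 is empty with MyPartitioner(4)
        val key  = values.head._1                             // each partition holds a single key
        val mean = values.map(_._2).sum.toDouble / values.size
        println(s"$key -> $mean")
      }
    }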

To compute the mean, you only need the sum and the count per key, and then their ratio. The default partitioner will work fine for this.

    import org.apache.spark.rdd.RDD

    def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
      case class SumCount(sum: Double, count: Double)
      val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
        (sc, v) => SumCount(sc.sum + v, sc.count + 1.0),                   // fold one value into a partition's partial result
        (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))  // merge partial results across partitions
      sumCounts.mapValues(sc => sc.sum / sc.count)                         // keep the key, turn (sum, count) into a mean
    }

This is an efficient one-pass calculation that generalizes well.
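As a rough usage check (a hypothetical example built on the sample data from the question, not part of the original answer):

    val rdd = sc.parallelize(List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)))
    meanByKey(rdd).collect().foreach { case (k, m) => println(s"$k -> $m") }
    // expected (in some order): 1 -> 13.0, 2 -> 4.5, 3 -> 7.0

Note that with Double arithmetic the mean for key 2 comes out as 4.5, not the truncated 4 shown in the question's output.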

