How to use Spark repartitionAndSortWithinPartitions?

I am trying to build a minimal working example of repartitionAndSortWithinPartitions in order to understand the function. This is what I have so far (not working; the kernel throws the values around so that they end up out of order):

    def partval(partID: Int, iter: Iterator[Int]): Iterator[Tuple2[Int, Int]] = {
      iter.map(x => (partID, x)).toList.iterator
    }

    val part20to3_chaos = sc.parallelize(1 to 20, 3).distinct
    val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
    part20to2_sorted.mapPartitionsWithIndex(partval).collect

but I get this error:

    Name: Compile Error
    Message: <console>:22: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Int]
           val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)

I tried using the scaladoc, but could not find which class provides repartitionAndSortWithinPartitions. (Btw: this scaladoc is not impressive: why is MapPartitionsRDD missing? How can I search for a method?)

Realising that I need a partitioner object, next I tried

    val rangePartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
    val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(rangePartitioner)
    part20to2_sorted.mapPartitionsWithIndex(partval).collect

but got

    Name: Compile Error
    Message: <console>:22: error: type mismatch;
     found   : org.apache.spark.rdd.RDD[Int]
     required: org.apache.spark.rdd.RDD[_ <: Product2[?,?]]
    Error occurred in an application involving default arguments.
           val rPartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)

How do I get this to compile? Could I get a working example, please?

2 answers

Your problem is that part20to3_chaos is an RDD[Int], while OrderedRDDFunctions.repartitionAndSortWithinPartitions is a method defined on RDD[(K, V)], where K is the key and V is the value.

repartitionAndSortWithinPartitions first repartitions the data according to the provided partitioner, and then sorts each partition by its keys:

    /**
     * Repartition the RDD according to the given partitioner and,
     * within each resulting partition, sort records by their keys.
     *
     * This is more efficient than calling `repartition` and then sorting within each partition
     * because it can push the sorting down into the shuffle machinery.
     */
    def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
      new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
    }

So it looks like this is not exactly what you are looking for.
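That said, if you do want to see repartitionAndSortWithinPartitions compile, one option is to key the RDD first. Here is a minimal sketch (not from the original answer; keyedChaos and the key == value choice are purely illustrative), building on the part20to3_chaos RDD from the question:

    // Key the Ints so the RDD becomes an RDD[(Int, Int)]; key == value here, purely for illustration
    val keyedChaos = part20to3_chaos.map(x => (x, x))

    // RangePartitioner now accepts the RDD, since it is an RDD[_ <: Product2[Int, Int]]
    val rangePartitioner = new org.apache.spark.RangePartitioner(2, keyedChaos)
    val keyedSorted = keyedChaos.repartitionAndSortWithinPartitions(rangePartitioner)

    // Tag each (key, value) pair with the partition it ended up in
    keyedSorted.mapPartitionsWithIndex((partID, iter) => iter.map(pair => (partID, pair))).collect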

If you just want to sort plain old Ints, you can use sortBy, since it does not require a key:

    scala> val toTwenty = sc.parallelize(1 to 20, 3).distinct
    toTwenty: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at distinct at <console>:33

    scala> val sorted = toTwenty.sortBy(identity, true, 3).collect
    sorted: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

Here you pass sortBy the ordering (ascending or descending) and the number of partitions you want to create.
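For instance, a descending sort over the same RDD only needs the second argument flipped (a quick sketch building on the toTwenty value above, not part of the original answer):

    // Sort descending across 3 partitions, using the element itself as the sort key
    val sortedDesc = toTwenty.sortBy(identity, ascending = false, numPartitions = 3).collect
    // sortedDesc: Array[Int] = Array(20, 19, 18, ..., 1)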


Let me try to explain repartitionAndSortWithinPartitions via pyspark.

Suppose you have a dataset of key-value pairs:

    pairs = sc.parallelize([["a", 1], ["b", 2], ["c", 3], ["d", 3]])

    pairs.collect()
    # Output: [['a', 1], ['b', 2], ['c', 3], ['d', 3]]

    pairs.repartitionAndSortWithinPartitions(2).glom().collect()
    # Output: [[('a', 1), ('c', 3)], [('b', 2), ('d', 3)]]

Through repartitionAndSortWithinPartitions() we asked for the data to be shuffled into 2 partitions, and that is exactly what we get: 'a' and 'c' in one, 'b' and 'd' in the other. Within each partition the keys are sorted.

We can also repartition-and-sort based on a specific condition, like so:

    pairs.repartitionAndSortWithinPartitions(2, partitionFunc=lambda x: x == 'a').glom().collect()
    # Output: [[('b', 2), ('c', 3), ('d', 3)], [('a', 1)]]

As expected, we get two partitions: one with three sorted keys, and one with just ('a', 1). To learn more about glom, refer to this link.
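For readers following the Scala code from the question: glom() exists on Scala RDDs as well, and is handy for inspecting which partition each record landed in. A small sketch (assumed names, not from the answer above; the exact grouping depends on the hash partitioner):

    // glom() turns each partition into an Array so per-partition contents can be inspected
    val keyed = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 3)))
    val byPartition = keyed
      .repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(2))
      .glom()
      .collect()
    // e.g. Array(Array((b,2), (d,3)), Array((a,1), (c,3)))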

