Shuffle a column randomly in a Spark RDD or dataframe

Is there any way I can shuffle a column of an RDD or DataFrame so that the entries in that column appear in random order? I'm not sure which APIs I could use to accomplish such a task.

+4
4 answers

While you can't shuffle just one column directly, you can permute the records in an RDD through RandomRDDs. https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/random/RandomRDDs.html

A potential approach to permuting only one column might be:

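  • Use mapPartitions, so that any setup/teardown happens once per partition.
  • Pull all of the partition's records into memory, i.e. iterator.toList. Make sure you have many (/small) partitions, to avoid an OOME.
  • Using the Row object, write each record back out unchanged, except for the given column.
  • For that column, shuffle the collected values within mapPartitions and use them to replace each record's entry.
  • Return the result as list.toIterator from mapPartitions.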
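
The per-partition step in the list above could be sketched roughly as follows. This is only an illustration, not the answerer's code: rows are modelled as Seq[Any] so that it runs without Spark, and the names ColumnShuffleSketch, shuffleColumn, colIdx and rng are made up for the example. The Spark wiring in the trailing comment assumes that df and spark already exist.

```scala
import scala.util.Random

object ColumnShuffleSketch {
  // Permute the values of one column among the rows of a single partition.
  // Rows are modelled as Seq[Any] so the sketch runs without Spark.
  def shuffleColumn(rows: Iterator[Seq[Any]], colIdx: Int, rng: Random): Iterator[Seq[Any]] = {
    // Materialize the partition; this is the step that can cause an OOME
    // when partitions are large.
    val buffered = rows.toVector
    // Collect the target column and shuffle its values.
    val shuffled = rng.shuffle(buffered.map(_(colIdx)))
    // Rebuild every row unchanged except for the target column.
    buffered.zip(shuffled).map { case (row, v) => row.updated(colIdx, v) }.iterator
  }
}

// Hypothetical wiring into Spark (df and spark are assumed to exist):
//   val idx = df.schema.fieldIndex("salary")
//   val shuffledRdd = df.rdd.mapPartitions { it =>
//     ColumnShuffleSketch.shuffleColumn(it.map(_.toSeq), idx, new Random())
//       .map(org.apache.spark.sql.Row.fromSeq)
//   }
//   val shuffledDf = spark.createDataFrame(shuffledRdd, df.schema)
```

Note that values only move between rows of the same partition, so this is not a global shuffle of the column.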
+2

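You can add one additional, randomly generated column, and then sort the records on that column. That way, you randomly shuffle your target column.

This way, you do not need to hold all of the data in memory, which can easily cause an OOM. Spark takes care of the sorting and of the memory limits, spilling to disk if necessary.

If you don't want the extra column, you can remove it after sorting.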
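
A minimal sketch of that idea, assuming a DataFrame as input. The object name RandomSortSketch, the function shuffleRows and the helper column name "_rnd" are hypothetical, chosen only for this example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.rand

object RandomSortSketch {
  // "_rnd" is a hypothetical name for the temporary sort key.
  def shuffleRows(df: DataFrame): DataFrame =
    df.withColumn("_rnd", rand()) // one random double per row
      .orderBy("_rnd")            // Spark's sort can spill to disk if needed
      .drop("_rnd")               // remove the helper column again
}
```

On its own this reorders whole rows. To permute a single column relative to the others, the shuffled column would still have to be re-attached to the original rows, for example by an index.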

+1

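What about selecting the column to shuffle, applying orderBy(rand) to it, and zipping it by index onto the existing DataFrame?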

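import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}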

def addIndex(df: DataFrame) = spark.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

case class Entry(name: String, salary: Double)

val r1 = Entry("Max", 2001.21)
val r2 = Entry("Zhang", 3111.32)
val r3 = Entry("Bob", 1919.21)
val r4 = Entry("Paul", 3001.5)

val df = addIndex(spark.createDataFrame(Seq(r1, r2, r3, r4)))
val df_shuffled = addIndex(df
  .select(col("salary").as("salary_shuffled"))
  .orderBy(rand()))

df.join(df_shuffled, Seq("_index"))
  .drop("_index")
  .show(false)

Sample output (the shuffled values differ from run to run):

+-----+-------+---------------+
|name |salary |salary_shuffled|
+-----+-------+---------------+
|Max  |2001.21|3001.5         |
|Zhang|3111.32|3111.32        |
|Paul |3001.5 |2001.21        |
|Bob  |1919.21|1919.21        |
+-----+-------+---------------+
+1

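If you don't need a global shuffle across your data, you can shuffle within partitions using the mapPartitions method.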

import scala.util.Random
rdd.mapPartitions(iter => Random.shuffle(iter.toVector).iterator)

For a pair RDD (an RDD of type RDD[(K, V)]), if you are interested in shuffling the key-value mappings (matching an arbitrary key with an arbitrary value):

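pairRDD.mapPartitions(iterator => {
  // Materialize the partition and split it into keys and values
  val (keySequence, valueSequence) = iterator.toVector.unzip
  // Shuffle only the values; the keys keep their original positions
  val shuffledValueSequence = Random.shuffle(valueSequence)
  keySequence.zip(shuffledValueSequence).iterator
}, preservesPartitioning = true)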

The Boolean flag at the end indicates that the partitioning is preserved by this operation (the keys are not changed), so that subsequent operations, for example reduceByKey, can be optimized (they avoid a shuffle).
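
To see what the flag changes, one could compare the partitioner of the result with and without it. The following is a rough sketch under stated assumptions: a local SparkSession, a small hand-made pair RDD, and the hypothetical names PreservePartitioningSketch, shuffleValues and keepPartitioner.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.util.Random

object PreservePartitioningSketch {
  // Shuffle the values among the keys of each partition; keys stay in place.
  def shuffleValues(pairs: RDD[(String, Int)], keepPartitioner: Boolean): RDD[(String, Int)] =
    pairs.mapPartitions({ iter =>
      val (keys, values) = iter.toVector.unzip
      keys.zip(Random.shuffle(values)).iterator
    }, preservesPartitioning = keepPartitioner)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    // Give the input an explicit partitioner so there is something to preserve.
    val pairs = spark.sparkContext
      .parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4))
      .partitionBy(new HashPartitioner(2))
    // With the flag set, the result should still report the HashPartitioner.
    println(shuffleValues(pairs, keepPartitioner = true).partitioner)
    // Without it, Spark no longer assumes the keys are where the partitioner put them.
    println(shuffleValues(pairs, keepPartitioner = false).partitioner)
    spark.stop()
  }
}
```

Setting the flag is only safe here because the keys are left untouched; if the function changed the keys, the reported partitioner would no longer describe the data.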

0