Spark - Random Number Generation

I wrote a method that takes a random number and uses it to simulate a Bernoulli trial. I use random.nextDouble to generate a number between 0 and 1, and then make my decision based on this value, given my probability parameter.

My problem is that Spark generates the same random numbers in every iteration of my mapping loop. I am using the DataFrame API. My code follows this format:

val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map { row =>
      RowFactory.create(row.getString(0),
        myClass.myMethod(row.getString(2), rand.nextDouble()))
    }, myDF.schema)
}

Here is the class:

class myClass extends Serializable {
  val q = qProb

  def myMethod(s: String, rand: Double) = {
    if (rand <= q)
      // do something
    else
      // do something else
  }
}
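For illustration, here is a filled-in sketch of the kind of decision the method makes; the qProb constructor parameter and the returned labels are placeholders, not my real logic:

class MyClass(qProb: Double) extends Serializable {
  val q = qProb

  // rand is expected to be a fresh Uniform(0, 1) draw for every row
  def myMethod(s: String, rand: Double): String =
    if (rand <= q) s + "_kept"     // "success" branch, probability q
    else s + "_dropped"            // "failure" branch, probability 1 - q
}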

I need a new random number every time myMethod is called. I also tried generating the number inside my method using java.util.Random (scala.util.Random in Scala 2.10 does not extend Serializable), as shown below, but I still get the same numbers in every loop:

val r = new java.util.Random(s.hashCode.toLong)
val rand = r.nextDouble()

I did some research, and it seems to be related to Spark's deterministic nature.

scala random apache-spark spark-dataframe
4 answers

The reason the same sequence repeats is that the random generator is created and initialized with the seed before the data is partitioned, so each partition starts from the same generator state. This may not be the most efficient way to do it, but the following should work:

val myClass = new MyClass()
val M = 3

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map {
      val rand = scala.util.Random
      row =>
        RowFactory.create(row.getString(0),
          myClass.myMethod(row.getString(2), rand.nextDouble()))
    }, myDF.schema)
}

Just use the SQL rand function:

import org.apache.spark.sql.functions._

// df: org.apache.spark.sql.DataFrame = [key: int]

df.select($"key", rand() as "rand").show
+---+-------------------+
|key|               rand|
+---+-------------------+
|  1| 0.8635073400704648|
|  2| 0.6870153659986652|
|  3|0.18998048357873532|
+---+-------------------+

df.select($"key", rand() as "rand").show
+---+------------------+
|key|              rand|
+---+------------------+
|  1|0.3422484248879837|
|  2|0.2301384925817671|
|  3|0.6959421970071372|
+---+------------------+
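If you need the Bernoulli decision from the question rather than the raw draw, a sketch of combining rand with when (the probability q and the output column name are assumptions, not part of the original code):

import org.apache.spark.sql.functions.{lit, rand, when}

// Assumed success probability; one independent Uniform(0, 1) draw per row.
val q = 0.3
val labelled = df.withColumn("bernoulli", when(rand() <= lit(q), 1).otherwise(0))
labelled.show()

Passing a seed, e.g. rand(91234L), makes the column reproducible across runs while still varying from row to row.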

According to this post, the best solution is not to put the new scala.util.Random inside the map, nor completely outside (i.e. in the driver code), but at an intermediate level, in mapPartitionsWithIndex:

import scala.util.Random

val myAppSeed = 91234
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) =>
  val rand = new scala.util.Random(indx + myAppSeed)
  iter.map(x => (x, Array.fill(10)(rand.nextDouble)))
}
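Applied to the DataFrame from the question, the same pattern might look roughly like this; treat it as a sketch, since the output schema and column names are assumptions, and myClass/myMethod are the questioner's own code:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.util.Random

val myAppSeed = 91234

// Each partition gets its own generator, seeded from the partition index,
// so draws differ across partitions but stay reproducible between runs.
val resultRDD = myDF.rdd.mapPartitionsWithIndex { (idx, iter) =>
  val rand = new Random(myAppSeed + idx)
  iter.map(row => Row(row.getString(0), myClass.myMethod(row.getString(2), rand.nextDouble())))
}

// Assumed two-column output schema; adapt it to whatever myMethod actually returns.
val outSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("result", StringType)
))
val newDF = sqlContext.createDataFrame(resultRDD, outSchema)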

Using the Spark Dataset API, possibly for use in an accumulator:

 df.withColumn("_n", substring(rand(),3,4).cast("bigint")) 
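The substring call keeps the four characters starting at position 3 of the random double's string representation (i.e. the first four digits after the decimal point) and reinterprets them as a bigint, giving a pseudo-random integer per row. A more direct variant (my own sketch, not from the original line) is to scale and truncate the draw:

import org.apache.spark.sql.functions.rand

// Sketch: a uniform integer in [0, 9999] per row, without going through a string.
df.withColumn("_n", (rand() * 10000).cast("bigint"))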
