The recursive function will work:
/** * The return type is an Option to handle the case of a user specifying * a non positive number of steps. */ def createZippedNormal(sc : SparkContext, numPartitions : Int, numSteps : Int) : Option[RDD[Double]] = { @scala.annotation.tailrec def accum(sc : SparkContext, numPartitions : Int, numSteps : Int, currRDD : RDD[Double], seed : Long) : RDD[Double] = { if(numSteps <= 0) currRDD else { val newRDD = normalRDD(sc, numPartitions, seed) accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max) } } if(numSteps <= 0) None else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L)) }
source share