Apache Spark: method returning an RDD (with tail recursion)

An RDD has a lineage and therefore does not really exist until an action is performed on it (if one ever is). So if I have a method that applies numerous transformations to an RDD and returns the transformed RDD, what am I really returning? Am I returning nothing until the RDD is needed by an action? If I cache an RDD inside the method, is it actually cached? I think I know the answer: the method's work only happens when an action is called on the returned RDD. But I could be wrong.

Extending this question: suppose I have a tail-recursive method that takes an RDD as a parameter and returns an RDD, and I cache an RDD inside the method:

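import org.apache.spark.rdd.RDD
import scala.annotation.tailrec

@tailrec
def method[T](myRDD: RDD[T]): RDD[T] = {
   ...
   anRDD.cache()
   if (true) someRDD      // base case; the real stop condition is elided
   else method(someRDD)   // tail-recursive call
}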

Then, when the tail recursion happens, does the newly cached RDD overwrite the previously cached anRDD, or do both persist? I would suspect both persist. I have data spilling to disk even though the data set I use is only 63 MB, and I think this could have something to do with the tail-recursive method.

1 answer

The RDD lineage is built as a graph of RDD object instances linked together, where each node in the lineage holds a reference to its dependencies. In its simplest form, a chain, you can think of it as a linked list:

hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)

You can see this in the base RDD constructor:

/** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent))) 
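To make the linked-list picture concrete, here is a minimal toy model in plain Scala (no Spark). `Lineage`, `Source`, `Mapped`, `Filtered`, `Counter` and `collect` are hypothetical names, not Spark's API; the sketch only assumes the idea described above: a transformation allocates a new node that points at its parent, and nothing is computed until an action walks the chain.

```scala
// Toy model of an RDD lineage (hypothetical, not Spark's classes).
// Each node keeps a reference to its parent, like the linked list above.
sealed trait Lineage {
  def map(f: Int => Int): Lineage = Mapped(this, f)          // new node, no work done
  def filter(p: Int => Boolean): Lineage = Filtered(this, p) // new node, no work done

  // The "action": walks back to the source and evaluates the whole chain.
  def collect(): List[Int] = this match {
    case Source(data)        => Counter.evaluations += 1; data
    case Mapped(parent, f)   => Counter.evaluations += 1; parent.collect().map(f)
    case Filtered(parent, p) => Counter.evaluations += 1; parent.collect().filter(p)
  }

  // Number of nodes in the chain, including the source.
  def depth: Int = this match {
    case Source(_)           => 1
    case Mapped(parent, _)   => 1 + parent.depth
    case Filtered(parent, _) => 1 + parent.depth
  }
}
final case class Source(data: List[Int]) extends Lineage
final case class Mapped(parent: Lineage, f: Int => Int) extends Lineage
final case class Filtered(parent: Lineage, p: Int => Boolean) extends Lineage

// Counts how many nodes have been evaluated, to make laziness visible.
object Counter { var evaluations = 0 }

val source = Source((1 to 10).toList)
val mapped = source.filter(_ % 2 == 0).map(_ * 10) // only builds two new nodes
```

Until `mapped.collect()` is called, `Counter.evaluations` stays at 0; the call then returns `List(20, 40, 60, 80, 100)` after evaluating all three nodes.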

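To the point: a method that chains transformations returns a new RDD object, not data. Just as we can recursively build a linked list, we can recursively build an RDD lineage; the result of a recursive function that operates on RDDs is a well-defined RDD.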
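The same idea in miniature, as a hedged sketch: a tail-recursive function over a hypothetical `Node` type (a stand-in for an RDD, not Spark code) returns an ordinary value whose parent links describe every step that was applied. The names `Node`, `build` and `depth` are illustrative assumptions.

```scala
import scala.annotation.tailrec

// Hypothetical toy lineage node: an operation name plus a link to the parent.
// `parent = None` marks the source. Not Spark's API.
final case class Node(op: String, parent: Option[Node]) {
  def depth: Int = 1 + parent.map(_.depth).getOrElse(0)
}

// Tail-recursive builder: each call wraps the previous node in a new one,
// much like a recursive method chaining `rdd.map(...)` calls.
@tailrec
def build(current: Node, steps: Int): Node =
  if (steps <= 0) current
  else build(Node(s"map#$steps", Some(current)), steps - 1)

val result = build(Node("source", None), 3)
```

Here `result` is the node `map#1`, whose chain of parents leads through `map#2` and `map#3` back to `source` (depth 4): a complete description of the computation, with nothing evaluated yet.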

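Caching works the same way: calling `cache` does not compute anything by itself. It marks the RDD so that its partitions are kept once an action first "materializes" it.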
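A hedged sketch of that behaviour in plain Scala. `ToyRDD`, `isStored` and `computations` are hypothetical names, not Spark's implementation: `cache()` only flips a flag, the data is stored the first time `collect()` (the "action") runs, and later actions reuse the stored data instead of recomputing.

```scala
// Hypothetical toy model of cache-as-a-flag (not Spark's implementation).
// `cache()` only records intent; data is stored the first time an action runs.
final class ToyRDD(compute: () => List[Int]) {
  private var markedForCache = false
  private var stored: Option[List[Int]] = None
  var computations = 0 // how many times the underlying computation ran

  def cache(): ToyRDD = { markedForCache = true; this } // no work happens here

  def isStored: Boolean = stored.isDefined

  // The "action": reuse stored data if present, otherwise compute it.
  def collect(): List[Int] = stored match {
    case Some(data) => data
    case None =>
      computations += 1
      val data = compute()
      if (markedForCache) stored = Some(data) // materialized on the first action
      data
  }
}

val rdd = new ToyRDD(() => (1 to 5).map(_ * 2).toList).cache()
```

Right after `cache()`, nothing is stored and `computations` is 0; two calls to `collect()` return the same list while the computation runs only once.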

Here is a somewhat weird example (just for illustration):

def isPrime(n:Int):Boolean = {
    (n == 2) || (!( n % 2 ==0) && !((3 to math.sqrt(n).ceil.toInt) exists (x => n % x == 0)))
}

import org.apache.spark.rdd.RDD

def recPrimeFilter(rdd: RDD[Int], i: Int): RDD[Int] =
  if (i <= 1) rdd
  else if (isPrime(i)) recPrimeFilter(rdd.filter(x => x != i), i - 1)
  else recPrimeFilter(rdd.map(x => x + i), i - 1)
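The shape of that lineage can be checked without Spark. The sketch below reuses the same `isPrime` test and replays the recursion of `recPrimeFilter`, but instead of transforming an RDD it records which transformation each step would add. `primeFilterOps` is a hypothetical helper for illustration only.

```scala
import scala.annotation.tailrec

// Same primality test as in the answer.
def isPrime(n: Int): Boolean =
  (n == 2) || (!(n % 2 == 0) && !((3 to math.sqrt(n).ceil.toInt) exists (x => n % x == 0)))

// Hypothetical helper (not from the answer): follows the recursion of
// recPrimeFilter but records the transformation each step would add,
// returned in the order the steps are applied (oldest first).
@tailrec
def primeFilterOps(i: Int, acc: List[String] = Nil): List[String] =
  if (i <= 1) acc.reverse
  else if (isPrime(i)) primeFilterOps(i - 1, "filter" :: acc)
  else primeFilterOps(i - 1, "map" :: acc)

val ops = primeFilterOps(15)
```

For `i = 15` this yields 14 steps, eight `map` and six `filter`; read in reverse, the sequence matches the order of the `MappedRDD` and `FilteredRDD` entries in the `toDebugString` output of the original answer.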

Applying it to an RDD of ints, we can see the lineage it builds, with map and filter RDDs interleaved:

val rdd = sc.parallelize(1 to 100)
val res = recPrimeFilter(rdd, 15)
scala> res.toDebugString
res3: String = 
(8) FilteredRDD[54] at filter at <console>:18 []
 |  FilteredRDD[53] at filter at <console>:18 []
 |  MappedRDD[52] at map at <console>:18 []
 |  FilteredRDD[51] at filter at <console>:18 []
 |  MappedRDD[50] at map at <console>:18 []
 |  FilteredRDD[49] at filter at <console>:18 []
 |  MappedRDD[48] at map at <console>:18 []
 |  MappedRDD[47] at map at <console>:18 []
 |  MappedRDD[46] at map at <console>:18 []
 |  FilteredRDD[45] at filter at <console>:18 []
 |  MappedRDD[44] at map at <console>:18 []
 |  FilteredRDD[43] at filter at <console>:18 []
 |  MappedRDD[42] at map at <console>:18 []
 |  MappedRDD[41] at map at <console>:18 []
 |  ParallelCollectionRDD[33] at parallelize at <console>:13 []

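"Caching" is a property of an individual RDD: an RDD marked as "cached" keeps its partitions once they are computed, so later actions read them instead of recomputing its lineage. Each RDD in the chain above is a separate node.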

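So if you cache an RDD in every recursive call, each of those intermediate RDDs remains marked as cached (until it is unpersisted or evicted); a later `cache` call does not "overwrite" an earlier one.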
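To see why earlier cached RDDs are not replaced, here is a toy model in plain Scala. `ToyContext`, `ToyNode` and its `step`/`cache` methods are hypothetical stand-ins (the registry is loosely modelled on the idea behind `SparkContext.getPersistentRDDs`, not on its implementation); the tail-recursive `method` is shaped like the one in the question and caches a new node at every level.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

// Hypothetical toy registry of cached datasets (not Spark's implementation).
object ToyContext {
  private var nextId = 0
  val persisted: mutable.Map[Int, ToyNode] = mutable.Map.empty
  def newId(): Int = { nextId += 1; nextId }
}

final class ToyNode(val parent: Option[ToyNode]) {
  val id: Int = ToyContext.newId()
  // Registers this specific node; entries for other nodes are left untouched.
  def cache(): ToyNode = { ToyContext.persisted(id) = this; this }
  def step(): ToyNode = new ToyNode(Some(this)) // a new node, like rdd.map(...)
}

// Tail-recursive method that caches a node at every level, as in the question.
@tailrec
def method(rdd: ToyNode, remaining: Int): ToyNode = {
  val next = rdd.step().cache()
  if (remaining <= 1) next else method(next, remaining - 1)
}

val start = new ToyNode(None)
val last = method(start, 5)
```

After `method(start, 5)` the registry holds five entries, one per recursion level, because each `cache()` call registers a different node rather than replacing the previous one.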
