Does cache () spark correct RDD state or create a new one?

This question is a continuation of the previous question that I had. What happens if I hide the same RDD in Spark twice .

When cache() called in the RDD, the RDD state changed (and for RDD, just this ), or is a new RDD created wrapped in an existing one?

What will happen in the following code:

 // Init JavaRDD<String> a = ... // some initialise and calculation functions. JavaRDD<String> b = a.cache(); JavaRDD<String> c = b.cache(); // Case 1, will 'a' be calculated twice in this case // because it before the cache layer: a.saveAsTextFile(somePath); a.saveAsTextFile(somePath); // Case 2, will the data of the calculation of 'a' // be cached in the memory twice in this case // (once as 'b' and once as 'c'): c.saveAsTextFile(somePath); 
+3
source share
2 answers

When the cache () is called on the RDD, the RDD state changes (and the returned RDD is just for ease of use) or a new RDD is created, wrapped the existing one

The same RDD returned :

 /** * Mark this RDD for persisting using the specified level. * * @param newLevel the target storage level * @param allowOverride whether to override any existing level with the new one */ private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } // If this is the first time this RDD is marked for persisting, register it // with the SparkContext for cleanups and accounting. Do this only once. if (storageLevel == StorageLevel.NONE) { sc.cleaner.foreach(_.registerRDDForCleanup(this)) sc.persistRDD(this) } storageLevel = newLevel this } 

Caching does not cause any side effect for the mentioned RDD. If it is already indicated for perseverance, nothing will happen. If this is not the case, the only side effect will be to register it on SparkContext , where the side effect does not apply to the RDD itself, but to the context.

Edit:

Looking at JavaRDD.cache , it seems that the main call will cause the allocation of another JavaRDD :

 /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): JavaRDD[T] = wrapRDD(rdd.cache()) 

Where wrapRDD calls JavaRDD.fromRDD :

 object JavaRDD { implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd) implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd } 

This will cause the allocation of a new JavaRDD . However, the internal instance of RDD[T] will remain the same.

+6
source

Caching does not change RDD state.

When a conversion occurs, caching calculates and materializes the RDD in memory while tracking its origin (dependencies). There are many levels of perseverance.

Since caching remembers the line of RDDs, Spark can reassemble the loss sections in case of node failures. Finally, the cached RDD lives in the context of the running application, and as soon as the application terminates, the cached RDDs are also deleted.

+1
source

All Articles