When cache() is called on an RDD, either the RDD's state changes in place (and the returned RDD is just for convenience), or a new RDD is created that wraps the existing one.
The same RDD is returned:
/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}
Caching has no side effects on the RDD itself. If the RDD is already marked for persistence, nothing happens. Otherwise, the only side effect is registering it with the SparkContext, and that side effect applies to the context, not to the RDD itself.
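A quick way to verify this is to compare references before and after caching. Below is a minimal sketch assuming a local master; the object name CacheIdentityCheck and the app name are made up for illustration.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object CacheIdentityCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cache-identity-check").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 100)

    // cache() returns this.type, so the returned reference is the same object.
    val cached = rdd.cache()
    println(cached eq rdd)   // expected: true -- no new RDD is allocated

    // Calling cache() again with the same level is a no-op.
    rdd.cache()

    // Assigning a different level afterwards would throw
    // UnsupportedOperationException, as the persist() source above shows:
    // rdd.persist(StorageLevel.DISK_ONLY)

    sc.stop()
  }
}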
Edit:
Looking at JavaRDD.cache, it seems that the underlying call will cause the allocation of another JavaRDD:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
Where wrapRDD calls JavaRDD.fromRDD:
object JavaRDD {

  implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)

  implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}
This causes the allocation of a new JavaRDD, but the internal RDD[T] instance remains the same.
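The same kind of check can be done through the Java API wrapper: the JavaRDD returned by cache() is a different wrapper object, but its rdd field points at the same underlying RDD. This is a minimal sketch assuming a local master; the object name JavaRddCacheCheck is hypothetical.

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.{SparkConf, SparkContext}

object JavaRddCacheCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("javardd-cache-check").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Wrap a plain RDD in the Java API wrapper, as JavaRDD.fromRDD does.
    val javaRdd = JavaRDD.fromRDD(sc.parallelize(1 to 3))

    val cached = javaRdd.cache()

    // The wrappers are distinct objects ...
    println(cached eq javaRdd)          // expected: false -- a new JavaRDD was allocated
    // ... but they wrap the very same underlying RDD instance.
    println(cached.rdd eq javaRdd.rdd)  // expected: true

    sc.stop()
  }
}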