Un-persisting all dataframes in (py)spark

I have a spark application with several points where I would like to persist the current state. This is usually after a large step, or after caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is at most about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the allotted memory on the executor. Below is a small example that demonstrates this behavior.

cache_test.py:

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv'))
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csv:

1,2,3
4,5,6
7,8,9

When looking at the application UI, there is a copy of the original dataframe in memory, in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove a cached intermediate result (i.e., call unpersist() before every cache())? A sketch of that workaround is shown below.
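For reference, here is a minimal sketch of that workaround, assuming the same setup, column names, and input file as cache_test.py; it adds nothing beyond what is already described above.

# Sketch of the unpersist-before-re-cache workaround described above.
# Assumes the same setup as cache_test.py.
from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = hive_context.read.format('com.databricks.spark.csv').load('simple_data.csv')
df.cache()
df.show()

df.unpersist()  # drop the original cached copy before re-caching
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()      # only the derived dataframe stays in memory
df.show()

spark_context.stop()

Note that because the original copy is dropped before the derived dataframe is materialized, the withColumn step recomputes from the source file rather than from the cache.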

Also, is it possible to clear all cached objects? My application has natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new spark application for each input file.

Thank you in advance!

+8
python caching apache-spark pyspark apache-spark-sql
3 answers

Spark 2.x

You can use Catalog.clearCache:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()
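As a sketch of how this could serve the per-file breakpoints mentioned in the question (the file list and the loop body are hypothetical, not part of this answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for path in ['file_1.csv', 'file_2.csv']:  # hypothetical input files
    df = spark.read.csv(path)
    df.cache()
    # ... do all the work for this file ...
    spark.catalog.clearCache()  # breakpoint: drop everything cached for this file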

Spark 1.x

You can use the SQLContext.clearCache method, which:

Removes all cached tables from the in-memory cache.

from pyspark import SparkContext
from pyspark.sql import SQLContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
+11

We use this quite often:

for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
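A small wrapper can make the intent clearer at the call site; the helper name below is mine, not part of this answer, and it simply reuses the same loop:

def unpersist_all(sc):
    # Unpersist every RDD the JVM currently tracks as persistent;
    # as far as I can tell this also covers the RDDs backing cached dataframes.
    for (_id, rdd) in sc._jsc.getPersistentRDDs().items():
        rdd.unpersist()

unpersist_all(sc)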
+1

You can also unpersist each dataframe individually:

firstDF.unpersist()
secondDF.unpersist()
0
