I have a pyspark application with a few points where I would like to persist the current state. This is usually after a large step, or when caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is a maximum of about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the allotted memory on the executor. Below is a small example that demonstrates this behavior.
cache_test.py:
from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()
simple_data.csv:
1,2,3
4,5,6
7,8,9
When looking at the application UI, there is a copy of the original dataframe in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove a cached intermediate result (i.e., call unpersist() before every cache())?
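For reference, here is roughly what that would look like as an untested variation on cache_test.py above (the column name 'C1+C2' is just carried over from the example):

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test_unpersist')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()      # first cached copy
df.show()

df.unpersist()  # drop the original cached copy before transforming

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()      # only the transformed dataframe should remain cached
df.show()

spark_context.stop()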
Also, is it possible to purge all cached objects? My application has natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new Spark application for each input file.
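To make those breakpoints concrete, here is an untested sketch of what I have in mind; I noticed that SQLContext/HiveContext exposes a clearCache() method, but I am not sure whether it also drops dataframes cached with df.cache() or only tables cached via cacheTable():

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_purge_test')
hive_context = HiveContext(spark_context)

input_files = ['simple_data.csv']  # placeholder; one entry per input file in the real app

for path in input_files:
    df = (hive_context.read
          .format('com.databricks.spark.csv')
          .load(path)
         )
    df.cache()
    df.show()

    # ... further steps, each with its own intermediate cache() ...

    # natural breakpoint: clear the SQL in-memory cache before the next file
    # (assuming this also covers df.cache(), which is what I am asking about)
    hive_context.clearCache()

spark_context.stop()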
Thank you in advance!
python caching apache-spark pyspark apache-spark-sql
bjack3