I am using a broadcast variable that is about 100 MB pickled, which I am approximating with:
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
I am running on a cluster with 3 c3.2xlarge executors and an m3.large driver, starting an interactive session with the following command:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
If I keep a reference to this broadcast variable in an RDD, memory usage explodes. For 100 references to a 100 MB variable, even if it were copied 100 times, I would expect data usage of no more than 10 GB in total (and no more than 30 GB across the 3 nodes). However, I run out of memory when I run the following test:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
Stack trace:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
MemoryError

        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
I have seen previous threads suggesting that deserializing pickled data is memory-intensive. However, I would expect a broadcast variable to be deserialized (and loaded into an executor's memory) only once, with subsequent references to .value pointing to that same in-memory object. That does not appear to be the case, though. Am I missing something?
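As a sanity check of that expectation, this is roughly the kind of probe I have in mind (the check function below is purely illustrative, not part of my actual job):

data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)

def check(_):
    # Two references inside the same task: I would expect both names to be
    # bound to the single deserialized copy, not to two separate copies.
    a = metadata.value
    b = metadata.value
    return a is b

print(sc.parallelize(range(4), 4).map(check).collect())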
The examples I have seen use broadcast variables as dictionaries, applied once to transform a data set (e.g. replacing airport abbreviations with airport names). The motivation for persisting them here is to create objects that know about the broadcast variable and how to interact with it, persist those objects, and perform several computations using them (with Spark keeping them in memory).
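To make that pattern concrete, here is a rough sketch of what I mean (the MetadataModel class and its lookup method are invented purely for illustration):

class MetadataModel(object):
    """Hypothetical wrapper that holds the shared data and knows how to use it."""
    def __init__(self, shared):
        self.shared = shared              # reference to the broadcast value

    def lookup(self, i):
        return self.shared[i]

data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)

ids = sc.parallelize(zip(range(100), range(100)))
models = ids.mapValues(lambda _: MetadataModel(metadata.value))
models.persist()

# Several downstream computations reuse the persisted objects.
first = models.mapValues(lambda m: m.lookup(0)).count()
last = models.mapValues(lambda m: m.lookup(-1)).count()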
What are some tips for using large (100 MB+) broadcast variables? Is persisting an RDD that references the broadcast data the wrong approach? Is this an issue that is possibly specific to PySpark?
Thanks! Your help is appreciated.
Note: I have also posted this question on the Databricks forums.
Edit - follow-up question:
It has been suggested that the default Spark serializer has a batch size of 65536. Objects serialized into different batches are not identified as the same object and are given different memory addresses, which I examine here via the built-in id function. However, even with a larger broadcast variable that should theoretically take 256 batches to serialize, I still see only 2 distinct copies. Shouldn't I see many more? Is my understanding of how batch serialization works incorrect?
>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))})) / sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2