Why does collect() on a one-row DataFrame use 2000 executors?

This is the simplest DataFrame I could think of. I am using PySpark 1.6.1.

# one row of data
rows = [ (1,   2) ]
cols = [ "a", "b" ]
df   = sqlContext.createDataFrame(rows, cols)

So the DataFrame fits completely in memory, has no references to any files, and looks pretty trivial to me.

But when I collect the data,

df.collect()

it uses 2,000 executors:

[Stage 2:===================================================>(1985 + 15) / 2000]

and then the expected result:

[Row(a=1, b=2)]

Why is this happening? Isn't the DataFrame supposed to be entirely in the driver's memory?

+4
2 answers

Good question. There are really a couple of smaller questions here, and they all come down to what sqlContext.createDataFrame does under the hood.

Why 2000 tasks?

Spark runs 2000 tasks because your DataFrame has 2000 partitions, and collect() schedules one task per partition. (That is obviously overkill for a single row of data, but Spark never looks at the data when choosing the partition count.)

You can check this yourself:

>>> df.rdd.getNumPartitions()
2000

Why does the DataFrame have 2000 partitions?

Because sqlContext.createDataFrame uses the default number of partitions (2000 in your case) regardless of what the data looks like or how many rows it holds.
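A quick way to confirm this (my addition, not part of the original answer) is to look at the SparkContext's default parallelism, which is the value createDataFrame falls back to; in this setup it should be 2000:

>>> # sketch: defaultParallelism is the fallback partition count
>>> sc.defaultParallelism
2000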

You can trace this through the PySpark source.

In sql/context.py, sqlContext.createDataFrame calls (for local data like yours):

rdd, schema = self._createFromLocal(data, schema)

which in turn does:

return self._sc.parallelize(data), schema

and SparkContext.parallelize in context.py picks the number of slices like this:

numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism

The row count is never consulted, and there is no way to pass a different numSlices through sqlContext.createDataFrame.
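If you would rather fix the partition count when the DataFrame is created, one workaround (a sketch of mine, not from the original answer) is to build the RDD yourself with an explicit numSlices and pass that to createDataFrame:

>>> # workaround sketch: force a single partition up front
>>> rdd = sc.parallelize(rows, numSlices=1)
>>> df1 = sqlContext.createDataFrame(rdd, cols)
>>> df1.rdd.getNumPartitions()
1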

How can I reduce the number of partitions of the DataFrame?

With DataFrame.coalesce.

>>> smdf = df.coalesce(1)
>>> smdf.rdd.getNumPartitions()
1
>>> smdf.explain()
== Physical Plan ==
Coalesce 1
+- Scan ExistingRDD[a#0L,b#1L]
>>> smdf.collect()
[Row(a=1, b=2)]
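For completeness (my note, not from this answer): DataFrame.repartition(1) would reach the same partition count, but it does a full shuffle, while coalesce(1) only narrows the existing partitions, so coalesce is the cheaper choice here.

>>> # alternative sketch: same result, but via a shuffle
>>> df.repartition(1).rdd.getNumPartitions()
1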
+2

I ran into the same problem. Capping the number of executors via dynamic allocation when creating the Spark context helped:

from pyspark import SparkConf

conf = SparkConf()
conf.set('spark.dynamicAllocation.enabled', 'true')
conf.set('spark.dynamicAllocation.maxExecutors', '32')
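A related knob (my assumption, tying back to the trace in the first answer rather than anything stated in this one): spark.default.parallelism is what SparkContext.defaultParallelism reads, i.e. the numSlices fallback shown above, so lowering it should also shrink DataFrames built from local data.

# assumption: fewer default slices means fewer partitions (and tasks)
# for DataFrames created from local collections
conf.set('spark.default.parallelism', '8')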
0
