Partitions not pruned in simple SparkSQL queries

I am trying to efficiently select individual partitions from a SparkSQL table (parquet in S3). However, I see evidence that Spark opens all of the parquet files in the table, not just the ones that pass the filter. This makes even small queries expensive for tables with many partitions.

Here is an illustrative example. I created a simple partitioned table on S3 using SparkSQL and the Hive metastore:

    import pandas

    # Make some data
    df = pandas.DataFrame({'pk': ['a'] * 5 + ['b'] * 5 + ['c'] * 5,
                           'k': ['a', 'e', 'i', 'o', 'u'] * 3,
                           'v': range(15)})
    # Convert to a SparkSQL DataFrame
    sdf = hiveContext.createDataFrame(df)
    # And save it
    sdf.write.partitionBy('pk').saveAsTable('dataset', format='parquet',
                                            path='s3a://bucket/dataset')
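For reference, `partitionBy('pk')` should lay the table out as one subdirectory per distinct partition value under the table path. A minimal sketch of the expected S3 key layout (the part-file name is illustrative; Spark generates its own):

```python
# Sketch of the key layout partitionBy('pk') is expected to produce.
# The part-file name is illustrative, not what Spark will actually emit.
partition_values = sorted(set(['a'] * 5 + ['b'] * 5 + ['c'] * 5))
layout = ['dataset/pk={}/part-r-00001.gz.parquet'.format(v)
          for v in partition_values]
# layout covers one directory per partition value: pk=a, pk=b, pk=c
```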

In the next session, I want to select a subset of this table:

    dataset = hiveContext.table('dataset')
    filtered_dataset = dataset.filter(dataset.pk == 'b')
    print filtered_dataset.toPandas()

In the driver logs that follow, I can see that partition pruning does happen:

 15/07/05 02:39:39 INFO DataSourceStrategy: Selected 1 partitions out of 3, pruned -200.0% partitions. 

But then I see parquet files being opened from all of the partitions:

    15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=a/part-r-00001.gz.parquet to seek to new offset 508
    15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=a/part-r-00001.gz.parquet at pos 508
    15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=b/part-r-00001.gz.parquet to seek to new offset 509
    15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=b/part-r-00001.gz.parquet at pos 509
    15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/_common_metadata to seek to new offset 262
    15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/_common_metadata at pos 262
    15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=c/part-r-00001.gz.parquet to seek to new offset 509
    15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=c/part-r-00001.gz.parquet at pos 509
    15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=b/part-r-00001.gz.parquet to seek to new offset -365
    15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=b/part-r-00001.gz.parquet at pos 152
    15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=a/part-r-00001.gz.parquet to seek to new offset -365
    15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=a/part-r-00001.gz.parquet at pos 151
    15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/_common_metadata to seek to new offset -266
    15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/_common_metadata at pos 4
    15/07/05 02:39:39 INFO S3AFileSystem: Reopening dataset/pk=c/part-r-00001.gz.parquet to seek to new offset -365
    15/07/05 02:39:39 INFO S3AFileSystem: Actually opening file dataset/pk=c/part-r-00001.gz.parquet at pos 152

With only three partitions this is not a problem, but with thousands it causes a noticeable delay. Why are all of these irrelevant files being opened?
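To make the expectation concrete: pruning on the partition column `pk` should mean that only files under the `pk=b` directory are ever touched. A minimal pure-Python sketch of that expected behavior (a hypothetical helper for illustration, not Spark's internal code):

```python
# Hypothetical sketch of what partition pruning should do: narrow the set
# of files to open down to those under the matching pk= directory.
paths = [
    'dataset/pk=a/part-r-00001.gz.parquet',
    'dataset/pk=b/part-r-00001.gz.parquet',
    'dataset/pk=c/part-r-00001.gz.parquet',
]

def prune(paths, column, value):
    """Keep only files whose path carries the matching partition value."""
    token = '/{}={}/'.format(column, value)
    return [p for p in paths if token in p]

selected = prune(paths, 'pk', 'b')
# Only the pk=b file should remain, yet the logs above show all three
# partitions (plus _common_metadata) being opened.
```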

1 answer

Take a look at spark.sql.parquet.filterPushdown , which is set to false by default because of some bugs in the Parquet version that Spark uses. It can be enabled in 1.3/1.4; check the official documentation.
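If you want to try it despite those known Parquet bugs, the setting can be turned on per session. A sketch against the 1.3/1.4 PySpark API (assumes `hiveContext` from the question; verify the flag is safe for your Parquet version first):

```python
# Enable Parquet filter pushdown for this session
# (off by default in Spark 1.3/1.4).
hiveContext.setConf('spark.sql.parquet.filterPushdown', 'true')

dataset = hiveContext.table('dataset')
print dataset.filter(dataset.pk == 'b').toPandas()
```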

I think this is fixed in Spark 1.5.

