Memory issue when importing parquet files into Spark

I am trying to query data from parquet files in Scala Spark (1.5), including one query over 2 million rows ("variants" in the code below).

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
    val parquetFile = sqlContext.read.parquet(<path>)
    parquetFile.registerTempTable("tmpTable")
    sqlContext.cacheTable("tmpTable")
    val patients = sqlContext.sql("SELECT DISTINCT patient FROM tmpTable ...")
    val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ...")

This works fine when the number of rows selected is small, but it fails with "Size exceeds Integer.MAX_VALUE" when a large amount of data is requested. The error is as follows:

    User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 43 in stage 1.0 failed 4 times, most recent failure: Lost task 43.3 in stage 1.0 (TID 123, node009): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
        at ...

What can I do to make this work?

This looks like a memory problem, but I tried using up to 100 executors with no difference (the time to failure stays the same regardless of the number of executors involved). It seems the data is not being partitioned across the nodes?
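
One way to check how the data is actually split across partitions (a minimal diagnostic sketch, assuming the same SQLContext and <path> placeholder as above):

    // Diagnostic sketch only (not from the original post): print how many
    // partitions the loaded DataFrame has. <path> is the placeholder from above.
    val df = sqlContext.read.parquet(<path>)
    println(s"Input partitions: ${df.rdd.partitions.length}")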

I tried to force higher parallelism by naively replacing this line, but to no avail:

    val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ...").repartition(sc.defaultParallelism*10)
1 answer

I do not believe the problem is specific to Parquet. You are hitting the limit on the maximum size of a partition in Spark.

    Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
        at ...

The Integer.MAX_VALUE in the error tells you that (I think) you have a partition larger than 2 GB, so indexing into it requires more than an int32.
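
As a rough illustration of the arithmetic (the table size and target partition size below are assumptions, not figures from the question): a cached table of a few hundred gigabytes split across only a handful of partitions easily exceeds 2 GB per partition, so you want something on the order of a thousand partitions:

    // Illustrative arithmetic only; 200 GB cached size and 200 MB per partition
    // are assumed values, not taken from the question.
    val totalBytes    = 200L * 1024 * 1024 * 1024        // hypothetical cached table size
    val targetBytes   = 200L * 1024 * 1024               // comfortable partition size, far below 2 GB
    val numPartitions = (totalBytes / targetBytes).toInt // = 1024 partitions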

The comment from Joe Wyden is spot on: you need to repartition your data even further. Try 1000 or more partitions.

For example:

    val data = sqlContext.read.parquet("data.parquet").repartition(1000)
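
Since the DISTINCT queries introduce a shuffle, it may also help to raise the number of post-shuffle partitions (a sketch; the default is 200 in Spark 1.5 and the value 1000 is only illustrative):

    // Sketch: make the shuffles produced by the DISTINCT queries write more,
    // smaller partitions. The value 1000 is illustrative, not prescriptive.
    sqlContext.setConf("spark.sql.shuffle.partitions", "1000")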