I am trying to query data from Parquet files in Scala Spark (1.5), including a query that returns about 2 million rows ("variants" in the code below).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
val parquetFile = sqlContext.read.parquet(<path>)
parquetFile.registerTempTable("tmpTable")
sqlContext.cacheTable("tmpTable")
val patients = sqlContext.sql("SELECT DISTINCT patient FROM tmpTable ...")
val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ...")
This works fine when the number of rows selected is small, but fails with "Size exceeds Integer.MAX_VALUE" when a large amount of data is requested. The error is as follows:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 43 in stage 1.0 failed 4 times, most recent failure: Lost task 43.3 in stage 1.0 (TID 123, node009):
java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at ...
What can I do to make this work?
This seems like a memory problem, but using up to 100 executors made no difference (the time until failure stays the same regardless of the number of executors involved). It seems the data is not being partitioned across the nodes?
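To check how the cached data is actually split, I can look at the partition count of the DataFrame (a rough sketch against the Spark 1.5 API, using the parquetFile value from the code above):

// Sketch: count the partitions backing the cached table. If this number is
// small, each cached block has to hold a large share of the 2 million rows.
val numPartitions = parquetFile.rdd.partitions.length
println(s"tmpTable is split into $numPartitions partitions")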
I also tried to naively force more parallelism by replacing this line, but to no avail:
val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ...").repartition(sc.defaultParallelism*10)
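For reference, the other variant I have in mind is repartitioning before the table is cached rather than after the query (untested sketch, using the same <path> placeholder and temp-table name as above):

// Sketch: repartition the data up front so it is already split into more
// pieces by the time it is registered and cached.
val parquetFile = sqlContext.read.parquet(<path>).repartition(sc.defaultParallelism * 10)
parquetFile.registerTempTable("tmpTable")
sqlContext.cacheTable("tmpTable")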