Scope "spark.driver.maxResultSize"

I am running a Spark job to aggregate data. I have a custom data structure called a Profile, which basically contains a mutable.HashMap[Zone, Double]. I want to merge all profiles that share the same key (a UUID) with the following code:

 def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1 }

 val aggregated = dailyProfiles
   .aggregateByKey(new Profile(), 3200)(merge, merge)
   .cache()
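
For context, here is a minimal sketch of what the Profile class might look like; the real Zone type and addWeights implementation are not shown in the question, so this is an assumption:

 import scala.collection.mutable

 // Hypothetical sketch of the Profile described in the question.
 case class Zone(id: String)

 class Profile extends Serializable {
   // The question says a Profile basically wraps a mutable.HashMap[Zone, Double].
   val weights: mutable.HashMap[Zone, Double] = mutable.HashMap.empty

   // Merge another profile into this one by summing weights per zone.
   def addWeights(other: Profile): Unit =
     other.weights.foreach { case (zone, w) =>
       weights.update(zone, weights.getOrElse(zone, 0.0) + w)
     }
 }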

Curiously, Spark crashes with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

The obvious solution is to increase "spark.driver.maxResultSize" (see the configuration sketch after the list below), but two things puzzle me:

  • It seems like too much of a coincidence that the reported size of 1024.0 MB is "bigger than" exactly 1024.0 MB.
  • All the documentation and help I found for this particular error and configuration parameter says that it affects functions that return a value to the driver (e.g. take() or collect()), but I never collect anything to the driver: I just read from HDFS, aggregate, and save back to HDFS.
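
For reference, a minimal sketch of how the limit could be raised (the app name and the 2g value are illustrative; setting the property to 0 disables the check entirely):

 import org.apache.spark.{SparkConf, SparkContext}

 // Illustrative only: raise spark.driver.maxResultSize above the default 1g.
 val conf = new SparkConf()
   .setAppName("profile-aggregation")
   .set("spark.driver.maxResultSize", "2g")
 val sc = new SparkContext(conf)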

Does anyone know why I am getting this error?

scala apache-spark
1 answer

Yes, this happens because the values shown in the exception message are rounded to one decimal place, while the actual comparison is done in bytes.

The actual serialized result size must be larger than 1024.0 MB (1073741824 bytes), but still small enough to round down to 1024.0 MB when displayed, i.e. less than roughly 1024.05 MB.

It is interesting to look at the relevant Apache Spark code snippets below; it is quite rare to hit this error so close to the boundary. :)

Here totalResultSize and maxResultSize are both Longs holding sizes in bytes, so the comparison totalResultSize > maxResultSize is exact. The error message msg, however, contains values rounded by Utils.bytesToString().

 // TaskSetManager.scala
 def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
   totalResultSize += size
   calculatedTasks += 1
   if (maxResultSize > 0 && totalResultSize > maxResultSize) {
     val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
       s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
       s"(${Utils.bytesToString(maxResultSize)})"
     logError(msg)
     abort(msg)
     false
   } else {
     true
   }
 }

Apache Spark 1.3 - source


 // Utils.scala
 def bytesToString(size: Long): String = {
   val TB = 1L << 40
   val GB = 1L << 30
   val MB = 1L << 20
   val KB = 1L << 10

   val (value, unit) = {
     if (size >= 2*TB) {
       (size.asInstanceOf[Double] / TB, "TB")
     } else if (size >= 2*GB) {
       (size.asInstanceOf[Double] / GB, "GB")
     } else if (size >= 2*MB) {
       (size.asInstanceOf[Double] / MB, "MB")
     } else if (size >= 2*KB) {
       (size.asInstanceOf[Double] / KB, "KB")
     } else {
       (size.asInstanceOf[Double], "B")
     }
   }
   "%.1f %s".formatLocal(Locale.US, value, unit)
 }

Apache Spark 1.3 - source
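
To see the rounding concretely, here is a small standalone check (not from the answer; it simply reuses the same "%.1f" formatting) showing that 1 GiB and 1 GiB plus a few extra bytes both print as 1024.0 MB, while the byte-level comparison still trips the limit:

 import java.util.Locale

 object MaxResultSizeRounding {
   // Same formatting idea as Utils.bytesToString, simplified to MB only.
   def mbString(bytes: Long): String =
     "%.1f MB".formatLocal(Locale.US, bytes.toDouble / (1L << 20))

   def main(args: Array[String]): Unit = {
     val maxResultSize   = 1L << 30          // 1073741824 bytes = 1024.0 MB
     val totalResultSize = (1L << 30) + 4242 // a few KB over the limit

     println(mbString(totalResultSize))       // prints "1024.0 MB"
     println(mbString(maxResultSize))         // prints "1024.0 MB"
     println(totalResultSize > maxResultSize) // true: the comparison is in bytes
   }
 }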
