Spark error: Total size of serialized results of XXXX tasks (2.0 GB) is larger than spark.driver.maxResultSize (2.0 GB)

Error:

ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB) 

Goal: generate recommendations for all users with the model, then overlay each user's test data on those recommendations and compute an overlap coefficient.

I built a recommendation model using Spark MLlib. For each user I compare the test data against the items recommended for that user, and then compute an average overlap ratio.

    def overlapRatio(model: MatrixFactorizationModel,
                     test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
      // Test products grouped per user.
      val testData: RDD[(Int, Iterable[Int])] =
        test_data.map(r => (r.user, r.product)).groupByKey
      val n = testData.count

      // Top-20 recommended product ids per user.
      val recommendations: RDD[(Int, Array[Int])] =
        model.recommendProductsForUsers(20)
          .mapValues(_.map(r => r.product))

      // Count users whose recommendations hit at least one test item.
      val overlaps = testData.join(recommendations).map(x => {
        val moviesPerUserInRecs = x._2._2.toSet
        val moviesPerUserInTest = x._2._1.toSet
        val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest)
        if (localHitRatio.size > 0) 1 else 0
      }).filter(x => x != 0).count

      var r = 0.0
      // toDouble avoids Long integer division, which would truncate the ratio.
      if (overlaps != 0) r = overlaps.toDouble / n
      return r
    }

The problem is that this ends up throwing a maxResultSize error. In my Spark configuration I did the following to increase maxResultSize:

    val conf = new SparkConf()
    conf.set("spark.driver.maxResultSize", "6g")
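
For what it's worth, a minimal sketch of where that setting would normally go (the app name is hypothetical): spark.driver.maxResultSize is a driver-side setting, so the SparkConf carrying it has to be passed in when the SparkContext is created.

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: the setting takes effect when this SparkConf is used to
    // construct the SparkContext; changing it afterwards has no effect.
    val conf = new SparkConf()
      .setAppName("overlap-ratio")              // hypothetical app name
      .set("spark.driver.maxResultSize", "6g")
    val sc = new SparkContext(conf)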

But this did not solve the problem. I went almost as far as allocating all of the driver memory, and the problem was still not solved. While the code was running I kept watching my Spark job, and what I saw puzzled me a bit.

 [Stage 281:==> (47807 + 100) / 1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB) 

The stage above runs the MatrixFactorization code in spark-mllib, specifically recommendForAll, around line 277 (not the exact line number).

    private def recommendForAll(
        rank: Int,
        srcFeatures: RDD[(Int, Array[Double])],
        dstFeatures: RDD[(Int, Array[Double])],
        num: Int): RDD[(Int, Array[(Int, Double)])] = {
      val srcBlocks = blockify(rank, srcFeatures)
      val dstBlocks = blockify(rank, dstFeatures)
      val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
        case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
          val m = srcIds.length
          val n = dstIds.length
          val ratings = srcFactors.transpose.multiply(dstFactors)
          val output = new Array[(Int, (Int, Double))](m * n)
          var k = 0
          ratings.foreachActive { (i, j, r) =>
            output(k) = (srcIds(i), (dstIds(j), r))
            k += 1
          }
          output.toSeq
      }
      ratings.topByKey(num)(Ordering.by(_._2))
    }

The recommendForAll method is called from the recommendProductsForUsers method.
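
For context, the Spark 1.5 call chain looks roughly like the sketch below (paraphrased from memory, not verbatim library source): recommendProductsForUsers delegates to the private recommendForAll shown above and wraps the results in Rating objects.

    // Paraphrased sketch of MatrixFactorizationModel.recommendProductsForUsers
    // in Spark 1.5; not verbatim source.
    def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = {
      MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num)
        .map { case (user, top) =>
          val ratings = top.map { case (product, rating) => Rating(user, product, rating) }
          (user, ratings)
        }
    }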

But it seems like the method kicks off 1M tasks. The data being fed in comes from 2,000 part files, so I'm confused about how it ended up spawning 1M tasks, and I think this might be the problem.

My question is: how can I solve this problem? Without this approach it is very difficult to compute the overlap ratio or recall@K. This is on Spark 1.5 (Cloudera 5.5).

scala apache-spark apache-spark-mllib
1 answer

The 2 GB issue is not new to the Spark community: https://issues.apache.org/jira/browse/SPARK-6235

Re: partition sizes larger than 2 GB, try repartitioning your RDD (myRdd.repartition(parallelism)) into more partitions (with respect to your current level of parallelism), thereby reducing the size of each individual partition.
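
A minimal sketch of what that could look like; myRdd is the placeholder above and the parallelism value is hypothetical. The idea is simply to spread the same rows over more, smaller partitions before the expensive step.

    // myRdd is a placeholder RDD; the target value is hypothetical.
    val parallelism = 2000
    val smaller = myRdd.repartition(parallelism)
    println(smaller.partitions.length)   // now exactly `parallelism` partitions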

Re: the number of tasks (and therefore partitions) created, my hypothesis is that it comes from the srcBlocks.cartesian(dstBlocks) API call, which produces an output RDD with (number of partitions of srcBlocks) * (number of partitions of dstBlocks) partitions.
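
To make the arithmetic concrete, here is a small self-contained sketch (toy data, hypothetical partition counts, assuming an existing SparkContext sc) showing that cartesian multiplies partition counts, which is how roughly 1000 x 1000 blocks could turn into the ~1M tasks seen in the stage:

    // A cartesian product of a 1000-partition RDD with a 1000-partition RDD
    // yields 1000 * 1000 = 1,000,000 partitions, and one task per partition.
    val a = sc.parallelize(1 to 1000, numSlices = 1000)
    val b = sc.parallelize(1 to 1000, numSlices = 1000)
    val crossed = a.cartesian(b)
    println(crossed.partitions.length)   // 1000000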

In that case you may want to use the myRdd.coalesce(parallelism) API instead of repartition to avoid a shuffle (and the issue of having to serialize partitions).
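
A minimal sketch of that alternative, with the same placeholders as above; coalesce with the default shuffle = false merges existing partitions without triggering a full shuffle.

    // Reduce the partition count without a shuffle.
    val merged = myRdd.coalesce(parallelism)
    // Equivalent, with the default made explicit:
    val mergedExplicit = myRdd.coalesce(parallelism, shuffle = false)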
