Spark: PageRank example throws a StackOverflowError when the number of iterations is too large

I ran the default PageRank example with the number of iterations set to 1024, and it throws a StackOverflowError. I have also hit this problem in another program of mine. How can I solve it?

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // brings in the pair-RDD operations (join, reduceByKey, ...)

    object SparkPageRank {
      def main(args: Array[String]) {
        if (args.length < 3) {
          System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
          System.exit(1)
        }
        var iters = args(2).toInt
        val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"),
          SparkContext.jarOfClass(this.getClass))
        val lines = ctx.textFile(args(1), 1)
        // adjacency list: (url, neighbour urls)
        val links = lines.map { s =>
          val parts = s.split("\\s+")
          (parts(0), parts(1))
        }.distinct().groupByKey().cache()
        var ranks = links.mapValues(v => 1.0)
        for (i <- 1 to iters) {
          val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
            val size = urls.size
            urls.map(url => (url, rank / size))
          }
          ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
        }
        val output = ranks.collect()
        output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
        System.exit(0)
      }
    }

Here is the error I get:

    [spark-akka.actor.default-dispatcher-15] ERROR LocalActorRefProvider(akka://spark) - guardian failed, shutting down system
    java.lang.StackOverflowError
        at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
        at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
        at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
        at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
        at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:312)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
        at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:326)
Tags: iteration, scala, stack-overflow, apache-spark
2 answers

This is because the transformations in that for loop build up a very long chain of dependencies (lineage) on your RDD. When you try to run your Spark job, the scheduler visits that lineage recursively, and with enough iterations the recursion overflows the stack and throws a StackOverflowError.

To solve this problem, you can call checkpoint() on your RDD to cut the lineage. cache() alone will not help, and note that neither call evaluates your RDD right away.

So, you have to call cache() and then checkpoint() on the intermediate RDD every certain number of iterations, and force evaluation with an action so that its dependencies are actually cleared.
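
For example, here is a minimal sketch of the question's loop with periodic checkpointing. The checkpoint directory and the 100-iteration interval are arbitrary choices for illustration, and links, ranks, iters and ctx are the variables from the question's program:

    ctx.setCheckpointDir("/tmp/spark-checkpoints")  // directory chosen for illustration; any reliable storage works

    var ranks = links.mapValues(v => 1.0)
    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

      if (i % 100 == 0) {    // every 100 iterations, truncate the lineage
        ranks.cache()        // keep the data around so the checkpoint job does not recompute it
        ranks.checkpoint()   // mark the RDD to be saved and its dependency chain dropped
        ranks.count()        // force an action so the checkpoint actually happens
      }
    }

After the count(), the checkpointed ranks is backed by the saved data instead of the full chain of joins, so the scheduler no longer has to walk thousands of dependencies on later iterations.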


I assume the error occurs because the intermediate RDDs are not evaluated before collect() is called, and at that point the whole chain is evaluated recursively.

Try adding cache() to evaluate the RDD at each iteration; this will probably help:

    ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _).cache()
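
For context, this is a sketch of where that line sits in the loop from the question; only the trailing cache() differs from the original code:

    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      // identical to the question's loop except for the added cache()
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _).cache()
    }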
