Spark: GraphX API OOM errors even after unpersisting useless RDDs

I ran into an Out Of Memory error for unknown reasons. I release useless RDDs immediately, but after several rounds of the loop the OOM error still appears. My code is as follows:

    // assumed imports for the snippet below (eParts and dia are defined elsewhere in my program):
    import scala.util.Random
    import org.apache.spark.HashPartitioner
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // single source shortest path
    def sssp[VD](graph: Graph[VD, Double], source: VertexId): Graph[Double, Double] = {
      graph.mapVertices((id, _) => if (id == source) 0.0 else Double.PositiveInfinity)
        .pregel(Double.PositiveInfinity)(
          (id, dist, newDist) => scala.math.min(dist, newDist),
          triplet => {
            if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
              Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
            } else {
              Iterator.empty
            }
          },
          (a, b) => math.min(a, b)
        )
    }

    def selectCandidate(candidates: RDD[(VertexId, (Double, Double))]): VertexId = {
      Random.setSeed(System.nanoTime())
      val selectLow = Random.nextBoolean()
      val (vid, (_, _)) = if (selectLow) {
        println("Select lowest bound")
        candidates.reduce((x, y) => if (x._2._1 < y._2._1) x else y)
      } else {
        println("Select highest bound")
        candidates.reduce((x, y) => if (x._2._2 > y._2._2) x else y)
      }
      vid
    }

    val g = {/* load graph from hdfs */}.partitionBy(EdgePartition2D, eParts).cache
    println("Vertices Size: " + g.vertices.count)
    println("Edges Size: " + g.edges.count)

    val resultDiameter = {
      val diff = 0d
      val maxIterations = 100
      val filterJoin = 1e5
      val vParts = 100
      var deltaHigh = Double.PositiveInfinity
      var deltaLow = Double.NegativeInfinity
      var candidates = g.vertices
        .map(x => (x._1, (Double.NegativeInfinity, Double.PositiveInfinity)))
        .partitionBy(new HashPartitioner(vParts))
        .persist(StorageLevel.MEMORY_AND_DISK) // (vid, (low, high))
      var round = 0
      var candidateCount = candidates.count
      while (deltaHigh - deltaLow > diff && candidateCount > 0 && round <= maxIterations) {
        val currentVertex = dia.selectCandidate(candidates)
        val dist: RDD[(VertexId, Double)] = dia.sssp(g, currentVertex)
          .vertices
          .partitionBy(new HashPartitioner(vParts)) // join more efficiently
          .persist(StorageLevel.MEMORY_AND_DISK)
        val eccentricity = dist.map({ case (vid, length) => length }).max
        println("Eccentricity = %.1f".format(eccentricity))
        val subDist = if (candidateCount > filterJoin) {
          println("Directly use Dist")
          dist
        } else { // when candidates is smaller than filterJoin, filter out the useless vertices
          println("Filter Dist")
          val candidatesMap = candidates.sparkContext.broadcast(candidates.collect.toMap)
          val subDist = dist
            .filter({ case (vid, length) => candidatesMap.value.contains(vid) })
            .persist(StorageLevel.MEMORY_AND_DISK)
          println("Sub Dist Count: " + subDist.count)
          subDist
        }
        var previousCandidates = candidates
        candidates = candidates.join(subDist).map({
          case (vid, ((low, high), d)) =>
            (vid, (Array(low, eccentricity - d, d).max, Array(high, eccentricity + d).min))
        }).persist(StorageLevel.MEMORY_AND_DISK)
        candidateCount = candidates.count
        println("Candidates Count 1 : " + candidateCount)
        previousCandidates.unpersist(true) // release useless rdd
        dist.unpersist(true) // release useless rdd
        deltaLow = Array(deltaLow, candidates.map({ case (_, (low, _)) => low }).max).max
        deltaHigh = Array(deltaHigh, 2 * eccentricity,
          candidates.map({ case (_, (_, high)) => high }).max).min
        previousCandidates = candidates
        candidates = candidates
          .filter({ case (_, (low, high)) => !((high <= deltaLow && low >= deltaHigh / 2d) || low == high) })
          .partitionBy(new HashPartitioner(vParts)) // join more efficiently
          .persist(StorageLevel.MEMORY_AND_DISK)
        candidateCount = candidates.count
        println("Candidates Count 2:" + candidateCount)
        previousCandidates.unpersist(true) // release useless rdd
        round += 1
        println(s"Round=${round},Low=${deltaLow}, High=${deltaHigh}, Candidates=${candidateCount}")
      }
      deltaLow
    }
    println(s"Diameter $resultDiameter")
    println("Complete!")

The main data in the while block are the graph object g and the candidates RDD. g is used to compute a single source shortest path in each round, and the graph structure never changes; the size of candidates shrinks from round to round.
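For clarity, here is a minimal sketch of each round's update-and-release pattern (updateBounds is a stand-in name for the bound update shown in the full code above, not a real function in my program):

    // minimal sketch of one round's update-and-release pattern
    val previousCandidates = candidates
    candidates = candidates
      .join(subDist)                         // subDist: this round's distances
      .map(updateBounds)                     // stand-in for the bound update above
      .persist(StorageLevel.MEMORY_AND_DISK)
    candidateCount = candidates.count        // materialize the new RDD first...
    previousCandidates.unpersist(true)       // ...then drop the old copy
    dist.unpersist(true)                     // blocking=true waits for block removal

As far as I understand, unpersist(true) blocks until the cached blocks are actually removed, but it only drops the cache: the lineage of the new candidates still reaches back through all earlier rounds.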

In each round, I manually release the useless RDDs in blocking mode (the pattern sketched above), so I would expect there to be enough memory for the subsequent operations. However, the job fails with OOM in round 6 or 7, seemingly at random. By the time the program reaches round 6 or 7, the candidates set has shrunk dramatically, to roughly 10% or less of its original size. A sample of the output follows; the size of candidates decreases from 15,288,624 in round 1 to 67,451 in round 7:

    Vertices Size: 15,288,624
    Edges Size: 228,097,574
    Select lowest bound
    Eccentricity = 12.0
    Directly use Dist
    Candidates Count 1 : 15288624
    Candidates Count 2:15288623
    Round=1,Low=12.0, High=24.0, Candidates=15288623
    Select lowest bound
    Eccentricity = 13.0
    Directly use Dist
    Candidates Count 1 : 15288623
    Candidates Count 2:15288622
    Round=2,Low=13.0, High=24.0, Candidates=15288622
    Select highest bound
    Eccentricity = 18.0
    Directly use Dist
    Candidates Count 1 : 15288622
    Candidates Count 2:6578370
    Round=3,Low=18.0, High=23.0, Candidates=6578370
    Select lowest bound
    Eccentricity = 12.0
    Directly use Dist
    Candidates Count 1 : 6578370
    Candidates Count 2:6504563
    Round=4,Low=18.0, High=23.0, Candidates=6504563
    Select lowest bound
    Eccentricity = 11.0
    Directly use Dist
    Candidates Count 1 : 6504563
    Candidates Count 2:412789
    Round=5,Low=18.0, High=22.0, Candidates=412789
    Select highest bound
    Eccentricity = 17.0
    Directly use Dist
    Candidates Count 1 : 412789
    Candidates Count 2:288670
    Round=6,Low=18.0, High=22.0, Candidates=288670
    Select highest bound
    Eccentricity = 18.0
    Directly use Dist
    Candidates Count 1 : 288670
    Candidates Count 2:67451
    Round=7,Low=18.0, High=22.0, Candidates=67451

The tail of the Spark info log:

    16/12/12 14:03:09 WARN YarnAllocator: Expected to find pending requests, but found none.
    16/12/12 14:06:21 INFO YarnAllocator: Canceling requests for 0 executor containers
    16/12/12 14:06:33 WARN YarnAllocator: Expected to find pending requests, but found none.
    16/12/12 14:14:26 WARN NioEventLoop: Unexpected exception in the selector loop.
    java.lang.OutOfMemoryError: Java heap space
    16/12/12 14:18:14 WARN NioEventLoop: Unexpected exception in the selector loop.
    java.lang.OutOfMemoryError: Java heap space
        at io.netty.util.internal.MpscLinkedQueue.offer(MpscLinkedQueue.java:123)
        at io.netty.util.internal.MpscLinkedQueue.add(MpscLinkedQueue.java:218)
        at io.netty.util.concurrent.SingleThreadEventExecutor.fetchFromScheduledTaskQueue(SingleThreadEventExecutor.java:260)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:347)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
        at java.lang.Thread.run(Thread.java:744)
    16/12/12 14:18:14 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-552217672-100.76.16.204-1470826698239:blk_1377987137_304302272
    java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
    16/12/12 14:14:39 WARN AbstractConnector:
    java.lang.OutOfMemoryError: Java heap space
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:233)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.accept(SelectChannelConnector.java:109)
        at org.spark-project.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.java:938)
        at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
        at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
        at java.lang.Thread.run(Thread.java:744)
    16/12/12 14:20:06 INFO ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Exception was thrown 1 time(s) from Reporter thread.)
    16/12/12 14:19:38 WARN DFSClient: Error Recovery for block BP-552217672-100.76.16.204-1470826698239:blk_1377987137_304302272 in pipeline 100.76.15.28:9003, 100.76.48.218:9003, 100.76.48.199:9003: bad datanode 100.76.15.28:9003
    16/12/12 14:18:58 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
    16/12/12 14:20:49 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-198] shutting down ActorSystem [sparkDriver]
    java.lang.OutOfMemoryError: Java heap space
    16/12/12 14:20:49 INFO SparkContext: Invoking stop() from shutdown hook
    16/12/12 14:20:49 INFO ContextCleaner: Cleaned shuffle 446
    16/12/12 14:20:49 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveRdd(2567)] in 1 attempts
    org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Failure.recover(Try.scala:185)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
        at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
        at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
        at scala.concurrent.Future$class.recover(Future.scala:324)
        at scala.concurrent.impl.Promise$DefaultPromise.recover(Promise.scala:153)
        at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:376)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:100)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
        at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:104)
        at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1630)
        at org.apache.spark.ContextCleaner.doCleanupRDD(ContextCleaner.scala:208)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:185)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
    Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
        at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:364)
        ... 12 more
    16/12/12 14:20:49 WARN QueuedThreadPool: 5 threads could not be stopped
    16/12/12 14:20:49 INFO SparkUI: Stopped Spark web UI at http://10.215.154.152:56338
    16/12/12 14:20:49 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    16/12/12 14:20:49 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    16/12/12 14:21:04 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveRdd(2567)] in 2 attempts
    org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)

The GC log follows:

    2016-12-12T14:10:43.541+0800: 16832.953: [Full GC 2971008K->2971007K(2971008K), 11.4284920 secs]
    2016-12-12T14:10:54.990+0800: 16844.403: [Full GC 2971007K->2971007K(2971008K), 11.4479110 secs]
    2016-12-12T14:11:06.457+0800: 16855.870: [GC 2971007K(2971008K), 0.6827710 secs]
    2016-12-12T14:11:08.825+0800: 16858.237: [Full GC 2971007K->2971007K(2971008K), 11.5480350 secs]
    2016-12-12T14:11:20.384+0800: 16869.796: [Full GC 2971007K->2971007K(2971008K), 11.0481490 secs]
    2016-12-12T14:11:31.442+0800: 16880.855: [Full GC 2971007K->2971007K(2971008K), 11.0184790 secs]
    2016-12-12T14:11:42.472+0800: 16891.884: [Full GC 2971008K->2971008K(2971008K), 11.3124900 secs]
    2016-12-12T14:11:53.795+0800: 16903.207: [Full GC 2971008K->2971008K(2971008K), 10.9517160 secs]
    2016-12-12T14:12:04.760+0800: 16914.172: [Full GC 2971008K->2971007K(2971008K), 11.0969500 secs]
    2016-12-12T14:12:15.868+0800: 16925.281: [Full GC 2971008K->2971008K(2971008K), 11.1244090 secs]
    2016-12-12T14:12:27.003+0800: 16936.416: [Full GC 2971008K->2971008K(2971008K), 11.0206800 secs]
    2016-12-12T14:12:38.035+0800: 16947.448: [Full GC 2971008K->2971008K(2971008K), 11.0024270 secs]
    2016-12-12T14:12:49.048+0800: 16958.461: [Full GC 2971008K->2971008K(2971008K), 10.9831440 secs]
    2016-12-12T14:13:00.042+0800: 16969.454: [GC 2971008K(2971008K), 0.7338780 secs]
    2016-12-12T14:13:02.496+0800: 16971.908: [Full GC 2971008K->2971007K(2971008K), 11.1536860 secs]
    2016-12-12T14:13:13.661+0800: 16983.074: [Full GC 2971007K->2971007K(2971008K), 10.9956150 secs]
    2016-12-12T14:13:24.667+0800: 16994.080: [Full GC 2971007K->2971007K(2971008K), 11.0139660 secs]
    2016-12-12T14:13:35.691+0800: 17005.104: [GC 2971007K(2971008K), 0.6693770 secs]
    2016-12-12T14:13:38.115+0800: 17007.527: [Full GC 2971007K->2971006K(2971008K), 11.0514040 secs]
    2016-12-12T14:13:49.178+0800: 17018.590: [Full GC 2971007K->2971007K(2971008K), 10.8881160 secs]
    2016-12-12T14:14:00.076+0800: 17029.489: [GC 2971007K(2971008K), 0.7046370 secs]
    2016-12-12T14:14:02.498+0800: 17031.910: [Full GC 2971007K->2971007K(2971008K), 11.3424300 secs]
    2016-12-12T14:14:13.862+0800: 17043.274: [Full GC 2971008K->2971006K(2971008K), 11.6215890 secs]
    2016-12-12T14:14:25.503+0800: 17054.915: [GC 2971006K(2971008K), 0.7196840 secs]
    2016-12-12T14:14:27.857+0800: 17057.270: [Full GC 2971008K->2971007K(2971008K), 11.3879990 secs]
    2016-12-12T14:14:39.266+0800: 17068.678: [Full GC 2971007K->2971007K(2971008K), 11.1611420 secs]
    2016-12-12T14:14:50.446+0800: 17079.859: [GC 2971007K(2971008K), 0.6976180 secs]
    2016-12-12T14:14:52.782+0800: 17082.195: [Full GC 2971007K->2971007K(2971008K), 11.4318900 secs]
    2016-12-12T14:15:04.235+0800: 17093.648: [Full GC 2971007K->2971007K(2971008K), 11.3429010 secs]
    2016-12-12T14:15:15.598+0800: 17105.010: [GC 2971007K(2971008K), 0.6832320 secs]
    2016-12-12T14:15:17.930+0800: 17107.343: [Full GC 2971008K->2971007K(2971008K), 11.1898520 secs]
    2016-12-12T14:15:29.131+0800: 17118.544: [Full GC 2971007K->2971007K(2971008K), 10.9680150 secs]
    2016-12-12T14:15:40.110+0800: 17129.522: [GC 2971007K(2971008K), 0.7444890 secs]
    2016-12-12T14:15:42.508+0800: 17131.920: [Full GC 2971007K->2971007K(2971008K), 11.3052160 secs]
    2016-12-12T14:15:53.824+0800: 17143.237: [Full GC 2971007K->2971007K(2971008K), 10.9484100 secs]
    2016-12-12T14:16:04.783+0800: 17154.196: [Full GC 2971007K->2971007K(2971008K), 10.9543950 secs]
    2016-12-12T14:16:15.748+0800: 17165.160: [GC 2971007K(2971008K), 0.7066150 secs]
    2016-12-12T14:16:18.176+0800: 17167.588: [Full GC 2971007K->2971007K(2971008K), 11.1201370 secs]
    2016-12-12T14:16:29.307+0800: 17178.719: [Full GC 2971007K->2971007K(2971008K), 11.0746950 secs]
    2016-12-12T14:16:40.392+0800: 17189.805: [Full GC 2971007K->2971007K(2971008K), 11.0036170 secs]
    2016-12-12T14:16:51.407+0800: 17200.819: [Full GC 2971007K->2971007K(2971008K), 10.9655670 secs]
    2016-12-12T14:17:02.383+0800: 17211.796: [Full GC 2971007K->2971007K(2971008K), 10.7348560 secs]
    2016-12-12T14:17:13.128+0800: 17222.540: [GC 2971007K(2971008K), 0.6679470 secs]
    2016-12-12T14:17:15.450+0800: 17224.862: [Full GC 2971007K->2971007K(2971008K), 10.6219270 secs]
    2016-12-12T14:17:26.081+0800: 17235.494: [Full GC 2971007K->2971007K(2971008K), 10.9158450 secs]
    2016-12-12T14:17:37.016+0800: 17246.428: [Full GC 2971007K->2971007K(2971008K), 11.3107490 secs]
    2016-12-12T14:17:48.337+0800: 17257.750: [Full GC 2971007K->2971007K(2971008K), 11.0769460 secs]
    2016-12-12T14:17:59.424+0800: 17268.836: [GC 2971007K(2971008K), 0.6707600 secs]
    2016-12-12T14:18:01.850+0800: 17271.262: [Full GC 2971007K->2970782K(2971008K), 12.6348300 secs]
    2016-12-12T14:18:14.496+0800: 17283.909: [GC 2970941K(2971008K), 0.7525790 secs]
    2016-12-12T14:18:16.890+0800: 17286.303: [Full GC 2971006K->2970786K(2971008K), 13.1047470 secs]
    2016-12-12T14:18:30.008+0800: 17299.421: [GC 2970836K(2971008K), 0.8139710 secs]
    2016-12-12T14:18:32.458+0800: 17301.870: [Full GC 2971005K->2970873K(2971008K), 13.0410540 secs]
    2016-12-12T14:18:45.512+0800: 17314.925: [Full GC 2971007K->2970893K(2971008K), 12.7169690 secs]
    2016-12-12T14:18:58.239+0800: 17327.652: [GC 2970910K(2971008K), 0.7314350 secs]
    2016-12-12T14:19:00.557+0800: 17329.969: [Full GC 2971008K->2970883K(2971008K), 11.1889000 secs]
    2016-12-12T14:19:11.767+0800: 17341.180: [Full GC 2971006K->2970940K(2971008K), 11.4069700 secs]
    2016-12-12T14:19:23.185+0800: 17352.597: [GC 2970950K(2971008K), 0.6689360 secs]
    2016-12-12T14:19:25.484+0800: 17354.896: [Full GC 2971007K->2970913K(2971008K), 12.6980050 secs]
    2016-12-12T14:19:38.194+0800: 17367.607: [Full GC 2971004K->2970902K(2971008K), 12.7641130 secs]
    2016-12-12T14:19:50.968+0800: 17380.380: [GC 2970921K(2971008K), 0.6966130 secs]
    2016-12-12T14:19:53.266+0800: 17382.678: [Full GC 2971007K->2970875K(2971008K), 12.9416660 secs]
    2016-12-12T14:20:06.233+0800: 17395.645: [Full GC 2971007K->2970867K(2971008K), 13.2740780 secs]
    2016-12-12T14:20:19.527+0800: 17408.939: [GC 2970881K(2971008K), 0.7696770 secs]
    2016-12-12T14:20:22.024+0800: 17411.436: [Full GC 2971007K->2970886K(2971008K), 13.8729770 secs]
    2016-12-12T14:20:35.919+0800: 17425.331: [Full GC 2971002K->2915146K(2971008K), 12.8270160 secs]
    2016-12-12T14:20:48.762+0800: 17438.175: [GC 2915155K(2971008K), 0.6856650 secs]
    2016-12-12T14:20:51.271+0800: 17440.684: [Full GC 2971007K->2915307K(2971008K), 12.4895750 secs]
    2016-12-12T14:21:03.771+0800: 17453.184: [GC 2915320K(2971008K), 0.6249910 secs]
    2016-12-12T14:21:06.377+0800: 17455.789: [Full GC 2971007K->2914274K(2971008K), 12.6835220 secs]
    2016-12-12T14:21:19.129+0800: 17468.541: [GC 2917963K(2971008K), 0.6917090 secs]
    2016-12-12T14:21:21.526+0800: 17470.938: [Full GC 2971007K->2913949K(2971008K), 13.0442320 secs]
    2016-12-12T14:21:36.588+0800: 17486.000: [GC 2936827K(2971008K), 0.7244690 secs]

Thus, the logs suggest there may be a memory leak, and it could be in one of two places: 1) my code or 2) the Spark GraphX API.

Can someone help me figure out the cause, if it lies in my code?

+7
out-of-memory apache-spark spark-graphx
2 answers

I do not think the unpersist() API is causing the out of memory. The OutOfMemoryError is caused by the collect() API, because collect() (which is an Action, unlike a Transformation) fetches the entire RDD to a single machine, the driver.
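If that collect-and-broadcast is indeed the culprit, one alternative (a sketch only, reusing the names from the question, not tested against your job) is to filter dist with a join so that nothing is pulled to the driver:

    // keep both RDDs distributed instead of collecting candidates to the driver;
    // dist and candidates share the same HashPartitioner, so the join is co-partitioned
    val subDist = dist.join(candidates)
      .mapValues { case (length, _) => length } // keep only the distance
      .persist(StorageLevel.MEMORY_AND_DISK)

The trade-off is one join per round instead of a broadcast lookup, but the driver heap is no longer involved.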

A few suggestions:

  • Increasing the driver memory is one partial solution, which you have already implemented. If you are running on JDK 8, use the G1GC garbage collector to manage large heaps (see the sketch after this list).

  • You can play with the storage levels ( MEMORY_AND_DISK , OFF_HEAP , etc.) to fine-tune them for your application.
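As a rough sketch (illustrative values only, assuming JDK 8), both suggestions can be wired up like this:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel

    // enable G1GC on the executors; for the driver, pass -XX:+UseG1GC via
    // spark-submit --driver-java-options, since driver JVM flags must be set at launch
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")

    // when first persisting, cache in serialized form and spill to disk under pressure
    candidates.persist(StorageLevel.MEMORY_AND_DISK_SER)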

Take a look at the official Spark tuning documentation for more details.

+2

I didn't completely solve the problem, but I partially worked around it:

  • Increase the driver memory. I mentioned above that the job stopped in round 6 or 7; when I doubled the driver memory, it stopped in round 14 instead. So I think driver OOM may be one of the causes.
  • Save the candidates RDD to HDFS and resume the process from it next time, so the computation already done is not wasted.
  • Serialize the candidates RDD with Kryo. It costs some CPU for encoding and decoding, but saves memory. (Rough sketches of all three workarounds follow below.)
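Roughly, the three workarounds look like this (the HDFS path, memory size, and round variables are placeholders for illustration):

    import org.apache.spark.SparkConf

    // 1) more driver memory, e.g.: spark-submit --driver-memory 8g ...
    // 2) save the candidates each round so a restart can resume from HDFS
    candidates.saveAsObjectFile(s"hdfs:///tmp/diameter/candidates-$round") // placeholder path
    // on restart:
    //   val restored = sc.objectFile[(VertexId, (Double, Double))](
    //     s"hdfs:///tmp/diameter/candidates-$lastRound")
    // 3) switch the job to Kryo serialization
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")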

Not an ideal solution, but it works in my case. I hope someone else can offer the perfect one.

0
