Apache Spark: rdd.unpersist() crashes for large files

My code runs on EMR with Spark 2.0.2. It works fine for smaller files, but it regularly crashes on files larger than 15 GB. The crash happens inside the unpersist() call, which happens to be the last step of the processing.

Any ideas would be very helpful. Thanks!
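For context, the failure is in the final cleanup of a cached RDD. Roughly, the pattern looks like this (a simplified sketch; the path, app name, and transformations are illustrative, not the actual update_hid.py code):

    from pyspark import SparkContext, StorageLevel

    sc = SparkContext(appName="update_hid")

    # Large input (15+ GB) is cached because it is reused by several downstream steps.
    hid = sc.textFile("s3://bucket/path/to/large_input").persist(StorageLevel.MEMORY_AND_DISK)

    # ... a number of transformations and actions that reuse `hid` ...
    hid.map(lambda line: line.split("\t")).count()

    # Last step of processing: free the cached blocks.
    # On large inputs this is the call that raises the Py4JJavaError shown below.
    hid.unpersist()

The driver log and Python traceback from a failing run: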

17/05/06 23:46:01 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.0.2.149:56200 is closed
17/05/06 23:46:01 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7.
17/05/06 23:46:01 INFO DAGScheduler: Executor lost: 7 (epoch 5)
17/05/06 23:46:01 INFO BlockManagerMasterEndpoint: Trying to remove executor 7 from BlockManagerMaster.
17/05/06 23:46:01 WARN BlockManagerMaster: Failed to remove RDD 43 - Connection reset by peer
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
17/05/06 23:46:01 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(7, ip-10-0-2-149.eu-west-1.compute.internal, 36043)
17/05/06 23:46:01 INFO BlockManagerMaster: Removed 7 successfully in removeExecutor
17/05/06 23:46:01 INFO YarnScheduler: Executor 7 on ip-10-0-2-149.eu-west-1.compute.internal killed by driver.
Traceback (most recent call last):
  File "/mnt/update_hid.py", line 565, in <module>
    process(current_date)
  File "/mnt/update_hid.py", line 517, in process
    get_missing_ip=get_missing_ip)
  File "/mnt/update_hid.py", line 466, in add_migration_info
17/05/06 23:46:01 INFO ExecutorAllocationManager: Existing executor 7 has been removed (new total is 11)
    Hid.unpersist()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 251, in unpersist
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o205.unpersist.
: org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:117)
    at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1683)
    at org.apache.spark.rdd.RDD.unpersist(RDD.scala:212)
    at org.apache.spark.api.java.JavaRDD.unpersist(JavaRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
17/05/06 23:46:02 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 9
17/05/06 23:46:02 INFO ExecutorAllocationManager: Removing executor 9 because it has been idle for 60 seconds (new desired total will be 10)
17/05/06 23:46:02 INFO SparkContext: Invoking stop() from shutdown hook
17/05/06 23:46:02 INFO SparkUI: Stopped Spark web UI at http://10.0.2.182:4040
17/05/06 23:46:02 INFO YarnClientSchedulerBackend: Interrupting monitor thread
17/05/06 23:46:02 INFO YarnClientSchedulerBackend: Shutting down all executors
17/05/06 23:46:02 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
17/05/06 23:46:02 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
17/05/06 23:46:02 INFO YarnClientSchedulerBackend: Stopped
17/05/06 23:46:02 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/05/06 23:46:02 INFO MemoryStore: MemoryStore cleared
17/05/06 23:46:02 INFO BlockManager: BlockManager stopped
17/05/06 23:46:02 INFO BlockManagerMaster: BlockManagerMaster stopped
17/05/06 23:46:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/05/06 23:46:02 INFO SparkContext: Successfully stopped SparkContext
17/05/06 23:46:02 INFO ShutdownHookManager: Shutdown hook called