Spark application does not recover when an executor is lost

I am running Spark on a standalone cluster. The Python driver application runs on the same node as the Master, and there are 2 Worker nodes. The business logic is Python code that is run by executors created on the Worker nodes.

I end up in a situation where nothing happens if one of the executors dies. If I forcibly kill one of the backend processes on Worker 0, the Master outputs:

16/06/07 16:20:35 ERROR TaskSchedulerImpl: Lost executor 1 on sparkslave0: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/06/07 16:20:35 WARN TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, sparkslave0): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/06/07 16:20:35 INFO DAGScheduler: Executor lost: 1 (epoch 0)
16/06/07 16:20:35 INFO AppClient$ClientEndpoint: Executor updated: app-20160607161937-0010/1 is now EXITED (Command exited with code 137)
16/06/07 16:20:35 INFO SparkDeploySchedulerBackend: Executor app-20160607161937-0010/1 removed: Command exited with code 137

then, after a timeout:

16/06/07 16:22:35 WARN NettyRpcEndpointRef: Error sending message [message = RemoveExecutor(1)] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:225)
    at org.apache.spark.storage.BlockManagerMaster.removeExecutor(BlockManagerMaster.scala:40)
    at org.apache.spark.scheduler.DAGScheduler.handleExecutorLost(DAGScheduler.scala:1321)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1628)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 9 more

I can see that the Worker daemon has spawned another executor, but nothing happens. After 3 attempts, I see:

16/06/07 16:26:41 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Error notifying standalone scheduler driver endpoint
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:362)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.executorRemoved(SparkDeploySchedulerBackend.scala:144)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(AppClient.scala:186)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Error sending message [message = RemoveExecutor(1,Command exited with code 137)]
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:118)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:359)
    ... 9 more
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Failure.recover(Try.scala:185)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
    ... 7 more
16/06/07 16:26:41 ERROR DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed; shutting down SparkContext
org.apache.spark.SparkException: Error sending message [message = RemoveExecutor(1)]
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:118)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:225)
    at org.apache.spark.storage.BlockManagerMaster.removeExecutor(BlockManagerMaster.scala:40)
    at org.apache.spark.scheduler.DAGScheduler.handleExecutorLost(DAGScheduler.scala:1321)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1628)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    ... 8 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 9 more
16/06/07 16:26:41 INFO AppClient$ClientEndpoint: Executor added: app-20160607161937-0010/2 on worker-20160607145945-192.168.33.100-34250 (192.168.33.100:34250) with 1 cores
16/06/07 16:26:41 INFO SparkDeploySchedulerBackend: Granted executor ID app-20160607161937-0010/2 on hostPort 192.168.33.100:34250 with 1 cores, 1222.0 MB RAM
16/06/07 16:26:41 INFO AppClient$ClientEndpoint: Executor updated: app-20160607161937-0010/2 is now RUNNING
16/06/07 16:26:41 INFO AppClient$ClientEndpoint: Executor updated: app-20160607161937-0010/2 is now EXITED (Command exited with code 1)
16/06/07 16:26:41 INFO SparkDeploySchedulerBackend: Executor app-20160607161937-0010/2 removed: Command exited with code 1
16/06/07 16:26:41 INFO TaskSchedulerImpl: Cancelling stage 0
16/06/07 16:26:41 INFO TaskSchedulerImpl: Stage 0 was cancelled

and the application stops working.
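As a side note for reading the executor lines in the logs above: exit code 137 for executor 1 is consistent with the forced kill, since by the usual Unix convention a code above 128 means 128 plus the number of the terminating signal (137 - 128 = 9, SIGKILL on Linux). Below is a minimal sketch that pulls the "is now EXITED" lines out of a driver log and decodes the codes. The helper names `describe_exit` and `executor_exits` are made up for illustration and are not part of Spark, and the 128 + N reading is an assumption about how the exit status was produced.

```python
import re
import signal

# Matches the AppClient "Executor updated: ... is now EXITED" lines quoted above.
EXIT_RE = re.compile(
    r"Executor updated: (?P<executor>\S+) is now EXITED "
    r"\(Command exited with code (?P<code>\d+)\)"
)


def describe_exit(code):
    """Give a best-guess reading of a process exit code (hypothetical helper)."""
    if code > 128:
        # Assumed convention: 128 + N means the process was ended by signal N.
        try:
            return "killed by " + signal.Signals(code - 128).name
        except ValueError:
            return "above 128, unrecognised signal %d" % (code - 128)
    return "exited on its own with status %d" % code


def executor_exits(log_text):
    """Yield (executor_id, exit_code, description) for every EXITED line."""
    for match in EXIT_RE.finditer(log_text):
        code = int(match.group("code"))
        yield match.group("executor"), code, describe_exit(code)


# Two lines copied from the logs in this question.
sample = (
    "16/06/07 16:20:35 INFO AppClient$ClientEndpoint: Executor updated: "
    "app-20160607161937-0010/1 is now EXITED (Command exited with code 137)\n"
    "16/06/07 16:26:41 INFO AppClient$ClientEndpoint: Executor updated: "
    "app-20160607161937-0010/2 is now EXITED (Command exited with code 1)\n"
)

for executor, code, meaning in executor_exits(sample):
    print(executor, code, meaning)
```

Read this way, executor 1 was killed from outside, while the replacement executor 2 exited by itself with status 1, so its own stderr on the Worker is probably the more informative place to look.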

Can anyone recommend a way to recover gracefully and let the application continue working with the remaining and/or new executors?

Many thanks,

Dave

apache-spark pyspark
