ExecutorLostFailure error when running a task in Spark

Whenever I try to run the job on this folder, it throws ExecutorLostFailure every time

Hi, I'm new to Spark. I am working with Spark 1.4.1 on 8 slave nodes, each with 11.7 GB of memory and 3.2 GB of disk. I run the Spark job from one of the 8 slave nodes (so, with a storage fraction of 0.7, only about 4.8 GB is available on each node) and use Mesos as the cluster manager. I am using this configuration:

spark.master mesos://uc1f-bioinfocloud-vamp-m-1:5050
spark.eventLog.enabled true
spark.driver.memory 6g
spark.storage.memoryFraction 0.7
spark.core.connection.ack.wait.timeout 800
spark.akka.frameSize 50
spark.rdd.compress true

I am trying to run the Spark MLlib Naive Bayes algorithm on a folder with about 14 GB of data (there is no problem when running the job on a folder with 6 GB). I read the folder from a Google Storage bucket as an RDD and pass 32 as the partition parameter (I also tried increasing the number of partitions). Then I use TF to create feature vectors and predict based on them. But whenever I try to run the job on this folder, it throws ExecutorLostFailure every time. I have tried different configurations but nothing helps. Maybe I am missing something very simple, but I can't figure it out. Any help or suggestion would be greatly appreciated.
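For reference, this is roughly the shape of the pipeline described above, as a minimal sketch using the RDD-based MLlib API from Spark 1.4 (HashingTF plus NaiveBayes). The gs:// path is the one from the log below; label_for is a hypothetical helper standing in for however the real script assigns class labels, and the numbers are illustrative, not the actual V2ProcessRecords.py code:

    from pyspark import SparkContext
    from pyspark.mllib.feature import HashingTF
    from pyspark.mllib.classification import NaiveBayes
    from pyspark.mllib.regression import LabeledPoint

    sc = SparkContext(appName="NaiveBayesOnXmlFolder")

    # Read the whole folder as (filename, content) pairs; 32 is the partition
    # count mentioned above.
    docs = sc.wholeTextFiles(
        "gs://uc1f-bioinfocloud-vamp-m/literature/xml/P*/*.nxml", minPartitions=32)

    # Hash each document's tokens into a fixed-size term-frequency vector.
    tf = HashingTF(numFeatures=1 << 18)

    def to_labeled_point(kv):
        name, text = kv
        # label_for(...) is a hypothetical labelling function, not from the question.
        return LabeledPoint(label_for(name), tf.transform(text.split()))

    labeled = docs.map(to_labeled_point)
    model = NaiveBayes.train(labeled)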

Log:

15/07/21 01:18:20 ERROR TaskSetManager: Task 3 in stage 2.0 failed 4 times; aborting job
15/07/21 01:18:20 INFO TaskSchedulerImpl: Cancelling stage 2
15/07/21 01:18:20 INFO TaskSchedulerImpl: Stage 2 was cancelled
15/07/21 01:18:20 INFO DAGScheduler: ResultStage 2 (collect at /opt/work/V2ProcessRecords.py:213) failed in 28.966 s
15/07/21 01:18:20 INFO DAGScheduler: Executor lost: 20150526-135628-3255597322-5050-1304-S8 (epoch 3)
15/07/21 01:18:20 INFO BlockManagerMasterEndpoint: Trying to remove executor 20150526-135628-3255597322-5050-1304-S8 from BlockManagerMaster.
15/07/21 01:18:20 INFO DAGScheduler: Job 2 failed: collect at /opt/work/V2ProcessRecords.py:213, took 29.013646 s
Traceback (most recent call last):
  File "/opt/work/V2ProcessRecords.py", line 213, in <module>
    secondPassRDD = firstPassRDD.map(lambda (name, title, idval, pmcId, pubDate, article, tags, author, ifSigmaCust, wclass): (str(name), title, idval, pmcId, pubDate, article, tags, author, ifSigmaCust, "Yes" if ("PMC" + pmcId) in rddNIHGrant else ("No"), wclass)).collect()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 745, in collect
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 times, most recent failure: Lost task 3.3 in stage 2.0 (TID 12, vamp-m-2.c.quantum-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/07/21 01:18:20 INFO BlockManagerMaster: Removed 20150526-135628-3255597322-5050-1304-S8 successfully in removeExecutor
15/07/21 01:18:20 INFO DAGScheduler: Host added was in lost list earlier: vamp-m-2.c.quantum-854.internal
Jul 21, 2015 1:01:15 AM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
15/07/21 01:18:20 INFO SparkContext: Invoking stop() from shutdown hook
{"Event":"SparkListenerTaskStart","Stage ID":2,"Stage Attempt ID":0,"Task Info":{"Task ID":11,"Index":6,"Attempt":2,"Launch Time":1437616381852,"Executor ID":"20150526-135628-3255597322-5050-1304-S8","Host":"uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}

{"Event": "SparkListenerExecutorRemoved", "Timestamp": 1437616389696, "Artist ID": "20150526-135628-3255597322-5050-1304-S8", "Caused": "Lost artist"} {"Event": " SparkListenerTaskEnd "," Stage ID ": 2," Stage ID ": 0," Task Type ":" ResultTask "," Reason for completing the task ": {" Reason ":" ExecutorLostFailure "," Executor "ID": "20150526- 135628-3255597322-5050-1304-S8 "}," Task Information ": {" Task ID ": 11," Index ": 6," Attempt ": 2," Startup Time ": 1437616381852," Executor ID ": "20150526-135628-3255597322-5050-1304-S8", "Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal", "Locality": PROCESS_LOCAL "," Speculative th: "false," Get runtime ": 0," End time ": 1437616389697," Failure ": true," Accumulables ": []}} {" Event ":" SparkListenerExecutorAdded "," Timestamp ": 1437616389707," Artist ID ":" 20150526-135628-3255597322-5050-1304-S8 "," Artist Information ": {" Host ":" uc1f-bioinfocloud-vamp -m-2.c.quantum-device-854.internal " , "Total Cores": 1, "Log Urls": {}}} {"Event": "SparkListenerTaskStart", "Stage ID": 2, "Stage ID": 0, "Task Information": {"Task ID ": 12," Index ": 6," Attempt ": 3, Start time": 1437616389702, "Artist ID": "20150526-135628-3255597322-5050-1304-S8", "Host": "uc1f-bioinfocloud- vamp-m-2.c.quantum-device-854.internal "," Local "": "PROCESS_LOCAL", "Speculative": false, "Get runtime": 0, "End time": 0, "Failure": false, "Accumulables": []}} {"Event": "SparkListenerExecutorRemoved" , "Timestamp": 1437616397743, "Artist ID": "20150526-135628-3255597322-5050-1304-S8", "Reason removed": "Lost artist"} {"Event": "SparkListenerTaskEnd", "Stage ID": 2, "Stage ID": 0, "Task Type": "ResultTask", "Reason for completing the task": {"Reason": "ExecutorLostFailure", "Executor" ID ":" 20150526-135628-3255597322-5050-1304- S8 "}," Task Information ": {" Task ID ": 12," Index ": 6," Attempt ": 3," Startup Time ": 1437616389702," Identity Artist ikator ":" 20150526-135628-3255597322-5050-1304-S8 "," Host ":" uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal "," Locality ": PROCESS_LOCAL" , "Speculative": false, "Get runtime": 0, "Finish Time": 1437616397743, "Failed": true, "Accumulables": []}} {"Event": "SparkListenerStageCompleted", "Step information" : {"Stage identifier": 2, "Scene identifier": 0, "Scene name": "collect in / opt / work / V 2ProcessRecords.py:215", "Number of tasks": 72, "RDD information": [{"RDD ID": 6, "Name": "PythonRDD", "Parent Identifiers": [0], "Storage Level": {"Use Disk": false, "Use Memory": false, "Use ExternalBlockStore" : fal se, "Deserialized": false, "Replication": 1}, "Number of Partitions": 72, "Number of cached partitions": 0, "Memory size": 0, "ExternalBlockStore Size": 0, "Disk size" : 0}, {"ID RDD": 0, "Name": "gs: // uc1f-bioinfocloud-vamp-m / literature / xml / P * / *. 
Nxml", "Scope": "{\" id \ ": \" 0 \ ", \" name \ ": \" wholeTextFiles \ "}", "Parent IDs": [], "Storage Level": {"Use disk": false, "Use memory": false , "Use ExternalBlockStore": false, "Deserialized": false, "Replication": 1}, "Number of Partitions": 72, "Number of Cached Partitions": 0, "Memory size": 0, "External blocking block size ": 0," Disk size ": 0}]," Parent identifiers ": []," Details ":" "," Send time ": 1437616365566," Completion time ": 1437616397753 , “Reason for failure”: “Operation was interrupted due to a failure of the stage: Tas k 6 at stage 2.0 failed 4 times, last failure: lost task 6.3 at stage 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c .quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628 -3255597322-5050-1304-S8 lost) \ nPerfect stacktrace: "," Accumulables ": []}} {" Event ":" SparkListenerJobEnd "," Job ID ": 2," End time ": 1437616397755," Job Result ": {" Result ":" JobFailed "," Exception ": {" Message ":" Job was interrupted due to failure: task 6 in step 2.0 failed 4 times, last crash: 6.3 lost task in step 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost) \ nStacktrace: "," Stack door Trace ": [{" Class declaration ":" org.apache.spark.scheduler.DAGScheduler "," Method name ":" org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages "," File name ":" DAGScheduler. scala "," Line number ": 1266}, {" Class declaration ":" org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1 "," Method name ":" apply "," File name ":" DAGScheduler.scala "," Line number ": 1257}, {" Class declaration ":" org.apache.spark.scheduler. DAGScheduler $$ anonfun $ abortStage $ 1 "," Method name ":" apply "," File name ":" DAGScheduler.scala "," Line number ": 1256}, {" Class declaration ":" scala.collection. mutable.Resizable Array $ class "," Method name ":" foreach "," File name ":" ResizableArray.scala "," Line number ": 59}, {" Class declaration ":" scala.collection.mutable.ArrayBuffer "," Method name ":" foreach "," File name ":" ArrayBuffer.scala "," Line number ": 47}, {" Class declaration ":" org.apache.spark.scheduler.DAGScheduler "," Name Method ":" abortStage "," File name ":" DAGScheduler.scala ", Line number": 1256}, {"Class declaration": "org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1", " Method name ":" apply "," File Name ":" DAGScheduler.scala "," Line number ": 730}, {" Class declaration ":" org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1 " , "Method name": "apply", "File name": "DAGScheduler. Scala", "Page number oki ": 730}, {" Class declaration ":" scala.Option "," Method name ":" foreach "," File name ":" Option.scala "," Line number ": 236}, {" Class declaration ":" org.apache.spark.scheduler.DAGScheduler "," Method name ":" handleTaskSetFailed "," File name ":" DAGScheduler.scala "," Line number ": 730}, {" Class declaration ":" org .apache.spark.scheduler.DAGSchedulerEventProcessLoop "," Method name ":" onReceive "," File name ":" DAGScheduler.scala "," Line number ": 1450}, {" Class declaration ",:" org.apache. spark.scheduler.DAGSchedulerEventProcessLoop "," Method name ":" o nReceive "," File name ":" DAGScheduler.scala "," Line number ": 1411}, {" Class declaration ":" org.apache.spark.util .EventLoop $$ anon $ 1 "," Method name ":" run "," File Name ":" EventLoop.scala "," Article number rock ": 48}]}}}

+7
collect apache-spark pyspark apache-spark-mllib
4 answers

This error occurs when a task fails more than four times. Try increasing the parallelism in your cluster using the following option.

 --conf "spark.default.parallelism=100" 

Set the parallelism value to 2-3 times the number of cores available in your cluster. If that doesn't work, try increasing the parallelism exponentially: if the current value does not help, double it, and so on. I have also noticed that it helps if your parallelism is a prime number, especially if you use groupByKey.
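For example, the same setting can go into the properties file or SparkConf from the question (a sketch; 100 is an illustrative value, not a recommendation for this exact cluster):

    from pyspark import SparkConf, SparkContext

    # Equivalent to passing --conf "spark.default.parallelism=100" to spark-submit.
    conf = (SparkConf()
            .setMaster("mesos://uc1f-bioinfocloud-vamp-m-1:5050")
            .set("spark.default.parallelism", "100"))
    sc = SparkContext(conf=conf)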

+3

It is difficult to say what the problem is without the log of the failed executor (rather than the driver log), but most likely it is a memory problem. Try increasing the number of partitions significantly (if your current value is 32, try 200).
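For example, in PySpark the partition count can be raised either when the folder is read or afterwards (a sketch; input_path stands in for the gs:// path from the question, and 200 is just the value suggested above):

    # Ask for more input partitions up front ...
    docs = sc.wholeTextFiles(input_path, minPartitions=200)

    # ... or repartition an existing RDD before the heavy stages.
    docs = docs.repartition(200)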

+2

I had this problem, and in my case the cause was the very high frequency of one key in a reduceByKey task. This was (I think) forcing a massive list to be collected on one of the executors, which then threw OOM errors.

The solution for me was simply to filter out the high-frequency keys before doing reduceByKey, but I understand that this may or may not be possible depending on your application. I didn't need all of my data anyway.
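A rough sketch of that idea, assuming a keyed RDD called pairs; the threshold and the combine function are placeholders, not values from the question:

    # Count how often each key occurs. countByKey() collects the counts to the
    # driver, so on a huge key space a sampled count may be preferable.
    key_counts = pairs.countByKey()
    hot_keys = {k for k, n in key_counts.items() if n > 100000}  # illustrative threshold

    # Drop the dominant keys, then reduce as usual.
    filtered = pairs.filter(lambda kv: kv[0] not in hot_keys)
    reduced = filtered.reduceByKey(lambda a, b: a + b)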

+2

In my understanding, the most common reason for ExecutorLostFailure is an OOM in the executor.

To solve the OOM problem, you need to find out what exactly causes it. Simply increasing the default parallelism or increasing executor memory is not a strategic solution.

If you look at what increasing parallelism does, it tries to create more partitions so that each task works on less and less data. But if your data is skewed so that the key on which the data is partitioned (for parallelism) holds most of the data, simply increasing the parallelism will have no effect.

Similarly, increasing executor memory is a very inefficient way of handling such a scenario: even if only one executor fails with ExecutorLostFailure, requesting more memory for all executors would make your application require much more memory than it actually needs.
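One way to check whether skew is actually the cause, before touching parallelism or memory, is to sample the keyed RDD and look at the heaviest keys (a sketch; pairs and the sampling fraction are placeholders):

    # Sample ~1% of the records and list the ten most frequent keys.
    top_keys = (pairs.sample(False, 0.01)
                     .map(lambda kv: (kv[0], 1))
                     .reduceByKey(lambda a, b: a + b)
                     .takeOrdered(10, key=lambda kv: -kv[1]))
    print(top_keys)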

+1
