Spark: unable to run TeraSort when increasing data volume

I have a Spark benchmark suite that includes a TeraSort. It works correctly when the data set is only a few hundred GB, but when I generate more data, for example 1 TB, it fails at some point. Below is my code:

import org.apache.spark.rdd._
import org.apache.spark._
import org.apache.spark.SparkContext._


object ScalaTeraSort{

  def main(args: Array[String]){
    if (args.length < 2){
      System.err.println(
        s"Usage: $ScalaTeraSort <INPUT_HDFS> <OUTPUT_HDFS>"
      )
      System.exit(1)
    }
    val sparkConf = new SparkConf().setAppName("ScalaTeraSort")
    val sc = new SparkContext(sparkConf)

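    // TeraSort record format: the first 10 characters of each line are the
    // sort key; the rest of the line is the payload carried along with it.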
    val file = sc.textFile(args(0))
    val data = file.map(line => (line.substring(0, 10), line.substring(10)))
                     .sortByKey().map{case(k, v) => k + v}
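    // saveAsTextFile triggers the job; the shuffle produced by sortByKey is
    // read in this final stage, which is where the FetchFailed error appears.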
    data.saveAsTextFile(args(1))

    sc.stop()
  }

}

This code basically consists of three steps: sortByKey, map, and saveAsTextFile. The first two steps seem to work fine, but the third step fails every time, after which Spark re-runs the second step. The third step fails with "FetchFailed(BlockManagerId(40, sr232, 44815, 0), shuffleId=0, mapId=11825, reduceId=0)".

1 answer

Finally I found the root cause: java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec.

That is to say, you have to increase "spark.core.connection.ack.wait.timeout", whose default value is 60 seconds. Otherwise, under heavy load the acknowledgement does not arrive in time and the shuffle connection is reported as failed.
