For those who work with a large dataset:

rdd.collect() should not be used, since it will collect all the data as an Array in the driver, which is the easiest way to blow up the driver's memory.

rdd.coalesce(1).saveAsTextFile() should not be used either, since the parallelism of the upstream stages is lost: everything ends up being executed on the single node where the data is to be stored.

rdd.coalesce(1, shuffle = true).saveAsTextFile() is the best simple option, since it keeps the processing of upstream tasks parallel and only then performs the shuffle to one node (rdd.repartition(1).saveAsTextFile() is an exact synonym); a short sketch follows this list.

rdd.saveAsSingleTextFile(), as described below, additionally allows storing the rdd in a single file with a specific name, while keeping the parallelism properties of rdd.coalesce(1, shuffle = true).saveAsTextFile().
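A minimal sketch of the recommended built-in option; the SparkContext setup, app name and output path are illustrative, not part of the original answer:

import org.apache.spark.{SparkConf, SparkContext}

object CoalesceExample {
  def main(args: Array[String]): Unit = {
    // Illustrative setup; in a real job the SparkContext usually already exists.
    val sc = new SparkContext(new SparkConf().setAppName("single-file-output").setMaster("local[*]"))

    val rdd = sc.parallelize(1 to 1000000).map(_.toString)

    // Upstream tasks stay parallel; only the final write is shuffled to one partition.
    rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/output")

    // Exact synonym:
    // rdd.repartition(1).saveAsTextFile("path/to/output")

    sc.stop()
  }
}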
Something that may be inconvenient with rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") is that it actually produces a file whose path is path/to/file.txt/part-00000 and not path/to/file.txt .
The following rdd.saveAsSingleTextFile("path/to/file.txt") solution will actually produce a file whose path is path/to/file.txt:
package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // an RDD[String] and be called like this: rdd.saveAsSingleTextFile
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String,
        codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx files into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}
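Note that the helper above relies on FileUtil.copyMerge, which was removed in Hadoop 3 (hence the link in the code). A minimal replacement sketch along the lines of the linked answer is to stream the part files into the destination by hand; the object and method names here are illustrative, not part of the original helper:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

object Hadoop3MergeHelper {

  // Illustrative stand-in for FileUtil.copyMerge on Hadoop 3:
  // concatenates the part-xxxxx files of srcDir into dstFile, in name order.
  def copyMergeHadoop3(hdfs: FileSystem, srcDir: Path, dstFile: Path, conf: Configuration): Unit = {
    val out = hdfs.create(dstFile)
    try {
      hdfs.listStatus(srcDir)
        .map(_.getPath)
        .filter(_.getName.startsWith("part-"))
        .sortBy(_.getName)
        .foreach { part =>
          val in = hdfs.open(part)
          try IOUtils.copyBytes(in, out, conf, false) // false: keep the output stream open
          finally in.close()
        }
    } finally {
      out.close()
    }
  }
}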
which can be used in this way:
import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec

rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
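If needed, the merged output can be read back directly with Spark, since it is a plain (or gzipped) text file; a small illustrative check, assuming the same sc and paths as above:

// Spark reads the single merged file like any text file,
// and handles the gzipped variant transparently.
val merged = sc.textFile("path/to/file.txt")
println(merged.count())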
This snippet:
First, it saves the rdd with rdd.saveAsTextFile("path/to/file.txt.tmp") in the temporary folder path/to/file.txt.tmp, as if we did not want to store the data in one file (which keeps the processing of upstream tasks parallel).

Only then, using the hadoop file system api, does it merge (FileUtil.copyMerge()) the different output files into our final single output file path/to/file.txt.
Xavier Guihot, Feb 09 '18 at 18:37