How to make saveAsTextFile NOT split output to multiple files?

When using Scala in Spark, whenever I dump results with saveAsTextFile, it seems to split the output into multiple parts. I'm just passing it one parameter (the path).

 val year = sc.textFile("apat63_99.txt")
   .map(_.split(",")(1))
   .flatMap(_.split(","))
   .map((_, 1))
   .reduceByKey(_ + _)
   .map(_.swap)
 year.saveAsTextFile("year")
  • Does the number of output files correspond to the number of reducers it uses?
  • Does this mean that the result is compressed?
  • I know I can combine the output with bash, but is it possible to save the output in a single text file, without splitting? I looked at the API docs, but they say little about it.
+72
scala apache-spark
Jun 23 '14 at 16:52
9 answers

The reason it saves the output as multiple files is that the computation is distributed. If the output is small enough that you think it can fit on one machine, then you can end your program with

 val arr = year.collect() 

And then save the resulting array as a file. Another way would be to use a custom partitioner, partitionBy, and make everything go to a single partition, though that isn't advisable because you won't get any parallelization.
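For example, a minimal sketch of writing that collected array to a single local file (the file name year.txt is just an example, and arr is the array collected above):

 import java.io.PrintWriter

 // arr was produced above with year.collect(); each element is a (count, key) pair.
 val writer = new PrintWriter("year.txt")
 try {
   arr.foreach { case (count, key) => writer.println(s"$count,$key") }
 } finally {
   writer.close()
 }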

If you need the file saved with saveAsTextFile , you can use coalesce(1, true).saveAsTextFile() . This basically means do the computation, then coalesce to 1 partition. You can also use repartition(1) , which is just a wrapper for coalesce with the shuffle argument set to true. Looking through the source of RDD.scala is how I figured this out; you should take a look.
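Applied to the example above, a hedged sketch (the output path is arbitrary):

 // shuffle = true keeps the upstream stages parallel; only the final write
 // is funneled into a single partition (and hence a single part file).
 year.coalesce(1, shuffle = true).saveAsTextFile("year-single")
 // year.repartition(1).saveAsTextFile("year-single")  // equivalent shorthand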

+95
Jun 23 '14 at 17:58

You can call coalesce(1) and then saveAsTextFile() - but that might be a bad idea if you have a lot of data. Separate files per split are generated just as in Hadoop, so that individual mappers and reducers write to different files. Having a single output file is only a good idea if you have very little data, in which case you could also just collect(), as @aaronman said.

+21
Jun 23 '14 at 19:38

For those working with a large dataset :

  • rdd.collect() should not be used in this case, since it will collect all the data as an Array in the driver, which is the easiest way to run out of memory.

  • rdd.coalesce(1).saveAsTextFile() should not be used either, since the parallelism of the upstream stages will be lost: they will be executed on the single node where the data will be stored.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() is the best simple option, since it keeps the processing of upstream tasks parallel and only performs the shuffle to one node at the end ( rdd.repartition(1).saveAsTextFile() is an exact synonym).

  • rdd.saveAsSingleTextFile() , as described below, additionally lets you store the rdd in a single file with a specific name , while keeping the parallelism properties of rdd.coalesce(1, shuffle = true).saveAsTextFile() .




Something that may be inconvenient with rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") is that it actually produces a file whose path is path/to/file.txt/part-00000 and not path/to/file.txt .

The following rdd.saveAsSingleTextFile("path/to/file.txt") solution will actually create a file whose path is path/to/file.txt :

 package com.whatever.package

 import org.apache.spark.rdd.RDD
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.io.compress.CompressionCodec

 object SparkHelper {

   // This is an implicit class so that saveAsSingleTextFile can be attached to
   // any RDD[String] and be called like this: rdd.saveAsSingleTextFile(...)
   implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

     def saveAsSingleTextFile(path: String): Unit =
       saveAsSingleTextFileInternal(path, None)

     def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
       saveAsSingleTextFileInternal(path, Some(codec))

     private def saveAsSingleTextFileInternal(
         path: String, codec: Option[Class[_ <: CompressionCodec]]
     ): Unit = {

       // The interface with hdfs:
       val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

       // Classic saveAsTextFile in a temporary folder:
       hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
       codec match {
         case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
         case None        => rdd.saveAsTextFile(s"$path.tmp")
       }

       // Merge the folder of resulting part-xxxxx files into one file:
       hdfs.delete(new Path(path), true) // to make sure it's not there already
       FileUtil.copyMerge(
         hdfs, new Path(s"$path.tmp"),
         hdfs, new Path(path),
         true, rdd.sparkContext.hadoopConfiguration, null
       )
       // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

       hdfs.delete(new Path(s"$path.tmp"), true)
     }
   }
 }

which can be used in this way:

 import com.whatever.package.SparkHelper.RDDExtensions

 rdd.saveAsSingleTextFile("path/to/file.txt")
 // Or if the produced file is to be compressed:
 import org.apache.hadoop.io.compress.GzipCodec

 rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

This snippet:

  • First, it saves the rdd with rdd.saveAsTextFile("path/to/file.txt.tmp") in the temporary folder path/to/file.txt.tmp , as if we did not want to store the data in one file (which keeps the processing of upstream tasks parallel)

  • And only then, using the hadoop file system api , does it proceed with the merging ( FileUtil.copyMerge() ) of the different output files to create our final single output file path/to/file.txt .
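Note that FileUtil.copyMerge was removed in Hadoop 3, which is what the link inside the snippet points at. A rough sketch of a hand-rolled replacement, under the assumption that you only need to concatenate the part files in name order (the helper name mergeParts is just illustrative):

 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.IOUtils

 // Manually concatenate the part-xxxxx files of srcDir into dstFile,
 // in lexicographic order, for Hadoop 3 where copyMerge is gone.
 def mergeParts(hdfs: FileSystem, srcDir: Path, dstFile: Path): Unit = {
   val out = hdfs.create(dstFile)
   try {
     hdfs.listStatus(srcDir)
       .map(_.getPath)
       .filter(_.getName.startsWith("part-"))
       .sortBy(_.getName)
       .foreach { part =>
         val in = hdfs.open(part)
         try IOUtils.copyBytes(in, out, 4096, false)
         finally in.close()
       }
   } finally {
     out.close()
   }
 }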

+18
Feb 09 '18 at 18:37

As others have mentioned, you can collect or coalesce your dataset to force Spark to produce a single file. But this also limits the number of Spark tasks that can work on your dataset in parallel. I prefer to let it create a hundred files in the output HDFS directory, then use hadoop fs -getmerge /hdfs/dir /local/file.txt to pull the results into a single file on the local file system. This makes the most sense when your output is a relatively small report, of course.
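A hedged sketch of that two-step flow (directory and file names are just placeholders):

 // 1) Let Spark write as many part files as it wants, fully in parallel:
 year.saveAsTextFile("hdfs:///output/year")

 // 2) Then, outside of Spark, merge the parts onto the local file system:
 //    hadoop fs -getmerge /output/year /local/year.txt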

+4
Mar 29 '17 at 23:53

You can call repartition() and do it this way:

 val year = sc.textFile("apat63_99.txt")
   .map(_.split(",")(1))
   .flatMap(_.split(","))
   .map((_, 1))
   .reduceByKey(_ + _)
   .map(_.swap)

 val repartitioned = year.repartition(1)
 repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")


+2
Nov 27 '17 at 23:25

You will be able to do this in the next version of Spark; in the current version 1.0.0 it is not possible unless you do it manually somehow, for example, as you mentioned, with a bash script call.

+1
Jun 24 '14 at 5:18

I also want to mention that the documentation clearly states that users should be careful when calling coalesce with a really small number of partitions, since this can cause upstream stages to inherit that number of partitions.

I would not recommend using coalesce(1) unless it is really required.

+1
Jan 20 '16 at 23:16

In Spark 1.6.1 the format is as shown below. It creates a single output file. It is best to use it if the output is small enough to handle. Basically what it does is return a new RDD coalesced into numPartitions partitions. If you are doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1).

 pair_result.coalesce(1).saveAsTextFile("/app/data/") 
+1
Jun 01 '16 at 18:30

Here is my answer for single-file output. I just added coalesce(1) . Starting from the original:

 val year = sc.textFile("apat63_99.txt")
   .map(_.split(",")(1))
   .flatMap(_.split(","))
   .map((_, 1))
   .reduceByKey(_ + _)
   .map(_.swap)

 year.saveAsTextFile("year")

the code becomes:

 year.coalesce(1).saveAsTextFile("year") 
0
Sep 26 '17 at 2:39


