How to overwrite the output directory in Spark

I have a Spark Streaming application which produces a dataset every minute. I need to save/overwrite the results of the processed data.

When I try to overwrite the dataset, execution stops with org.apache.hadoop.mapred.FileAlreadyExistsException.

I set the Spark property set("spark.files.overwrite","true"), but had no luck.

How do I overwrite or pre-delete the files from Spark?

+89
apache-spark
Nov 20 '14 at 7:14
8 answers

UPDATE: Suggest using DataFrames, plus something like ...write.mode(SaveMode.Overwrite)...
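For example, a minimal sketch of the DataFrame approach (the DataFrame name and output path here are assumed for illustration):

    import org.apache.spark.sql.SaveMode
    // Overwrite whatever already exists at the target path.
    df.write.mode(SaveMode.Overwrite).parquet("/tmp/output")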

For older versions try

    yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(yourSparkConf)

In version 1.1.0, you can set configuration parameters using the spark-submit script with the --conf flag.
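For example (the application jar and class names here are placeholders):

    spark-submit --conf spark.hadoop.validateOutputSpecs=false \
      --class com.example.MyApp myApp.jar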

WARNING (older versions): According to @piggybox, Spark has a bug where it only overwrites the files it needs in order to write its part- files; any other files in the directory are left unremoved.

+92
Nov 28 '14 at 17:56

Since df.save(path, source, mode) is deprecated ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame ),

use df.write.format(source).mode("overwrite").save(path), where df.write is a DataFrameWriter.

'source' can be "com.databricks.spark.avro", "parquet", or "json".
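As a concrete usage sketch (DataFrame name and output path assumed):

    // Write df as JSON, replacing any existing output at the path.
    df.write.format("json").mode("overwrite").save("/tmp/output")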

+27
Jul 13 '16 at 9:02

The documentation for the spark.files.overwrite parameter says: "Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source." So it has no effect on the saveAsTextFile method.

You can do this before saving the file:

    // Delete the output path (recursively) before saving; ignore failures such as a missing path.
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
    try {
      hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true)
    } catch {
      case _: Throwable => {}
    }

As explained here: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html

+26
Nov 24 '14 at 9:18

The pyspark.sql.DataFrame.save documentation (currently at 1.3.1) shows that you can specify mode='overwrite' when saving a DataFrame:

 myDataFrame.save(path='myPath', source='parquet', mode='overwrite') 

I've verified that this will even remove leftover partition files. So if you originally wrote 10 partitions/files, but then overwrote the folder with a DataFrame that had only 6 partitions, the resulting folder will have 6 partitions/files.

See the Spark SQL documentation for more information on mode options.

+22
Apr 29 '15 at 21:25

df.write.mode('overwrite').parquet("/output/folder/path") works if you want to overwrite a Parquet file from Python. This is as of Spark 1.6.2; the API may differ in later versions.

+6
Jan 30 '17 at 19:56
    val jobName = "WordCount";
    // To overwrite the output directory in Spark, set("spark.hadoop.validateOutputSpecs", "false")
    val conf = new SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
    val sc = new SparkContext(conf)
+4
Jun 01 '17 at 15:14

This overloaded version of the save function works for me:

    yourDF.save(outputPath, org.apache.spark.sql.SaveMode.valueOf("Overwrite"))

The above example will overwrite an existing folder. SaveMode can also take these parameters ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Append: Append mode means that when saving a DataFrame to a data source, if the data/table already exists, the contents of the DataFrame are expected to be appended to the existing data.

ErrorIfExists: ErrorIfExists mode means that when saving a DataFrame to a data source, if the data already exists, an exception is expected to be thrown.

Ignore: Ignore mode means that when saving a DataFrame to a data source, if the data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.
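For illustration, a minimal sketch of two of these modes (DataFrame name and path assumed):

    import org.apache.spark.sql.SaveMode
    // Append new rows to whatever already exists at the path.
    df.write.mode(SaveMode.Append).parquet("/tmp/output")
    // Silently skip the write if data already exists at the path.
    df.write.mode(SaveMode.Ignore).parquet("/tmp/output")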

+2
Apr 6 '16 at 17:36

If you want to use your own output format, you can also get the desired behavior with RDDs.

Take a look at the following classes: FileOutputFormat, FileOutputCommitter.

In FileOutputFormat there is a method called checkOutputSpecs, which checks whether the output directory exists. In FileOutputCommitter there is commitJob, which usually transfers data from the temporary directory to its final location.

I have not been able to test it yet (I will as soon as I have a few free minutes), but theoretically: if you extend FileOutputFormat and override checkOutputSpecs with a method that does not throw an exception when the directory already exists, and adjust the commitJob method of your custom output committer to perform whatever logic you want (e.g. overwrite some of the files, append to others), then you may be able to achieve the desired behavior with RDDs as well.

The output format is passed to saveAsNewAPIHadoopFile (which is what the saveAsTextFile method itself calls to actually save the files). The output committer is configured at the application level.
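A minimal sketch of the output-format half of this idea (untested, as noted above; the class name and key/value types are illustrative):

    import org.apache.hadoop.io.{NullWritable, Text}
    import org.apache.hadoop.mapreduce.JobContext
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    // An output format that skips the "output directory already exists" check,
    // so FileAlreadyExistsException is never thrown.
    class OverwritingTextOutputFormat extends TextOutputFormat[NullWritable, Text] {
      override def checkOutputSpecs(context: JobContext): Unit = {
        // Deliberately empty: do not fail when the directory exists.
      }
    }

    // Hypothetical usage with an RDD[String] named `lines`:
    // lines.map(l => (NullWritable.get(), new Text(l)))
    //   .saveAsNewAPIHadoopFile("/tmp/output", classOf[NullWritable], classOf[Text],
    //     classOf[OverwritingTextOutputFormat])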

0
Apr 6 '16 at 18:13
source share


