How to save a Spark DataFrame as CSV on disk?

For example, the result of this:

df.filter("project = 'en'").select("title","count").groupBy("title").sum() 

will return an array.

How to save a spark DataFrame as a CSV file to disk?

+11
scala apache-spark apache-spark-sql
4 answers

Apache Spark (prior to 2.x) does not support native CSV output to disk.

You have four solutions available:

  1. You can convert your DataFrame to an RDD (a minimal sketch of such a conversion appears after this list):

      def convertToReadableString(r: Row) = ???
      df.rdd.map { convertToReadableString }.saveAsTextFile(filepath)

    This will create a folder at filepath. Under that folder you will find the partition files (e.g. part-000*).

    What I usually do, if I want to concatenate all the partitions into one big CSV, is

     cat filePath/part* > mycsvfile.csv 

    Some people use coalesce(1, false) to create a single partition from the RDD. This is usually bad practice, as it can overwhelm the driver by pulling all of the data you are collecting into it.

    Note that df.rdd will return an RDD[Row].

  2. With Spark < 2, you can use the Databricks spark-csv library:

    • Spark 1.4+:

       df.write.format("com.databricks.spark.csv").save(filepath) 
    • Spark 1.3:

       df.save(filepath,"com.databricks.spark.csv") 
  3. Spark 2.x does not need spark-csv, because CSV support is built into Spark itself:

     df.write.format("csv").save(filepath) 
  4. You can convert to a local pandas DataFrame and use its to_csv method (PySpark only).

Note: solutions 1, 2, and 3 will produce files in CSV format (part-*), generated by the underlying Hadoop API that Spark calls when you invoke save. You will have one part- file per partition.
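
For solution 1, here is a minimal sketch of what convertToReadableString could look like. It is only one naive possibility: it renders every column with toString, turns nulls into empty strings, and does no quoting or escaping, so it only suits simple data (df and filepath are placeholders, as above).

 import org.apache.spark.sql.Row

 // Naive row-to-line conversion: nulls become empty strings, everything else
 // is rendered with toString. Embedded commas are NOT quoted or escaped.
 def convertToReadableString(r: Row): String =
   r.toSeq.map(v => if (v == null) "" else v.toString).mkString(",")

 df.rdd.map(convertToReadableString).saveAsTextFile(filepath)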

+23

I had a similar problem, where I had to save the contents of a DataFrame to a CSV file whose name I defined. df.write.format("csv").save("<my-path>") created a directory rather than a file. So I came up with the following solution. Most of the code is taken from dataframe-to-csv, with minor changes to the logic.

 import java.io.File
 import org.apache.spark.sql.DataFrame

 def saveDfToCsv(df: DataFrame, tsvOutput: String,
                 sep: String = ",", header: Boolean = false): Unit = {
   val tmpParquetDir = "Posts.tmp.parquet"

   // Write a single-partition CSV into a temporary directory.
   df.repartition(1).write
     .format("com.databricks.spark.csv")
     .option("header", header.toString)
     .option("delimiter", sep)
     .save(tmpParquetDir)

   // Find the part file, rename it to the requested output, and clean up.
   val dir = new File(tmpParquetDir)
   val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv"
   val tmpTsvFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
   new File(tmpTsvFile).renameTo(new File(tsvOutput))

   dir.listFiles.foreach(f => f.delete)
   dir.delete
 }
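
A hypothetical call, with a made-up output path, would look like this (it writes a single CSV file with a header row):

 saveDfToCsv(df, "/tmp/titles.csv", sep = ",", header = true)
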
+1

Writing data to disk as CSV is similar to reading from CSV. If you want your result to be a single file, you can use coalesce.

 df.coalesce(1)
   .write
   .option("header", "true")
   .option("sep", ",")
   .mode("overwrite")
   .csv("output/path")

If your result is an array, you should use a language-specific solution rather than the Spark DataFrame API, because all of these results are returned to the driver machine.
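
As a rough sketch of such a language-specific approach in Scala, assuming the result is small enough to fit in driver memory (the output path is a placeholder):

 import java.io.PrintWriter

 // Pull the (small) result onto the driver and write it with plain file I/O.
 val rows = df.collect()
 val writer = new PrintWriter("/tmp/result.csv")
 try {
   rows.foreach(r => writer.println(r.toSeq.mkString(",")))
 } finally {
   writer.close()
 }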

+1

I had a similar problem. I needed to write the CSV file on the driver while connected to the cluster in client mode.

I wanted to reuse the same CSV parsing code as Apache Spark to avoid potential errors.

I checked the spark-csv source and found the code responsible for converting a DataFrame into a raw CSV RDD[String] in com.databricks.spark.csv.CsvSchemaRDD.

Unfortunately, it is hardcoded around sc.textFile at the end of the relevant method.

I copied this code, removed the final lines involving sc.textFile, and returned the RDD instead.

My code is:

 /*
   This is copy-pasted from com.databricks.spark.csv.CsvSchemaRDD.
   spark-csv has a perfect method converting a DataFrame -> raw CSV RDD[String],
   but in the last lines of that method it is hardcoded to write a text file -
   for our case we need the RDD itself.
 */
 import org.apache.commons.csv.QuoteMode
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame

 object DataframeToRawCsvRDD {

   val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat

   // Note: ExecutionContext here is an application-specific context that exposes
   // a sparkContext, not scala.concurrent.ExecutionContext.
   def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
            (implicit ctx: ExecutionContext): RDD[String] = {
     val delimiter = parameters.getOrElse("delimiter", ",")
     val delimiterChar = if (delimiter.length == 1) {
       delimiter.charAt(0)
     } else {
       throw new Exception("Delimiter cannot be more than one character.")
     }

     val escape = parameters.getOrElse("escape", null)
     val escapeChar: Character = if (escape == null) {
       null
     } else if (escape.length == 1) {
       escape.charAt(0)
     } else {
       throw new Exception("Escape character cannot be more than one character.")
     }

     val quote = parameters.getOrElse("quote", "\"")
     val quoteChar: Character = if (quote == null) {
       null
     } else if (quote.length == 1) {
       quote.charAt(0)
     } else {
       throw new Exception("Quotation cannot be more than one character.")
     }

     val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
     val quoteMode: QuoteMode = if (quoteModeString == null) {
       null
     } else {
       QuoteMode.valueOf(quoteModeString.toUpperCase)
     }

     val nullValue = parameters.getOrElse("nullValue", "null")

     val csvFormat = defaultCsvFormat
       .withDelimiter(delimiterChar)
       .withQuote(quoteChar)
       .withEscape(escapeChar)
       .withQuoteMode(quoteMode)
       .withSkipHeaderRecord(false)
       .withNullString(nullValue)

     // Optionally prepend a header line built from the column names.
     val generateHeader = parameters.getOrElse("header", "false").toBoolean
     val headerRdd = if (generateHeader) {
       ctx.sparkContext.parallelize(Seq(
         csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
       ))
     } else {
       ctx.sparkContext.emptyRDD[String]
     }

     // Render every row through the same commons-csv format.
     val rowsRdd = dataFrame.rdd.map(row => {
       csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
     })

     headerRdd union rowsRdd
   }
 }
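
A hypothetical usage, assuming an implicit ExecutionContext of that application-specific kind is in scope and the result fits in driver memory (the output path is made up):

 val csvLines: RDD[String] = DataframeToRawCsvRDD(df, Map("header" -> "true"))

 // Collect on the driver (client mode) and write the lines with ordinary Java file I/O.
 import java.nio.file.{Files, Paths}
 import scala.collection.JavaConverters._
 Files.write(Paths.get("/tmp/output.csv"), csvLines.collect().toSeq.asJava)
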
0
