I had a similar problem. I needed to write a CSV file on the driver while connected to the cluster in client mode.
I wanted to reuse the same CSV parsing code as Apache Spark to avoid potential errors.
I checked the spark-csv source and found the code responsible for converting a DataFrame into a raw CSV RDD[String] in com.databricks.spark.csv.CsvSchemaRDD.
Unfortunately, that logic is hardcoded together with the sc.textFile call at the end of the relevant method.
I copied that code, removed the last lines that used sc.textFile, and returned the RDD instead.
My code is:
import org.apache.commons.csv.QuoteMode
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

object DataframeToRawCsvRDD {

  val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat

  // ctx is my own execution context that exposes the SparkContext
  // (not scala.concurrent.ExecutionContext).
  def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
           (implicit ctx: ExecutionContext): RDD[String] = {
    val delimiter = parameters.getOrElse("delimiter", ",")
    val delimiterChar = if (delimiter.length == 1) {
      delimiter.charAt(0)
    } else {
      throw new Exception("Delimiter cannot be more than one character.")
    }

    val escape = parameters.getOrElse("escape", null)
    val escapeChar: Character = if (escape == null) {
      null
    } else if (escape.length == 1) {
      escape.charAt(0)
    } else {
      throw new Exception("Escape character cannot be more than one character.")
    }

    val quote = parameters.getOrElse("quote", "\"")
    val quoteChar: Character = if (quote == null) {
      null
    } else if (quote.length == 1) {
      quote.charAt(0)
    } else {
      throw new Exception("Quotation cannot be more than one character.")
    }

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
    val quoteMode: QuoteMode = if (quoteModeString == null) {
      null
    } else {
      QuoteMode.valueOf(quoteModeString.toUpperCase)
    }

    val nullValue = parameters.getOrElse("nullValue", "null")

    // Build the Commons CSV format the same way spark-csv does when saving.
    val csvFormat = defaultCsvFormat
      .withDelimiter(delimiterChar)
      .withQuote(quoteChar)
      .withEscape(escapeChar)
      .withQuoteMode(quoteMode)
      .withSkipHeaderRecord(false)
      .withNullString(nullValue)

    // Optional single-record RDD containing the header line.
    val generateHeader = parameters.getOrElse("header", "false").toBoolean
    val headerRdd = if (generateHeader) {
      ctx.sparkContext.parallelize(Seq(
        csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
      ))
    } else {
      ctx.sparkContext.emptyRDD[String]
    }

    // Format every row as a single CSV line.
    val rowsRdd = dataFrame.rdd.map(row => {
      csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
    })

    headerRdd union rowsRdd
  }
}
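To actually get the file onto the driver, you can collect the resulting RDD[String] and write it with plain Java IO. This is only a minimal sketch: the dataFrame, the implicit context, and the output path are placeholders you have to supply, and collect() assumes the result fits in driver memory.

import java.io.PrintWriter

// Placeholders - substitute your own DataFrame and execution context:
// val dataFrame: DataFrame = ...
// implicit val ctx: ExecutionContext = ...

val csvLines = DataframeToRawCsvRDD(dataFrame, Map("header" -> "true"))

// collect() pulls all lines to the driver, so this only works when
// the CSV is small enough to fit in driver memory.
val writer = new PrintWriter("/tmp/output.csv")
try {
  csvLines.collect().foreach(writer.println)
} finally {
  writer.close()
}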