How can I make saveAsTextFile (Spark 1.6) append to an existing file?

In Spark SQL, I use DF.write.mode(SaveMode.Append).json(xxxx), but this method writes the output as files like these: [screenshot of the generated part files]

The file names are long and random, so I cannot fetch them through an API. I want to use saveAsTextFile instead, because its file names are simple and predictable, but I don't know how to append a file into the same directory. Thanks for your time.
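For context, here is a minimal sketch of the two write paths being compared, assuming an existing DataFrame df and RDD rdd (the paths are illustrative, not from the original post):

import org.apache.spark.sql.SaveMode

// DataFrame append: each write adds files with generated names such as
// part-r-00000-<random uuid>.json, which is what the screenshot shows.
df.write.mode(SaveMode.Append).json("/data/out-json")

// RDD text output: file names are predictable (part-00000, part-00001, ...),
// but saveAsTextFile refuses to write into an existing directory, so it
// cannot append on its own -- hence the question.
rdd.saveAsTextFile("/data/out-text")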

3 answers

This worked for me on Spark 1.5, and I think it is the right usage:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);
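In Scala the same call would look like this; a sketch only, where the format, partition columns, and output path are placeholders:

import org.apache.spark.sql.SaveMode

// Each append adds new part files under the matching
// parameter1=.../parameter2=... partition directories.
df.write
  .mode(SaveMode.Append)
  .format("json") // FILE_FORMAT in the answer above
  .partitionBy("parameter1", "parameter2")
  .save("/data/output") // placeholder path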

This is a workaround rather than a true append. In Spark Streaming, I save each RDD to a temporary directory on HDFS and then merge the part files into a single file:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.rdd.RDD

def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
  // Write the RDD's part files into a temporary directory first.
  val sourceFile = hdfsServer + "/tmp/"
  rdd.saveAsTextFile(sourceFile)
  // Then collapse the part files into a single file under /final/.
  val dstPath = hdfsServer + "/final/"
  merge(sourceFile, dstPath, fileName)
}

def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
  val hadoopConfig = new Configuration() // picks up fs.defaultFS from the classpath
  val hdfs = FileSystem.get(hadoopConfig)
  val destinationPath = new Path(dstPath)
  if (!hdfs.exists(destinationPath)) {
    hdfs.mkdirs(destinationPath)
  }
  // copyMerge concatenates every part file under srcPath into one file.
  // deleteSource = true removes the temporary directory afterwards, so the
  // next saveAsTextFile call does not fail because the path already exists.
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), true, hadoopConfig, null)
}
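A hypothetical usage sketch from a streaming job, assuming stream is a DStream; the server URL and file name pattern are illustrative:

// Merge each micro-batch into its own file under /final/.
stream.foreachRDD { (rdd, time) =>
  saveAsTextFileAndMerge("hdfs://namenode:8020", s"batch-${time.milliseconds}.txt", rdd)
}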

If your files are stored on HDFS, you can merge them after the write is finished. FileUtil.copyMerge does exactly that: it concatenates all the part files into 1 merged file. Just make sure any previous merge result is removed first.

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
val hdfs = FileSystem.get(hadoopConf)
val mergedPath = "merged-" + filePath + ".json"
val merged = new Path(mergedPath)
// Remove any previous merge result so copyMerge starts from a clean slate.
if (hdfs.exists(merged)) {
  hdfs.delete(merged, true)
}
df.write.mode(SaveMode.Append).json(filePath)

// Concatenate all part files under filePath into the single merged file.
FileUtil.copyMerge(hdfs, new Path(filePath), hdfs, merged, false, hadoopConf, null)

Afterwards you can read the single merged file from the mergedPath location. Hope this helps.
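For example, a minimal read-back sketch, assuming the same sqlContext and mergedPath as above:

// Load the merged single-file output as a DataFrame.
val mergedDf = sqlContext.read.json(mergedPath)
mergedDf.show()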

