Spark Streaming: process RDDs and save the output to a single HDFS file

  • I use Kafka with Spark Streaming to receive streaming data.

    val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
    
  • I take this DStream and process each RDD:

    val output = lines.foreachRDD(rdd =>
      rdd.foreachPartition { partition =>
        partition.foreach { file => runConfigParser(file) }
      })
    
  • runConfigParser is a Java method that parses a file and produces the output that I need to save in HDFS. Multiple nodes will therefore process the RDD, and I want the output written to a single HDFS file, because I want to load this data into Hive.

Should I collect the output of runConfigParser and use sc.parallelize(output).saveAsTextFile(path) so that all of my nodes write their RDD output to a single HDFS file? Is this approach effective?
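For concreteness, a minimal sketch of that idea, assuming runConfigParser returns its parsed output as a String (the output path is a placeholder). Note that saveAsTextFile still writes a directory of part-files per batch, not one file:

    lines.foreachRDD { rdd =>
      // Bring this batch's parsed output to the driver, then re-distribute and write it.
      val output = rdd.map(line => runConfigParser(line)).collect()
      rdd.sparkContext
        .parallelize(output)
        .saveAsTextFile("/data/parsed/batch-" + System.currentTimeMillis())   // hypothetical path
    }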

I will load this single HDFS file (which will be constantly updated, since the data is streaming) into Hive and query it with Impala.


No. saveAsTextFile will not give you a single HDFS file, because it writes one HDFS file per RDD partition.

To end up with a single HDFS file, reduce/collect the output and write it with the HDFS Java API. The downside is that this final step happens on the driver, outside Spark's distributed processing, so Spark's parallelism is lost for the write.
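A minimal sketch of that driver-side write, under the assumption that runConfigParser returns its parsed output as a String and that HDFS append is enabled (dfs.support.append); the target path is a placeholder:

    import java.nio.charset.StandardCharsets

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    lines.foreachRDD { rdd =>
      // All parsed records of this micro-batch are brought to the driver here.
      val parsed = rdd.map(line => runConfigParser(line)).collect()
      if (parsed.nonEmpty) {
        val fs   = FileSystem.get(new Configuration())
        val file = new Path("/data/parsed/single-output.txt")   // hypothetical target file
        // Append to the single file if it exists, otherwise create it.
        val out  = if (fs.exists(file)) fs.append(file) else fs.create(file)
        try parsed.foreach(rec => out.write((rec + "\n").getBytes(StandardCharsets.UTF_8)))
        finally out.close()
      }
    }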


"" saveAsTextFile. :

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs._
    import org.apache.spark.rdd.RDD

    def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
      // Write the RDD as part-files into a temporary directory first.
      val sourceFile = hdfsServer + "/tmp/"
      rdd.saveAsTextFile(sourceFile)
      // Then merge those part-files into a single file under /final/.
      val dstPath = hdfsServer + "/final/"
      merge(sourceFile, dstPath, fileName)
    }

    def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
      val hadoopConfig = new Configuration()
      val hdfs = FileSystem.get(hadoopConfig)
      val destinationPath = new Path(dstPath)
      if (!hdfs.exists(destinationPath)) {
        hdfs.mkdirs(destinationPath)
      }
      // copyMerge concatenates every part-file under srcPath into one destination file;
      // deleteSource = false leaves the temporary part-files in place.
      FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
    }
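One possible way to wire this helper into the streaming job from the question, assuming runConfigParser returns a String and fs.defaultFS points at the same cluster (the NameNode URI and file names are placeholders). Because saveAsTextFile refuses to write into an existing directory, the helper's fixed /tmp/ staging directory is cleared before each batch, and each micro-batch is merged into its own file:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val hdfsServer = "hdfs://namenode:8020"   // placeholder NameNode URI

    lines.foreachRDD { rdd =>
      val parsed = rdd.map(line => runConfigParser(line))
      if (!parsed.isEmpty()) {
        // saveAsTextFile cannot overwrite, so drop the staging directory from the previous batch.
        val staging = new Path(hdfsServer + "/tmp/")
        staging.getFileSystem(new Configuration()).delete(staging, true)
        // Merge this batch's part-files into a single file under <hdfsServer>/final/.
        saveAsTextFileAndMerge(hdfsServer, s"parsed-${System.currentTimeMillis()}.txt", parsed)
      }
    }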