Write RDD to HDFS in a Spark Streaming context

I have a Spark Streaming environment with Spark version 1.2.0, where I retrieve data from a local folder, and every time a new file is added to the folder I perform some transformation.

val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)

To do my analysis on the DStream data, I have to transform it into an array:

import scala.collection.mutable.ArrayBuffer

var arr = new ArrayBuffer[String]()
data.foreachRDD { rdd => arr ++= rdd.collect() }

Then I use the data to retrieve the information I want and to save it to HDFS.

val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")

Since I really need to manipulate the data with an array, I can't save it to HDFS with DStream.saveAsTextFiles("...") (which would work fine); I have to save the RDD instead, but with this approach I end up with empty output files named part-00000, etc.

With arr.foreach(println) I can see the correct results of the transformations.

My suspicion is that Spark tries at every batch to write data to the same files, deleting what was written earlier. I tried saving to a dynamically named folder, for example myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()), but only one folder is ever created and the output files are still empty.

How can I write an RDD to HDFS in a Spark Streaming context?

+7
scala hadoop hdfs apache-spark spark-streaming
2 answers

You are using Spark Streaming in a way it wasn't designed to be used. I would recommend either not using Spark for your use case, or adapting your code so it works the Spark way. Collecting the array to the driver defeats the purpose of using a distributed engine and effectively makes your application single-machine (two machines will also incur more overhead than just processing the data on one machine).

Everything you can do with an array you can do with Spark. So run your computations inside the stream, distributed across the workers, and write your output using DStream.saveAsTextFiles(). You can use foreachRDD + saveAsParquet(path, overwrite = true) to write to a single file.
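
For illustration, a minimal sketch of that approach, assuming the analysis can be expressed as ordinary RDD/DStream operations; the transform function and output path below are placeholders, not the asker's actual logic (sc and directory are taken from the question):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Keep the computation on the workers instead of collecting to the driver
def transform(line: String): String = line.toUpperCase  // stand-in for the real analysis

val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)

val results = data.map(transform)

// saveAsTextFiles appends the batch time to the prefix, so each batch
// is written to its own HDFS directory and nothing gets overwritten
results.saveAsTextFiles("hdfs:///output/result")

ssc.start()
ssc.awaitTermination()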

+5

@vzamboni: the Spark 1.5+ DataFrame API has this functionality:

dataframe.write()
         .mode(SaveMode.Append)
         .format(FILE_FORMAT)
         .partitionBy("parameter1", "parameter2")
         .save(path);
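
A hedged Scala sketch of how that could look inside the streaming job, assuming Spark 1.5+, an existing SparkContext sc, and a DStream of lines called results; the Record case class, the parsing logic, the partition columns, and the output path are illustrative only:

import org.apache.spark.sql.{SaveMode, SQLContext}

case class Record(parameter1: String, parameter2: String, value: String)  // illustrative schema

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

results.foreachRDD { rdd =>
  // Append mode lets every micro-batch add new files under the same path
  // instead of overwriting the previous batch's output
  rdd.map(line => Record("p1", "p2", line))  // stand-in parsing logic
     .toDF()
     .write
     .mode(SaveMode.Append)
     .format("parquet")
     .partitionBy("parameter1", "parameter2")
     .save("hdfs:///output/result-parquet")
}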
+2
