I have a Spark Streaming environment with Spark version 1.2.0, where I extract data from a local folder, and every time a new file is added to the folder I apply some transformation to it.
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
To do my analysis of the DStream data, I have to convert it to an array:
import scala.collection.mutable.ArrayBuffer

var arr = new ArrayBuffer[String]()
data.foreachRDD { rdd => arr ++= rdd.collect() }
Then I use the data to retrieve the information I want and to save it to HDFS.
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
Since I really need to manipulate the data as an array, I cannot save it to HDFS with DStream.saveAsTextFiles("...") (which would work fine); I have to save the RDD instead. With this setup, however, I end up with empty output files named part-00000, etc.
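For reference, this is the DStream-level save I mean; it does produce output per batch, but it doesn't let me post-process the collected array first (the HDFS path and suffix below are placeholders, not my real ones):

// Writes one directory per batch interval, named <prefix>-<batchTime>.<suffix>
data.saveAsTextFiles("hdfs://namenode:8020/streaming/output", "txt")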
With arr.foreach(println) I can see the correct conversion results.
My suspicion is that Spark tries, on every batch, to write data to the same files, deleting whatever was written earlier. I tried saving to a dynamically named folder, for example myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()), but only one folder is ever created and the output files are still empty.
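Concretely, that attempt replaces the save shown above with something like this ("folder" is just a placeholder prefix for the real HDFS path):

// One output folder per save attempt, suffixed with the current time in ms
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())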
How can I write an RDD to HDFS in a Spark Streaming context?
scala hadoop hdfs apache-spark spark-streaming
drstein