Spark Streaming: get the file name from a DStream RDD

Spark Streaming's textFileStream and fileStream can monitor a directory and process new files as they arrive in the DStream's RDDs.

How can I get the names of the files processed by the DStream's RDDs in a given batch interval?
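
For reference, this is the kind of setup in question — a minimal sketch, assuming a local master and a hypothetical /some/directory being monitored:

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 val conf = new SparkConf().setAppName("FileNameDemo").setMaster("local[2]")
 val ssc = new StreamingContext(conf, Seconds(30))

 // Monitors /some/directory; each batch RDD contains the lines of files
 // that appeared in the directory during that batch interval.
 val lines = ssc.textFileStream("/some/directory")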

Tags: scala, apache-spark
2 answers

fileStream produces a UnionRDD of NewHadoopRDDs. The nice part about the NewHadoopRDDs created by sc.newAPIHadoopFile is that their name is set to their file path.
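
You can see this directly — a minimal sketch, assuming the stream reads the same hypothetical /some/directory — by printing the name of each per-file dependency inside the batch's UnionRDD:

 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

 ssc.fileStream[LongWritable, Text, TextInputFormat]("/some/directory")
   .foreachRDD { rdd =>
     // Each dependency wraps one NewHadoopRDD whose name is the source file path.
     rdd.dependencies.foreach(dep => println(dep.rdd.name))
   }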

Here is an example of what you can do with this knowledge:

 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.dstream.DStream
 import scala.reflect.ClassTag

 // Rebuilds the batch UnionRDD so every per-file RDD keeps its file path as its name.
 def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
   ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
     .transform( rdd =>
       new UnionRDD(rdd.context,
         rdd.dependencies.map( dep =>
           dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]]
             .map(_._2.toString)
             .setName(dep.rdd.name)
         )
       )
     )

 // Applies a per-file transformation, handing the file name to the transform function.
 def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                  transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
   new UnionRDD(unionrdd.context,
     unionrdd.dependencies.map{ dep =>
       if (dep.rdd.isEmpty) None
       else {
         val filename = dep.rdd.name
         Some(
           transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
             .setName(filename)
         )
       }
     }.flatten
   )
 }

 def main(args: Array[String]) = {
   val conf = new SparkConf()
     .setAppName("Process by file")
     .setMaster("local[2]")

   val ssc = new StreamingContext(conf, Seconds(30))

   val dstream = namedTextFileStream(ssc, "/some/directory")

   def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
     rdd.map(line => (filename, line))

   val transformed = dstream.transform(rdd => transformByFile(rdd, byFileTransformer))

   // Do some stuff with transformed

   ssc.start()
   ssc.awaitTermination()
 }
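Since the byFileTransformer above pairs every line with its source file, a quick way to verify the result is to print a few records per batch — a sketch, assuming the transformed stream from main:

 transformed.foreachRDD { rdd =>
   // Each record is (filename, line); print a small sample per batch.
   rdd.take(5).foreach { case (file, line) => println(s"$file: $line") }
 }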

For those who need Java code instead of Scala:

 import java.util.List;
 import java.util.stream.Collectors;

 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.rdd.UnionRDD;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairInputDStream;
 import org.apache.spark.streaming.dstream.FileInputDStream;

 import scala.Tuple2;
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 import scala.reflect.ClassTag;

 JavaPairInputDStream<LongWritable, Text> textFileStream = jsc.fileStream(
     inputPath,
     LongWritable.class,
     Text.class,
     TextInputFormat.class,
     FileInputDStream::defaultFilter,
     false
 );

 JavaDStream<Tuple2<String, String>> namedTextFileStream =
     textFileStream.transform((pairRdd, time) -> {
       // The batch RDD is a UnionRDD whose member RDDs are named after their source files.
       UnionRDD<Tuple2<LongWritable, Text>> rdd =
           (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
       List<RDD<Tuple2<LongWritable, Text>>> deps =
           JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();

       // Turn every non-empty per-file RDD into (filename, line) pairs.
       List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map(depRdd -> {
         if (depRdd.isEmpty()) {
           return null;
         }
         JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
         String filename = depRdd.name();
         JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd)
             .mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString()))
             .setName(filename);
         return newDep.rdd();
       }).filter(t -> t != null).collect(Collectors.toList());

       // Reassemble a single UnionRDD from the renamed per-file RDDs.
       Seq<RDD<Tuple2<String, String>>> rddSeq =
           JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
       ClassTag<Tuple2<String, String>> classTag =
           scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
       return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag)
           .toJavaRDD();
     });
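As in the Scala answer, each element of namedTextFileStream is a (filename, line) tuple, so downstream operators can group or filter records by the file they came from.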
