For those who need the Java equivalent of the Scala code, here is the same transformation written with the Java API:
// Read (LongWritable, Text) records from inputPath. newFilesOnly = false, so files
// already present in the directory are also processed in the first batch.
JavaPairInputDStream<LongWritable, Text> textFileStream = jsc.fileStream(
        inputPath,
        LongWritable.class,
        Text.class,
        TextInputFormat.class,
        FileInputDStream::defaultFilter,
        false);

// Convert each micro-batch into (fileName, line) tuples. Each batch RDD produced by
// fileStream is a UnionRDD whose constituent RDDs correspond to the individual input
// files, and each constituent's name() holds the path it was read from — that is
// where the file name is recovered.
JavaDStream<Tuple2<String, String>> namedTextFileStream =
        textFileStream.transform((pairRdd, time) -> {
            // NOTE(review): this unchecked cast assumes FileInputDStream always
            // emits a UnionRDD for the batch — confirm against the Spark version
            // in use before relying on it.
            UnionRDD<Tuple2<LongWritable, Text>> unionRdd =
                    (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();

            List<RDD<Tuple2<LongWritable, Text>>> perFileRdds =
                    JavaConverters.seqAsJavaListConverter(unionRdd.rdds()).asJava();

            // Skip empty per-file RDDs up front rather than mapping them to null
            // and filtering the nulls out afterwards.
            List<RDD<Tuple2<String, String>>> namedRdds = perFileRdds.stream()
                    .filter(depRdd -> !depRdd.isEmpty())
                    .map(depRdd -> {
                        String fileName = depRdd.name(); // path this RDD was read from
                        return JavaPairRDD.fromJavaRDD(depRdd.toJavaRDD())
                                .mapToPair(t -> new Tuple2<>(fileName, t._2().toString()))
                                .setName(fileName)
                                .rdd();
                    })
                    .collect(Collectors.toList());

            Seq<RDD<Tuple2<String, String>>> rddSeq =
                    JavaConverters.asScalaBufferConverter(namedRdds).asScala().toIndexedSeq();

            // The Scala UnionRDD constructor requires a ClassTag; the tag built from
            // the raw Tuple2.class is sufficient since type arguments are erased at
            // runtime.
            ClassTag<Tuple2<String, String>> classTag =
                    scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);

            return new UnionRDD<Tuple2<String, String>>(
                    unionRdd.sparkContext(), rddSeq, classTag).toJavaRDD();
        });
racc
source share