I am trying to process events from an Avro stream with Spark Streaming, following the "Design Patterns for using foreachRDD" section of the programming guide, but for some reason the code never executes where it says "DOES NOT WORK". rdd.partitions.size returns 1, yet it doesn't seem to iterate over even that single partition. P.S. I'm a Scala noob.
events.foreachRDD { rdd =>
  if (rdd.take(1).size == 1) {
    System.out.println("**********************************WE GOT AN RDD")
    System.out.println("*******************************NUM PARTITIONS =" + rdd.partitions.size)
    val array = rdd.collect()
    array.foreach { x =>
      System.out.println("**************WORKS********************" + new String(x.event.getBody().array(), "UTF-8"))
    }
    rdd.foreachPartition { partitionItr =>
      partitionItr.foreach { item =>
        System.out.println("****************DOES NOT WORK******************" + new String(item.event.getBody().array(), "UTF-8"))
      }
    }
  } else {
    System.out.println("**********************************WE GOT NOTHIN")
  }
}
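For reference, here is a minimal, self-contained sketch of how this snippet might be wired up. It assumes the events DStream comes from spark-streaming-flume (FlumeUtils.createStream, whose SparkFlumeEvent records match the x.event.getBody() access above); the app name, host, port, and batch interval are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object StreamDebug {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("avro-stream-debug")
    val ssc = new StreamingContext(conf, Seconds(10)) // batch interval is a placeholder

    // Assumption: events is a DStream[SparkFlumeEvent] from spark-streaming-flume;
    // host and port are placeholders.
    val events = FlumeUtils.createStream(ssc, "localhost", 41414)

    events.foreachRDD { rdd =>
      // collect() brings the data back to the driver, so this println
      // appears in the driver's console output.
      rdd.collect().foreach { x =>
        System.out.println("driver side: " + new String(x.event.getBody().array(), "UTF-8"))
      }

      // The foreachPartition closure runs on the executors, so any println
      // here is written to the executors' stdout logs, not the driver console.
      rdd.foreachPartition { partitionItr =>
        partitionItr.foreach { item =>
          System.out.println("executor side: " + new String(item.event.getBody().array(), "UTF-8"))
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}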