Scala: foreachPartition on a Spark stream never runs

I am trying to process events from an avro stream with Spark Streaming, following the "Design Patterns for using foreachRDD" section of the Spark Streaming guide, but for some reason the code marked "DOES NOT WORK" never executes. `rdd.partitions.size` returns 1, yet `foreachPartition` does not seem to iterate over even that one partition. P.S. I'm a Scala noob.

  events.foreachRDD { rdd =>
    if (rdd.take(1).size == 1) {
      System.out.println("**********************************WE GOT AN RDD")
      System.out.println("*******************************NUM PARTITIONS = " + rdd.partitions.size)
      val array = rdd.collect()
      array.foreach { x =>
        System.out.println("**************WORKS********************" + new String(x.event.getBody().array(), "UTF-8"))
      }
      rdd.foreachPartition { partitionItr =>
        //System.out.println("**********************************WE NEVER GET HERE " + partitionItr.size)
        //create db connection from pool
        //val connection = ConnectionPool.getConnection()
        partitionItr.foreach { item =>
          //write to db
          System.out.println("****************DOES NOT WORK******************" + new String(item.event.getBody().array(), "UTF-8"))
          //return connection to pool
          //ConnectionPool.returnConnection(connection)
        }
      }
      //rdd.count()
    } else {
      System.out.println("**********************************WE GOT NOTHIN")
    }
  }
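Two things worth checking, sketched below as assumptions rather than a diagnosis. First, the closure passed to `foreachPartition` runs on the executors, not the driver, so its `println` output goes to the executor stdout logs rather than the driver console (in `local[*]` mode everything shares one JVM, so this only bites on a real cluster). Second, the `partitionItr` you receive is a plain single-pass Scala `Iterator`: if the commented-out `partitionItr.size` line were ever uncommented, calling `.size` would consume the iterator, and the `foreach` that follows would see nothing. A minimal, Spark-free sketch of that second pitfall:

```scala
// Sketch only (no Spark involved): Scala Iterators are single-pass,
// like the partition iterator that foreachPartition hands you.
val itr = Iterator("a", "b", "c")

// .size traverses the iterator to count elements, exhausting it.
println(itr.size) // prints 3

// A subsequent foreach sees an empty iterator.
var seen = 0
itr.foreach(_ => seen += 1)
println(seen) // prints 0 — nothing left to iterate
```

If you need both the count and the elements, materialize the partition once (e.g. `val items = partitionItr.toList`) and work from the list, accepting that the whole partition is then held in memory.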
