Spark DStream: periodically calling saveAsObjectFile using transform does not work properly

I read data from Kafka using the DirectKafkaStream API 1 , do some transformations that update a count, and then write the data back to Kafka. Actually this piece of code is in a test:

    kafkaStream[Key, Value]("test")
      .map(record => (record.key(), 1))
      .updateStateByKey[Int](
        (numbers: Seq[Int], state: Option[Int]) => state match {
          case Some(s) => Some(s + numbers.length)
          case _       => Some(numbers.length)
        }
      )
      .checkpoint(this)("count") {
        case (save: (Key, Int), current: (Key, Int)) =>
          (save._1, save._2 + current._2)
      }
      .map(_._2)
      .reduce(_ + _)
      .map(count => (new Key, new Result[Long](count.toLong)))
      .toKafka(
        Key.Serializer.getClass.getName,
        Result.longKafkaSerializer.getClass.getName)

The checkpoint statement is an enrichment of the DStream API that I created, which should practically save one RDD of the given DStream, once per given amount of Time, to HDFS using saveAsObjectFile . In practice, it saves every 60th micro-batch (RDD) to HDFS.

The checkpoint does the following:

    def checkpoint(processor: Streaming)(name: String)(
        mergeStates: (T, T) => T): DStream[T] = {
      val path = processor.configuration.get[String](
        "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
        Reflection.canonical(processor.getClass) + "/" + name + "/"
      logInfo(s"Checkpoint base path is [$path].")

      processor.registerOperator(name)

      if (processor.fromCheckpoint && processor.restorationPoint.isDefined) {
        val restorePath = path + processor.restorationPoint.get.ID.stringify
        logInfo(s"Restoring from path [$restorePath].")
        checkpointData = context.objectFile[T](restorePath).cache()

        stream
          .transform((rdd: RDD[T], time: Time) => {
            val merged = rdd
              .union(checkpointData)
              .map[(Boolean, T)](record => (true, record))
              .reduceByKey(mergeStates)
              .map[T](_._2)

            processor.maybeCheckpoint(name, merged, time)

            merged
          })
      } else {
        stream
          .transform((rdd: RDD[T], time: Time) => {
            processor.maybeCheckpoint(name, rdd, time)

            rdd
          })
      }
    }

The effective part of the code is as follows:

    dstream.transform((rdd: RDD[T], time: Time) => {
      processor.maybeCheckpoint(name, rdd, time)

      rdd
    })

Where dstream in the code above is the result of the previous statement, updateStateByKey , so transform is called right after updateStateByKey .

    def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = {
      if (doCheckpoint(time)) {
        logInfo(s"Checkpointing for operator [$name] with RDD ID of [${rdd.id}].")
        val newPath = configuration.get[String](
          "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
          Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode
        logInfo(s"Saving new checkpoint to [$newPath].")
        rdd.saveAsObjectFile(newPath)
        registerCheckpoint(name, Operator(name), time)
        logInfo(s"Checkpoint completed for operator [$name].")
      }
    }

As you can see, most of it is just bookkeeping, but saveAsObjectFile is effectively called there.
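doCheckpoint is not shown above; it is just a time-based gate that fires on every 60th micro-batch. Conceptually it looks something like the following sketch (the name checkpointInterval and the 5 second batch interval are illustrative, not the actual fields):

    import org.apache.spark.streaming.{Seconds, Time}

    // Illustrative sketch only: with a 5 second batch interval this fires
    // on every 60th micro-batch.
    val checkpointInterval = Seconds(5) * 60

    def doCheckpoint(time: Time): Boolean = time.isMultipleOf(checkpointInterval)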

The problem is that, even though the RDDs coming out of updateStateByKey should be persisted automatically, when saveAsObjectFile is called on every Xth micro-batch Spark recomputes everything from scratch, from the beginning of the streaming job, starting with reading everything from Kafka again. I have tried cache and persist with different storage levels, on the DStreams as well as on the RDDs.
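For reference, this is roughly where those cache / persist calls went (a sketch only; stateStream is a stand-in name for the DStream coming out of updateStateByKey, and the exact storage levels varied):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Time

    // Sketch of the caching attempts; stateStream stands for the DStream
    // returned by updateStateByKey in the pipeline above.
    stateStream.persist(StorageLevel.MEMORY_AND_DISK)   // on the DStream (also tried cache())

    stateStream.transform((rdd: RDD[(Key, Int)], time: Time) => {
      rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)     // on the individual RDDs
      processor.maybeCheckpoint("count", rdd, time)
      rdd
    })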

Micro batches: [screenshot of the micro-batch list]

DAG for job 22: [screenshot of the DAG visualization for job 22]

DAG for the job in which saveAsObjectFile is executed: [screenshots SAOF1 and SAOF2 of the DAG visualization]

What could be the problem?

Thanks!

1 Using Spark 2.1.0.

1 answer

I believe that using transform to perform the periodic checkpointing is causing the unexpected cache behaviour.

Using foreachRDD to perform the periodic checkpointing instead keeps the DAG stable enough to cache the RDDs effectively (see the sketch below).

I am pretty sure that was the solution to a similar problem we had some time ago.
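For illustration, here is roughly how the effective part from the question could be moved from transform to foreachRDD — the periodic save becomes an output operation, so the transformation lineage feeding the rest of the pipeline is left untouched. This is a sketch only, reusing the question's stream, processor and maybeCheckpoint names:

    // Sketch: cache the state stream and do the periodic save as an output
    // operation rather than inside a transform.
    stream.cache()

    stream.foreachRDD((rdd: RDD[T], time: Time) => {
      // maybeCheckpoint still decides whether this batch should be saved and
      // calls rdd.saveAsObjectFile internally, exactly as in the question.
      processor.maybeCheckpoint(name, rdd, time)
    })

    // stream itself continues to feed the rest of the pipeline unchanged.

Only the save itself needs to move; a branch that actually changes the data (such as the restore/merge branch in the question's checkpoint method) can stay in a transform.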

