Not using exception handling in Spark

I am working on a Java-based Spark Streaming application that responds to messages that go through the Kafka theme. For each message, the application does some processing and writes the results to another Kafka topic.

Sometimes, due to unforeseen data problems, code that runs on RDD may fail and throw an exception. When this happens, I would like to have a common handler that could take the necessary actions and drop the message in the subject of the error. Right now, these exceptions are being written to Spark by Spark himself.

What is the best approach for this, instead of writing try-catch blocks for every block of code that runs on RDD?

+7
apache-spark spark-streaming
source share
2 answers

You can write a generic function that does this. You only need to wrap it around the RDD actions, as those that can throw Spark exceptions (transformers like .map and .filter lazy executed by actions).

(Assume this is in Scala). Perhaps you can even try something with implicits. Create a class containing the RDD and it will handle the error. Here is a sketch of what might look like:

 implicit class FailSafeRDD[T](rdd: RDD[T]) { def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try { fn(rdd) } } 

You can add an error message to the failsafeAction theme or whatever you want to do every time it crashes. And then usage might look like this:

 val rdd = ??? // Some rdd you already have val resultOrException = rdd.failsafeAction { r => r.count() } 

Other than that, I think the β€œbest” approach is somewhat subjective to application needs.

+3
source share

I think you can also implement this with try catch =>

 dstream.foreachRDD { case rdd: RDD[String] => rdd.foreach { case string: String => try { val kafkaProducer = ... val msg = ... kafkaProducer.send(msg) } catch { case d: DataException=> val kafkaErrorProducer = ... val errorMsg = ... kafkaErrorProducer.send(errorMsg ) case t: Throwable => //further error handling } } } 
+2
source share

All Articles