Most likely your DynamoDB message was inserted twice because your error condition was hit and processed by two different executors. Spark divides the work among its workers, and those workers have no knowledge of one another.
I'm not sure what drives your requirement to have the Spark step FAIL, but I would suggest tracking this failure case in your application code instead of trying to make Spark itself abort. In other words, write code that detects the error and reports it back to your Spark driver, then act on it accordingly.
One way to do this is to use an accumulator to count any errors that occur while processing your data. It would look something like this (I'm assuming Scala and DataFrames, but you can adapt to RDDs and/or Python if necessary):
    val accum = sc.longAccumulator("Error Counter")

    def doProcessing(a: String, b: String): String = {
      if (condition) {
        accum.add(1)   // record the error
        null
      } else {
        doComputation(a, b)
      }
    }

    val doProcessingUdf = udf(doProcessing _)

    df = df.withColumn("result", doProcessingUdf($"a", $"b"))

    df.write.format(..).save(..)
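Back on the driver, you can then inspect the accumulator after the write action finishes and react however you like. A minimal sketch (the exception and the zero threshold here are my own illustration, not part of your pipeline):

    // The write action has completed, so accum.value is final on the driver.
    if (accum.value > 0) {
      // Hypothetical handling: surface the failure once, from the driver,
      // rather than trying to fail from inside the executors.
      throw new RuntimeException(s"Processing hit ${accum.value} error rows")
    }

One caveat: accumulator updates made inside transformations (such as a UDF) can be applied more than once if a task is retried, so treat the count as an error signal rather than an exact tally.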
One nice thing about this approach is that if you are looking for feedback in the Spark UI, you can watch the accumulator values there while the job is running. For reference, here is the accumulator documentation: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
Ryanw