Spark: incorrect behavior when throwing a SparkException in EMR

I am running a Spark job on EMR with YARN as the resource manager, on two nodes. I need to purposely fail a step when a condition is not met, so that the next step is not executed (as configured). To do this, I throw a custom exception after writing a log message to DynamoDB.

It works fine, but a record in Dynamo is inserted twice.

Below is my code.

    if (<condition>) {
        <method call to insert in dynamo>
        throw new SparkException(<msg>);
        return;
    }

If I remove the line that throws the exception, everything works fine, but then the step completes successfully.

How can I make the step fail without the log message being inserted twice?

Thanks for the help.

Regards, Sorabh

amazon-dynamodb yarn apache-spark amazon-emr
1 answer

Probably the reason your Dynamo message was inserted twice is that your error condition was evaluated by two different executors. Spark divides the work among its workers, and those workers have no knowledge of one another.
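To illustrate, here is a hypothetical sketch of the pattern that produces the duplicate; the helper names (badCondition, insertIntoDynamo) and the surrounding foreach are assumptions, since the question does not show where the check actually runs. The key point is that a side effect performed inside executor-side code can execute more than once, for example when a failed task is re-run.

    // Hypothetical sketch -- badCondition and insertIntoDynamo are placeholders,
    // not names from the question.
    rdd.foreach { record =>
      if (badCondition(record)) {
        insertIntoDynamo(record)                    // side effect runs on an executor
        throw new SparkException("condition not met")
        // The failed task may be attempted again (task retry / speculative
        // execution), and each attempt repeats the insert above -- hence the
        // duplicate record in DynamoDB.
      }
    }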

I'm not sure what drives your requirement to have the Spark step FAIL, but I would suggest tracking this failure case in your application code instead of trying to get Spark itself to fail directly. In other words, write code that detects the error condition and reports it back to your Spark driver, then act on it there accordingly.

One way to do this is to use an accumulator to count any errors that occur while processing your data. It would look something like this (I'm assuming Scala and DataFrames, but you can adapt it to RDDs and/or Python if necessary):

    val accum = sc.longAccumulator("Error Counter")

    def doProcessing(a: String, b: String): String = {
      if (condition) {
        accum.add(1)
        null
      } else {
        doComputation(a, b)
      }
    }

    val doProcessingUdf = udf(doProcessing _)

    df = df.withColumn("result", doProcessingUdf($"a", $"b"))

    df.write.format(..).save(..)

    // Accumulator value is not populated until an action occurs!
    if (accum.value > 0) {
      // An error was detected during computation! Do whatever needs to be done.
      <insert dynamo message here>
    }

One nice thing about this approach is that if you look in the Spark UI you can see the accumulator values there while the job is running. For reference, here is the accumulator documentation: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
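If you still need the EMR step itself to be marked as failed, one option (a sketch only, assuming the job is launched via spark-submit as an EMR step and the step's ActionOnFailure is configured to skip subsequent steps) is to do the DynamoDB write once from the driver and then throw from the driver's main method, or exit with a non-zero status. The step fails, but the side effect runs only on the driver, so it happens exactly once. The helper insertFailureRecordIntoDynamo below is hypothetical, not an API from the question or from Spark.

    // Driver-side check after the action has run and the accumulator is populated.
    if (accum.value > 0) {
      insertFailureRecordIntoDynamo("condition not met")  // hypothetical helper; runs once, on the driver
      // Throwing here (or calling sys.exit(1)) makes the application exit non-zero,
      // so YARN/EMR marks the step as FAILED and, if configured that way,
      // the following steps are not executed.
      throw new SparkException("Failing the step: condition not met")
    }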

