How to prevent a Hadoop job from failing on a damaged input file

I run Hadoop on many input files. But if one of the files is damaged, the whole job fails.

How can I make the job ignore a damaged file? Maybe record it in some kind of counter / error log, but don’t fail the whole job.

mapreduce hadoop cascading
3 answers

It depends on where your job fails - if a line is damaged and an exception is thrown somewhere in your map method, you should be able to wrap the body of your map method in a try / catch and just log the error:

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // parse value to an int
            int val = Integer.parseInt(value.toString());
            // do something with key and val..
        } catch (NumberFormatException nfe) {
            // log error and continue
        }
    }
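Since the question asks for a counter, here is a minimal sketch of the log-and-count pattern in plain Java, with no Hadoop dependency so it can be run on its own. The class BadRecordCounting, the method sumValid, and the counter name MALFORMED_RECORDS are made up for illustration, and the map stands in for Hadoop's job counters; in a real mapper the update would instead be something like context.getCounter(group, name).increment(1).

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a mapper: parses each line and counts the bad ones.
class BadRecordCounting {
    // `counters` plays the role of Hadoop job counters in this sketch.
    static long sumValid(List<String> lines, Map<String, Long> counters) {
        long sum = 0;
        for (String line : lines) {
            try {
                sum += Integer.parseInt(line.trim());
            } catch (NumberFormatException nfe) {
                // Record the failure and keep going instead of failing the task.
                counters.merge("MALFORMED_RECORDS", 1L, Long::sum);
            }
        }
        return sum;
    }
}
```

After the run, the counter value shows how many records were skipped, which makes it easy to notice when a file is worse than expected.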

But if the error is thrown by your InputFormat's RecordReader, you need to override the Mapper's run(..) method, which by default does the following:

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }

So you could change this to catch the exception thrown by the call to context.nextKeyValue(), but you have to be careful about just ignoring any error thrown by the reader. An IOException, for example, may not be something you can “skip over” simply by ignoring it.

If you wrote your own InputFormat / RecordReader, and you have a specific exception that indicates a record failed but that the reader can skip it and continue parsing, then something like this will probably work:

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (true) {
            try {
                if (!context.nextKeyValue()) {
                    break;
                } else {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } catch (SkippableRecordException sre) {
                // SkippableRecordException is your own exception type,
                // thrown by your RecordReader; log error
            }
        }
        cleanup(context);
    }

But just to repeat it again - your RecordReader must be able to recover from the error, otherwise the code above could send you into an endless loop.
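The endless-loop risk can be bounded with a cap on consecutive failures. The sketch below is plain Java with hypothetical stand-ins rather than Hadoop classes: RecordSource mimics the nextKeyValue() / getCurrentValue() pair, SkippableRecordException is assumed to be an unchecked exception of your own, and the threshold MAX_CONSECUTIVE_FAILURES is an arbitrary choice. If the reader keeps failing without making progress, the loop gives up instead of spinning forever.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical exception: a bad record that the reader claims to have stepped past.
class SkippableRecordException extends RuntimeException {
    SkippableRecordException(String msg) { super(msg); }
}

// Hypothetical stand-in for a RecordReader.
interface RecordSource {
    boolean next();   // advance; may throw SkippableRecordException
    String current(); // the record read by the last successful next()
}

class GuardedRun {
    static final int MAX_CONSECUTIVE_FAILURES = 3; // assumed threshold

    // Returns the records that were read; throws if the source appears stuck.
    static List<String> run(RecordSource source) {
        List<String> out = new ArrayList<>();
        int failures = 0;
        while (true) {
            try {
                if (!source.next()) {
                    break;
                }
                out.add(source.current());
                failures = 0; // progress was made, reset the guard
            } catch (SkippableRecordException sre) {
                if (++failures > MAX_CONSECUTIVE_FAILURES) {
                    throw new IllegalStateException("source is not recovering", sre);
                }
            }
        }
        return out;
    }
}
```

A source that really does skip past the bad record is read to the end; one that throws on every call is abandoned after the fourth consecutive failure.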

In your specific case - if you just want to ignore the rest of a file on the first failure, you can change the run method to something much simpler:

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
            cleanup(context);
        } catch (Exception e) {
            // log error
        }
    }

Some final words of warning:

  • Make sure it is not your mapper code that throws the exception, otherwise you will be ignoring files for the wrong reason.
  • Files that are supposedly GZip compressed but are not actually GZip compressed will fail when the record reader is initialized - so this type of error will not be caught by the code above (you will need to write your own record reader implementation). This is true for any file error that is thrown while the record reader is being created.
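One way to respect the first warning is to keep the try block around the reader call only, so that exceptions from the map logic still fail the task. A sketch in plain Java, where LineSource, ReaderOnlyCatch, and runSkippingFile are hypothetical stand-ins for the RecordReader, the Mapper, and run(..):

```java
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical stand-in for a RecordReader.
interface LineSource {
    String nextLine() throws IOException; // returns null at end of input
}

class ReaderOnlyCatch {
    // Returns true if the whole input was read,
    // false if it was abandoned after a read error.
    static boolean runSkippingFile(LineSource source, Consumer<String> mapper) {
        while (true) {
            String line;
            try {
                line = source.nextLine();
            } catch (IOException e) {
                // Only reader failures land here; log them and give up on this input.
                System.err.println("Skipping rest of input: " + e.getMessage());
                return false;
            }
            if (line == null) {
                return true;
            }
            mapper.accept(line); // exceptions from the map logic propagate
        }
    }
}
```

As the second warning says, this still only covers errors thrown while reading records, not errors thrown while the reader is being created.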

This is what Cascading failure traps are for:

Whenever an operation fails and throws an exception, if there is an associated trap, the offending tuple is saved to the resource specified by the trap Tap. This allows the job to continue processing without any data loss.

This will allow your job to continue and let you check the damaged files later.

If you are somewhat familiar with Cascading, add the trap in your flow definition statement:

  new FlowDef().addTrap( String branchName, Tap trap ); 

See the Failure Traps section of the Cascading documentation.


There is another possible way. You can set the mapred.max.map.failures.percent configuration property, which is the percentage of map tasks that may fail without the job as a whole being marked as failed. Of course, solving the problem this way can also hide other problems that occur during the map phase.
