Global per-block error handling in a Dataflow pipeline

I am designing a long-running Dataflow pipeline that consists of several blocks. Items are fed into the pipeline's input block, eventually make their way through it, and are displayed in the UI at the end (as a courtesy to the user; the pipeline's real job is to save the processing results to disk).

The lambda functions inside the pipeline blocks can throw exceptions for a variety of reasons (bad input, network failure, an error during calculation, whatever). In that case, instead of faulting the entire pipeline, I would like to kick out the offending item and display it in the UI under an "Errors" section.

What is the best way to do this? I understand that I can wrap every lambda function in a try/catch:

    var errorLoggingBlock = new ActionBlock<Tuple<WorkItem, Exception>>(...);

    var workerBlock = new TransformBlock<WorkItem, WorkItem>(item =>
    {
        try
        {
            return DoStuff(item);
        }
        catch (Exception ex)
        {
            // Report the offending item and its exception to the error block.
            errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
            return null;
        }
    });

But I have about 10 blocks in the pipeline, and copy/pasting this code into each of them seems silly. I also do not like the idea of returning null, since every downstream block would then have to check for it.

My next best idea is to write a function that returns a lambda that does the wrapping for me:

    private Func<TArg, TResult> HandleErrors<TArg, TResult>(Func<TArg, TResult> f)
        where TArg : WorkItem
    {
        return arg =>
        {
            try
            {
                return f(arg);
            }
            catch (Exception ex)
            {
                // Explicit type arguments so the tuple matches the error block's input type.
                errorLoggingBlock.SendAsync(Tuple.Create<WorkItem, Exception>(arg, ex));
                return default(TResult);
            }
        };
    }

But that seems too meta. Is there a better way?

c# error-handling tpl-dataflow
1 answer

This is a very interesting topic.

You can define filters when linking blocks, which means you can redirect failed results to error-handling blocks. To do this, the blocks must return meta-objects that contain both the processing result and a success/failure indicator.

This idea is described better in Railway Oriented Programming, where each function in the chain either processes successful results or diverts failed results to a failure track for eventual logging.

In practical terms, this means that after each block you add two links: one with a filter condition that redirects failures to the error-handling block, and one default link that goes to the next step in the flow.
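The two links described above could be wired up roughly like this. This is only a sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; `Result<T>`, `Demo`, and the block names are made up for illustration, and the parsing stage stands in for a real processing step.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical envelope: carries either a value or the exception that stopped it.
public sealed class Result<T>
{
    public T Value { get; }
    public Exception Error { get; }
    public bool IsSuccess => Error == null;

    private Result(T value, Exception error) { Value = value; Error = error; }

    public static Result<T> Ok(T value) => new Result<T>(value, null);
    public static Result<T> Fail(Exception error) => new Result<T>(default(T), error);
}

public static class Demo
{
    public static async Task<(List<int> Saved, List<string> Errors)> RunAsync(IEnumerable<string> inputs)
    {
        var saved = new List<int>();
        var errors = new List<string>();

        // The stage reports failure through its return value instead of throwing.
        var parse = new TransformBlock<string, Result<int>>(text =>
        {
            try { return Result<int>.Ok(int.Parse(text)); }
            catch (Exception ex) { return Result<int>.Fail(ex); }
        });

        // Each ActionBlock handles one message at a time by default,
        // so each list is only ever touched by a single block.
        var save = new ActionBlock<Result<int>>(r => saved.Add(r.Value));
        var logError = new ActionBlock<Result<int>>(r => errors.Add(r.Error.Message));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        // Links are offered messages in the order they were added, so the
        // filtered link goes first: failures leave the main track here.
        parse.LinkTo(logError, propagate, r => !r.IsSuccess);
        // Default link: everything else continues to the next step.
        parse.LinkTo(save, propagate);

        foreach (var text in inputs)
            await parse.SendAsync(text);

        parse.Complete();
        await Task.WhenAll(save.Completion, logError.Completion);
        return (saved, errors);
    }

    public static async Task Main()
    {
        var (saved, errors) = await RunAsync(new[] { "1", "oops", "3" });
        Console.WriteLine($"Saved: {string.Join(", ", saved)}; errors: {errors.Count}");
    }
}
```

With more than one stage feeding the same error block, propagating completion from every stage would complete the error block as soon as the first stage finishes, so in that case it is probably safer to complete it manually once the last stage is done.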

You can even combine the two ideas to handle partial failures. A partial-failure result contains both a failure indicator and a payload, so you can send it to the logging block before passing it on to the next step.

I have found it much easier to state the status of each message explicitly than to try to infer it by checking for null, missing values, and so on. This means that blocks must wrap their results in envelope objects that contain status flags, the results, and/or any errors.
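One possible shape for such an envelope is sketched below, together with a small helper that writes the try/catch once instead of in every block. `MessageStatus`, `Envelope<T>`, and `Envelope.Capture` are illustrative names and not part of TPL Dataflow; a real envelope would likely also carry the original work item so it can be shown in the UI.

```csharp
using System;

// Hypothetical status flag; PartialFailure means the payload is usable
// but something went wrong while producing it.
public enum MessageStatus { Success, PartialFailure, Failure }

// Hypothetical envelope holding the status, the payload, and any error together.
public sealed class Envelope<T>
{
    public MessageStatus Status { get; }
    public T Payload { get; }        // not meaningful when Status is Failure
    public Exception Error { get; }  // null when Status is Success

    public Envelope(MessageStatus status, T payload, Exception error)
    {
        Status = status;
        Payload = payload;
        Error = error;
    }
}

public static class Envelope
{
    // Wraps a plain stage function so that an exception becomes a Failure
    // envelope instead of faulting the block that runs the function.
    public static Func<TIn, Envelope<TOut>> Capture<TIn, TOut>(Func<TIn, TOut> body)
    {
        return input =>
        {
            try
            {
                return new Envelope<TOut>(MessageStatus.Success, body(input), null);
            }
            catch (Exception ex)
            {
                return new Envelope<TOut>(MessageStatus.Failure, default(TOut), ex);
            }
        };
    }
}

public static class EnvelopeDemo
{
    public static void Main()
    {
        Func<string, Envelope<int>> parse = Envelope.Capture<string, int>(int.Parse);

        Console.WriteLine(parse("42").Status);    // valid input
        Console.WriteLine(parse("oops").Status);  // int.Parse throws, which is captured
    }
}
```

The delegate returned by `Capture` has the signature a `TransformBlock<TIn, Envelope<TOut>>` constructor expects, so it can be passed in directly, and link filters can then test `Status` rather than checking for null.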
