TPL Dataflow exception in a transform block with bounded capacity

I need to build a TPL Dataflow pipeline that will process a large number of messages. Because there are so many, I cannot simply Post them into an unbounded BufferBlock, or I will run into memory problems. So I want to set BoundedCapacity = 1 to effectively disable the queue, and use MaxDegreeOfParallelism to process messages in parallel, since my TransformBlock may take some time for each message. I also use PropagateCompletion so that completion and faults propagate down the pipeline.

But I ran into a problem with error handling when an error occurs right after the first message: await SendAsync simply makes my application wait forever.

I have simplified my case to a sample console application:

    var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
    {
        BoundedCapacity = 1
    });

    var process_block = new ActionBlock<int>(x =>
    {
        throw new InvalidOperationException();
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 2,
        BoundedCapacity = 1
    });

    data_buffer.LinkTo(process_block, new DataflowLinkOptions { PropagateCompletion = true });

    for (var k = 1; k <= 5; k++)
    {
        await data_buffer.SendAsync(k);
        Console.WriteLine("Send: {0}", k);
    }

    data_buffer.Complete();

    await process_block.Completion;
1 answer

This is expected behavior. If there is a fault "downstream", the error does not propagate "backwards" up the mesh. The mesh expects you to detect that fault (for example, through process_block.Completion) and resolve it.

If you want to propagate errors backwards, you can have an await or a continuation on process_block.Completion that faults the upstream block(s) if the downstream block(s) fault.
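As a rough illustration of that idea, the question's pipeline could be extended with a small helper that awaits the downstream block and faults the upstream one. This is only a sketch: the names BackwardFaultDemo, RunAsync and FaultUpstreamOnErrorAsync are made up for this example, and it relies on SendAsync returning false once the target has been faulted.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackwardFaultDemo
{
    // Hypothetical helper: waits for the downstream block to finish and,
    // if it faulted, faults the upstream block with the same exception.
    private static async Task FaultUpstreamOnErrorAsync(
        IDataflowBlock upstream, IDataflowBlock downstream)
    {
        try
        {
            await downstream.Completion;
        }
        catch (Exception ex)
        {
            upstream.Fault(ex);
        }
    }

    // Returns how many messages the buffer accepted before it was faulted.
    public static async Task<int> RunAsync()
    {
        var dataBuffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        var processBlock = new ActionBlock<int>(
            x => { throw new InvalidOperationException(); },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 2,
                BoundedCapacity = 1
            });

        dataBuffer.LinkTo(processBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Start watching the downstream block before sending anything.
        Task watcher = FaultUpstreamOnErrorAsync(dataBuffer, processBlock);

        var accepted = 0;
        for (var k = 1; k <= 5; k++)
        {
            // SendAsync completes with false once the buffer has been faulted,
            // so the loop stops instead of waiting forever.
            if (!await dataBuffer.SendAsync(k))
            {
                Console.WriteLine("Send declined: {0}", k);
                break;
            }
            accepted++;
            Console.WriteLine("Send: {0}", k);
        }

        dataBuffer.Complete();
        await watcher;

        try
        {
            await processBlock.Completion;
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("Pipeline faulted");
        }

        return accepted;
    }
}
```

Calling `await BackwardFaultDemo.RunAsync()` from an async Main should return instead of hanging. How many sends succeed before the fault arrives depends on timing, so the sketch does not assume an exact count.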

Note that this is not the only possible solution; you may want to rebuild that part of the mesh or link the sources to an alternative target. The source block(s) have not faulted, so they can simply continue processing with the repaired mesh.
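The relinking alternative might look roughly like the following. It is a sketch under assumptions rather than a recommended design: RelinkDemo, failing and replacement are hypothetical names, the first link deliberately omits PropagateCompletion, and the message that was already handed to the faulted block is simply lost.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RelinkDemo
{
    // Returns the messages handled by the replacement block.
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();

        var source = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        // First target: throws on every message, like the block in the question.
        var failing = new ActionBlock<int>(
            x => { throw new InvalidOperationException(); },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // Hypothetical replacement target. The default MaxDegreeOfParallelism
        // is 1, so the list is never touched concurrently.
        var replacement = new ActionBlock<int>(
            x => processed.Add(x),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // LinkTo returns an IDisposable; disposing it removes the link.
        IDisposable link = source.LinkTo(failing);

        async Task RepairAsync()
        {
            try
            {
                await failing.Completion;
            }
            catch (InvalidOperationException)
            {
                // The source itself has not faulted, so unlink the broken
                // target and attach the replacement instead.
                link.Dispose();
                source.LinkTo(replacement,
                    new DataflowLinkOptions { PropagateCompletion = true });
            }
        }

        Task repair = RepairAsync();

        for (var k = 1; k <= 5; k++)
        {
            await source.SendAsync(k);
        }

        source.Complete();
        await repair;
        await replacement.Completion;

        return processed;
    }
}
```

Whether dropping the failed message is acceptable depends on the application. A real pipeline would probably log it or route it somewhere for a retry.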
