I need to build a TPL Dataflow pipeline that will process a large number of messages. Because there are so many, I cannot simply Post them into an unbounded BufferBlock, or I will run into memory problems. So I want to set BoundedCapacity = 1 to effectively turn off the queue, and use MaxDegreeOfParallelism to process messages in parallel, since my TransformBlock may take some time for each message. I also use PropagateCompletion so that completion flows through the pipeline instead of being wired up by hand for each block.
But I ran into an error-handling problem when an exception is thrown right after the first message: await SendAsync never completes, and my application waits forever.
Here is my example, simplified to a console application:
    var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
    {
        BoundedCapacity = 1
    });

    var process_block = new ActionBlock<int>(x =>
    {
        throw new InvalidOperationException();
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 2,
        BoundedCapacity = 1
    });

    data_buffer.LinkTo(process_block, new DataflowLinkOptions { PropagateCompletion = true });

    for (var k = 1; k <= 5; k++)
    {
        await data_buffer.SendAsync(k);
        Console.WriteLine("Send: {0}", k);
    }

    data_buffer.Complete();

    await process_block.Completion;
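For comparison, here is a minimal sketch of one possible way to keep the loop from hanging, assuming it is acceptable to stop sending as soon as the consumer has faulted. Each SendAsync is raced against process_block.Completion with Task.WhenAny. The `failing` delegate and the `faulted` flag are names introduced only for this sketch, and depending on the target framework the System.Threading.Tasks.Dataflow NuGet package may be needed. It is an illustration, not necessarily the recommended fix.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical name: an explicitly typed delegate that always fails,
// standing in for the throwing lambda in the example above.
Action<int> failing = x => { throw new InvalidOperationException(); };

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
var process_block = new ActionBlock<int>(failing, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 1
});

data_buffer.LinkTo(process_block, new DataflowLinkOptions { PropagateCompletion = true });

for (var k = 1; k <= 5; k++)
{
    // Race the send against the consumer's completion, so that a faulted
    // consumer cannot leave the producer waiting indefinitely.
    var send = data_buffer.SendAsync(k);
    await Task.WhenAny(send, process_block.Completion);

    if (process_block.Completion.IsCompleted)
    {
        // The consumer has stopped; further sends could never be processed.
        break;
    }

    Console.WriteLine("Send: {0}", k);
}

data_buffer.Complete();

// Hypothetical flag: records whether the consumer's exception was observed.
var faulted = false;
try
{
    await process_block.Completion;
}
catch (InvalidOperationException ex)
{
    faulted = true;
    Console.WriteLine("Pipeline faulted: {0}", ex.GetType().Name);
}
```

How many "Send:" lines appear before the loop stops depends on timing, so no particular output should be relied on. A send that is still pending when the loop breaks is simply abandoned in this sketch.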