This is easily achieved using the TPL Dataflow library.
First, assume you have a BufferBlock<T>; this is your queue:
var queue = new BufferBlock<T>();
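Then you need an action to perform on each item; this is represented by the ActionBlock<TInput> class:
var action = new ActionBlock<T>(t => { /* process t here */ }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 /* example value: number of concurrent tasks */ });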
Note the constructor above: it takes an instance of ExecutionDataflowBlockOptions, and MaxDegreeOfParallelism is set to however many items you want processed concurrently.
Under the hood, the Task Parallel Library handles the allocation of threads for the tasks, etc. TPL Dataflow is a higher-level abstraction that lets you configure just how much parallelism, throttling, etc. you want.
For example, if you do not want the ActionBlock<TInput> to buffer any items (preferring that they stay in the BufferBlock<T>), you can also set the BoundedCapacity property, which limits the number of items the ActionBlock<TInput> will hold at once (this includes the items being processed as well as reserved items):
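var action = new ActionBlock<T>(t => { /* process t here */ }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 4 /* example values: hold no more items than are being processed */ });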
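To make the effect of these two options concrete, here is a minimal sketch that records how many items are in flight at once. The class name `ThrottlingSketch`, the values `4` and `20`, and the 50 ms delay are all made up for illustration, and it assumes the System.Threading.Tasks.Dataflow package is referenced. It feeds the block with SendAsync rather than Post, because Post returns false immediately when a bounded block is full, whereas SendAsync waits for room.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottlingSketch
{
    // Returns the highest number of items that were being processed at the same time.
    public static async Task<int> RunAsync()
    {
        var gate = new object();
        int running = 0, peak = 0;

        var action = new ActionBlock<int>(async item =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }

            await Task.Delay(50); // stand-in for real work (hypothetical)

            lock (gate) { running--; }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4, // illustrative value
            BoundedCapacity = 4         // illustrative value: no extra buffering inside the block
        });

        for (int i = 0; i < 20; i++)
        {
            // Waits asynchronously while the block is at capacity.
            await action.SendAsync(i);
        }

        action.Complete();
        await action.Completion;
        return peak;
    }
}
```

Whatever the timing, the returned peak should never exceed the configured MaxDegreeOfParallelism.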
In addition, if you want a new, fresh Task instance to process each item, you can set the MaxMessagesPerTask property to one, indicating that each Task will process a single item:
var action = new ActionBlock<T>(t => { /* process t here */ }, new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 });
Note that, depending on how many other tasks your application is running, this may or may not be optimal for you, and you should also consider the cost of spinning up a new task for every item that comes through the ActionBlock<TInput>.
From there, just link the BufferBlock<T> to the ActionBlock<TInput> with a call to the LinkTo method:
IDisposable connection = queue.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });
You set the PropagateCompletion property to true so that when the BufferBlock<T> is completed, the completion is passed on to the ActionBlock<TInput> (once there are no more items to process), which you can subsequently wait on.
Note that you can call the Dispose method on the IDisposable returned from the LinkTo call if you want the link between the blocks to be removed.
Finally, you send items to the buffer using the Post method:
queue.Post(new T());
And when you are done (if you are ever done), you call the Complete method:
queue.Complete();
Then, on the action block, you can wait until it is done by waiting on the Task instance exposed by the Completion property:
action.Completion.Wait();
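Putting the steps above together, a small end-to-end sketch might look like this. The class name `QueueSketch`, the `int` item type, the doubling "work", and the counts are illustrative assumptions, not part of the original answer; it also assumes the System.Threading.Tasks.Dataflow package is referenced. It awaits Completion instead of calling Wait(), which avoids blocking a thread when the caller is already asynchronous.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class QueueSketch
{
    // Posts ten integers through a buffer into an action block and
    // returns how many items the action block processed.
    public static async Task<int> RunAsync()
    {
        var processed = new ConcurrentBag<int>();

        // The queue.
        var queue = new BufferBlock<int>();

        // The action performed on every item (hypothetical work: doubling the value).
        var action = new ActionBlock<int>(
            item => processed.Add(item * 2),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }); // illustrative value

        // Link the blocks; completion of the buffer flows on to the action block.
        using (queue.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true }))
        {
            for (int i = 0; i < 10; i++)
            {
                queue.Post(i);
            }

            // Signal that no more items will arrive.
            queue.Complete();

            // Finishes once every queued item has been processed.
            await action.Completion;
        }

        return processed.Count;
    }
}
```

Because the buffer is unbounded here, Post always accepts the item; with a bounded block you would check its return value or use SendAsync.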
Hopefully, the elegance of this is clear:
- You do not have to manage the creation of new Task instances / threads / etc. to manage the work; the blocks do it for you based on the settings you provide (and this is on a per-block basis).
- A cleaner separation of concerns. The buffer is separated from the action, as are all the other blocks. You build the blocks and then link them together.