Using TPL Dataflow, can I cancel all messages and then add one?

With the TPL data flow library, I would like to do something like this:

myActionBlock.Post(newValue, cancelAllPreviousPosts: true); 

It looks like the undo token in ActionBlock undoes all this; I will need to create a new ActionBlock if I install it. Is partial cancellation possible with ActionBlock?

Messages that have not yet been processed should not be attempted. It would be nice if there was some kind of cancellation token that can be checked in the current executable message.

+7
c # task-parallel-library tpl-dataflow cancellationtokensource
source share
3 answers

Take a look at the BroadcastBlock<T> , which only stores the last item sent to it. You can put the broadcast block before the ActionBlock<T> .

During the publication of a new element in the broadcast block, the element currently being processed by the action block is not canceled; it will overwrite any existing element already saved by the broadcast block; actually discarding any old messages not yet processed by the action block. When the action block completes its current element, it will take the last element sent to the broadcast block.

+4
source share

In addition to Monroe Thomas's answer, it is important to understand that the ActionBlock following BroadcastBlock needs Limited bandwidth, limited to 1 , or it will store and process each broadcast block message, even if it is still running.
Sample code is given here:

 ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber => { await Task.Delay(100); Console.WriteLine($">{ThisNumber}"); }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null); ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true }); for(int IX = 0; IX < 128; IX++) { await ThrottleBlock.SendAsync(IX); await Task.Delay(10); } 

The result is the following:

 >0 >6 >12 >20 >27 >34 >41 >48 >55 >62 >68 >75 >82 >88 >95 >101 >108 >115 >122 >127 

Enjoy it!
-Simon

+1
source share

There is nothing like this in TPL Dataflow, but I see several ways how you could implement it yourself:

  • If you do not need to treat the modified block as a normal data stream block (for example, there is no support for LinkTo() ), then an easy way is to write a type that wraps the ActionBlock , but whose elements also contain a flag indicating whether they should be processed. When you specify cancelAllPreviousPosts: true , all these flags are reset, so these elements will be skipped.

    The code might look something like this:

     class CancellableActionBlock<T> { private class Item { public T Data { get; private set; } public bool ShouldProcess { get; set; } public Item(T data) { Data = data; ShouldProcess = true; } } private readonly ActionBlock<Item> actionBlock; private readonly ConcurrentDictionary<Item, bool> itemSet; public CancellableActionBlock(Action<T> action) { itemSet = new ConcurrentDictionary<Item, bool>(); actionBlock = new ActionBlock<Item>(item => { bool ignored; itemSet.TryRemove(item, out ignored); if (item.ShouldProcess) { action(item.Data); } }); } public bool Post(T data, bool cancelAllPreviousPosts = false) { if (cancelAllPreviousPosts) { foreach (var item in itemSet.Keys) { item.ShouldProcess = false; } itemSet.Clear(); } var newItem = new Item(data); itemSet.TryAdd(newItem, true); return actionBlock.Post(newItem); } // probably other members that wrap actionBlock members, // like Complete() and Completion } 
  • If you want to create something more complex and reusable, you can create a special block only for this cancellation. You can implement this with thee BufferBlock linked together, where the third will have capacity 1 and the second will have unlimited capacity. Thus, almost all queued items will be in the second block, so you can only cancel by replacing this block with a new one. The whole structure will be represented by Encapsulate() with the first and third blocks.

    The problems with this approach are that the cancellation has a delay of 1 element (the one in the third block). In addition, I did not find a good interface for this.

0
source share

All Articles