BroadcastBlock with guaranteed delivery in TPL Dataflow

I have a stream of data that I process in several different ways, so I would like to send a copy of each message I receive to multiple targets so that those targets can run in parallel. However, I need to set BoundedCapacity on my blocks, because the data arrives much faster than my targets can handle it, and there is a ton of data. Without BoundedCapacity, I would quickly run out of memory.

The problem is that BroadcastBlock drops a message if a target cannot accept it (because that target has reached its BoundedCapacity).

What I need is a BroadcastBlock that does not drop messages, but instead refuses additional input until it has delivered the current message to every target, and only then is ready for more.

Does something like this exist, or has anybody written a custom block that behaves this way?

1 answer

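It's fairly simple to build what you're asking for using ActionBlock and SendAsync(). Because SendAsync() does not complete until the target has accepted (or declined) the item, a full target holds up the broadcast instead of losing the message. Something like: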

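    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks.Dataflow;

    public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
        IEnumerable<ITargetBlock<T>> targets)
    {
        // Snapshot the targets so later changes to the sequence have no effect.
        var targetsList = targets.ToList();

        return new ActionBlock<T>(
            async item =>
            {
                // Wait for each target in turn to accept the item.
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            },
            // Accept only one item at a time, so callers feel the back-pressure.
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
    }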

This is the most basic version, but extending it to support a mutable list of targets, propagating completion, or a cloning function should be easy.
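As an illustration of one of those extensions, here is a minimal sketch that also propagates completion (or a fault) to every target, followed by a small usage example. The names GuaranteedBroadcast, Create, fast and slow are hypothetical and chosen only for this example; it assumes a runtime where System.Threading.Tasks.Dataflow is available (on .NET Framework it comes from the NuGet package of the same name) and a compiler that supports async Main.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GuaranteedBroadcast
{
    // Hypothetical helper: the same idea as the answer's block,
    // plus completion propagation to the targets.
    public static ITargetBlock<T> Create<T>(IEnumerable<ITargetBlock<T>> targets)
    {
        var targetsList = targets.ToList();

        var block = new ActionBlock<T>(
            async item =>
            {
                // SendAsync waits until the target accepts the item,
                // so a full target stalls the broadcast instead of losing data.
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // When the broadcast block finishes, complete or fault every target.
        block.Completion.ContinueWith(task =>
        {
            foreach (var target in targetsList)
            {
                if (task.IsFaulted)
                    target.Fault(task.Exception);
                else
                    target.Complete();
            }
        });

        return block;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
        int fastCount = 0, slowCount = 0;

        // Two bounded targets; the slow one forces the broadcast to wait.
        var fast = new ActionBlock<int>(_ => { fastCount++; }, options);
        var slow = new ActionBlock<int>(
            async _ => { await Task.Delay(10); slowCount++; }, options);

        var broadcast = GuaranteedBroadcast.Create(
            new ITargetBlock<int>[] { fast, slow });

        for (int i = 0; i < 20; i++)
        {
            // Awaiting SendAsync applies back-pressure to the producer.
            await broadcast.SendAsync(i);
        }

        broadcast.Complete();
        await Task.WhenAll(fast.Completion, slow.Completion);

        Console.WriteLine($"fast={fastCount}, slow={slowCount}");
    }
}
```

Since no item is dropped, both counters should reach 20 even though every block has a capacity of one. Note that the targets are offered each item sequentially, so overall throughput is limited by the slowest target; that is the price of guaranteed delivery with bounded memory.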
