I have a producer/consumer pattern in my application, implemented using TPL Dataflow. I have a large dataflow mesh with approximately 40 blocks. There are two main functional parts in the mesh: the producer part and the consumer part. The producer is supposed to continuously provide a lot of work for the consumer, while the consumer processes the incoming work slowly. I want to pause the producer when the consumer is busy with a certain number of work items. Otherwise, the application consumes a lot of memory/CPU and becomes unstable.
I made a demo application that demonstrates the problem:

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false
            };
            var boundedOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                BoundedCapacity = 5
            };

            var bufferBlock = new BufferBlock<int>(boundedOptions);
            var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
            var broadcastBlock = new BroadcastBlock<int>(x => x, options);
            var consumerBlock = new ActionBlock<int>(async x =>
            {
                var delay = 1000;
                if (x > 10) delay = 5000;
                await Task.Delay(delay);
                Console.WriteLine(x);
            }, boundedOptions);

            producerBlock.LinkTo(bufferBlock);
            bufferBlock.LinkTo(broadcastBlock);
            broadcastBlock.LinkTo(producerBlock);
            broadcastBlock.LinkTo(consumerBlock);

            bufferBlock.Post(1);
            consumerBlock.Completion.Wait();
        }
    }
}
The application prints something like this:
2 1 3 4 5 69055 69053 69054 69057 438028 438040 142303 438079
This means that the producer keeps spinning and pushing messages to the consumer. I want it to pause and wait until the consumer has finished its current portion of work, and only then continue providing messages to the consumer.
My question is quite similar to another question, but that one was not answered properly. I tried the solution suggested there and it does not work here: the producer is still able to flood the consumer with messages. Setting BoundedCapacity does not work either.
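For comparison, here is a minimal sketch of how BoundedCapacity behaves when a bounded block is fed directly, without the loop and without a BroadcastBlock in between. As far as I understand, Post never waits and simply returns false once the target is full, while SendAsync completes only when the target has room, which is what gives back-pressure. BroadcastBlock, as I understand it, does not wait for a slow target at all: a message the target declines is skipped for that target, which would match the gaps in the output above. The class name, item count and delays are made up for the illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacitySketch
{
    // Returns how many items Post managed to hand over and how many
    // items were processed when SendAsync was used instead.
    public static async Task<(int postAccepted, int sendProcessed)> RunAsync(int items)
    {
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };

        // Post never waits: once the bounded block is full it returns false
        // and the item is not delivered.
        var postTarget = new ActionBlock<int>(async _ => await Task.Delay(20), options);
        int postAccepted = 0;
        for (int i = 0; i < items; i++)
        {
            if (postTarget.Post(i)) postAccepted++;
        }
        postTarget.Complete();
        await postTarget.Completion;

        // SendAsync completes only when the block has accepted the item,
        // so the sending loop is slowed down to the consumer's pace.
        int sendProcessed = 0;
        var sendTarget = new ActionBlock<int>(async _ =>
        {
            await Task.Delay(20);
            sendProcessed++; // safe here: default MaxDegreeOfParallelism is 1
        }, options);
        for (int i = 0; i < items; i++)
        {
            await sendTarget.SendAsync(i);
        }
        sendTarget.Complete();
        await sendTarget.Completion;

        return (postAccepted, sendProcessed);
    }

    public static async Task Main()
    {
        var (postAccepted, sendProcessed) = await RunAsync(30);
        Console.WriteLine($"Post accepted {postAccepted} of 30 items");
        Console.WriteLine($"SendAsync delivered {sendProcessed} of 30 items");
    }
}
```

In my demo the bounded consumerBlock sits behind the BroadcastBlock, and producerBlock and broadcastBlock themselves are unbounded, so nothing in the loop is ever forced to wait.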
The only solution I can think of so far is to write my own block that monitors the target block's queue and acts accordingly. But I hope that would be overkill for this problem.
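A lighter alternative to a custom block might be to gate the producer with a SemaphoreSlim that counts the items currently in flight. The following is only a rough sketch under assumptions: the names ThrottleSketch, RunAsync and maxInFlight are made up, the producer is a plain loop rather than the looped mesh from my demo, and the delay stands in for the real work. In the real mesh the WaitAsync call would presumably move into the producer block's delegate.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottleSketch
{
    static int inFlight;   // items handed to the consumer but not finished yet
    static int processed;  // items the consumer has finished

    public static async Task<(int processed, int maxObserved)> RunAsync(int items, int maxInFlight)
    {
        // One slot per work item that may be pending in the consumer.
        var slots = new SemaphoreSlim(maxInFlight, maxInFlight);
        inFlight = 0;
        processed = 0;
        int maxObserved = 0;

        var consumerBlock = new ActionBlock<int>(async x =>
        {
            try
            {
                await Task.Delay(20); // stands in for the slow work
                Interlocked.Increment(ref processed);
            }
            finally
            {
                // Give the slot back so the producer may emit one more item.
                Interlocked.Decrement(ref inFlight);
                slots.Release();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 1; i <= items; i++)
        {
            // The producer pauses here while maxInFlight items are pending.
            await slots.WaitAsync();
            int now = Interlocked.Increment(ref inFlight);
            maxObserved = Math.Max(maxObserved, now);
            consumerBlock.Post(i); // the block is unbounded, so Post is accepted
        }

        consumerBlock.Complete();
        await consumerBlock.Completion;
        return (processed, maxObserved);
    }

    public static async Task Main()
    {
        var (done, peak) = await RunAsync(20, 5);
        Console.WriteLine($"processed={done}, peak in flight={peak}");
    }
}
```

The consumer block stays unbounded here on purpose: the semaphore alone limits how much work is pending, so no item is ever declined or dropped.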
kseen