How to make a fast producer paused when the consumer is overloaded?

I have a producer / consumer pattern in my application that implemented the used TPL data stream. I have a large data flow grid with approximately 40 blocks. There are two main functional parts in the grid: the producer part and the consumer part. The producer is supposed to constantly provide great work for the consumer, while the consumer processes the incoming work slowly. I want to suspend the work of the manufacturer when the consumer is busy with a certain number of work items. Otherwise, the application consumes a lot of memory / processor and behaves unstable.

I made a demo application that demonstrates the problem:

mesh

using System; using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; namespace DataflowTest { class Program { static void Main(string[] args) { var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false }; var boundedOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false, BoundedCapacity = 5 }; var bufferBlock = new BufferBlock<int>(boundedOptions); var producerBlock = new TransformBlock<int, int>(x => x + 1, options); var broadcastBlock = new BroadcastBlock<int>(x => x, options); var consumerBlock = new ActionBlock<int>(async x => { var delay = 1000; if (x > 10) delay = 5000; await Task.Delay(delay); Console.WriteLine(x); }, boundedOptions); producerBlock.LinkTo(bufferBlock); bufferBlock.LinkTo(broadcastBlock); broadcastBlock.LinkTo(producerBlock); broadcastBlock.LinkTo(consumerBlock); bufferBlock.Post(1); consumerBlock.Completion.Wait(); } } } 

The application prints something like this:

 2 1 3 4 5 69055 69053 69054 69057 438028 438040 142303 438079 

This means that the manufacturer continues to spin and push messages to the consumer. I want him to stop and wait for the consumer to finish the current part of the work, and then the producer must continue to provide messages for the consumer.

My question is a quote similar to another question , but it was not answered properly. I tried this solution and it does not work here, which allows the manufacturer to flood the consumer with messages. Also setting BoundedCapacity does not work either.

The only solution I can guess so far is to make your own block, which will control the target block queue and act in accordance with the target block queue. But I hope this will be redundant for this problem.

+5
source share
2 answers

If you need to maintain the integrity of the producer -> buffer -> broadcast cycle, you need to replace the broadcast block with some other block that still sends the messages it receives, but waits for one of its goals to be full.

As long as you know the goals of this block, when you create it, you can build it using ActionBlock (code copied from another of my answer ):

 public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>( DataflowBlockOptions options, params ITargetBlock<T>[] targets) { var block = new ActionBlock<T>( async item => { foreach (var target in targets) { await target.SendAsync(item); } }, new ExecutionDataflowBlockOptions { BoundedCapacity = options.BoundedCapacity, CancellationToken = options.CancellationToken }); block.Completion.ContinueWith(task => { foreach (var target in targets) { if (task.Exception != null) target.Fault(task.Exception); else target.Complete(); } }); return block; } 

Using this, you can declare a broadcast block:

 var broadcastBlock = CreateGuaranteedBroadcastBlock( boundedOptions, producerBlock, consumerBlock); 

(You also need to remove LinkTo lines that reference broadcastBlock .)

One of the problems with the source code that this does not fix is ​​the termination, but this is a complex problem in the TPL data stream with loops in general.

+4
source

It seems that your producer is generating a sequence, so there is no need for a full cycle of preparation β†’ buffer β†’ translation. Instead, all three blocks can be replaced with an async loop, which generates the following element and then sends it to the consumer using await SendAsync() :

 Task.Run(async () => { int i = 1; while (true) { await consumerBlock.SendAsync(i); i++; } consumerBlock.Complete(); }); 

Thus, as soon as the consumer reaches their capacity, await SendAsync() will ensure that the manufacturer waits until the consumer travels to the goods.

If you want to encapsulate such a manufacturer in a data flow block so that you can, for example, link to it, you can .

0
source

All Articles