N threads receiving / performing tasks asynchronously

I have an unlimited number of tasks in a db queue somewhere. What is the best way for one program to work on n tasks simultaneously on n different threads, starting a new task whenever an old one completes? When one task finishes, another must be started asynchronously. The count of currently running tasks must always be n.

My initial thought was to use a thread pool, but this seems unnecessary given that the tasks to be worked on will be retrieved within the separate threads themselves. In other words, each thread fetches its own next task; there is no main thread receiving tasks and then distributing them.

I see several options for this, and I don't know which one I should use for optimal performance.

1) Thread pool: given that there are no threads that necessarily have to wait, I'm not sure this is needed.

2) Semaphore: same as 1. What use is a semaphore if there are no tasks waiting to be handed out by a main thread?

3) The same threads forever: the program starts n threads. When a thread finishes a task, it fetches and performs the next one itself. The main thread only checks that the n threads are still alive.

4) Event handling: same as 3, except that when a thread finishes a task, it fires an ImFinished event before it dies. The ImFinished event handler starts a new thread. This is like 3 but with a lot of overhead, since new threads are constantly being created.

5) Something else?

+4
4 answers

BlockingCollection makes all of this pretty trivial:

 var queue = new BlockingCollection<Action>();

 int numWorkers = 5;
 for (int i = 0; i < numWorkers; i++)
 {
     Thread t = new Thread(() =>
     {
         // Blocks waiting for items until CompleteAdding is called
         // and the collection is drained.
         foreach (var action in queue.GetConsumingEnumerable())
         {
             action();
         }
     });
     t.Start();
 }

You can then have the main thread add items to the blocking collection after starting the workers (or before, if you'd like). You can even have multiple producer threads adding items to the queue.
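For instance, the producer side might look like this (a sketch; FetchPendingTasks is a hypothetical stand-in for however you actually read pending work from your database):

 // Producer side (sketch): FetchPendingTasks() is hypothetical, standing
 // in for however you read pending work items from your database.
 foreach (Action work in FetchPendingTasks())
 {
     queue.Add(work);        // a blocked worker picks this up immediately
 }

 // Optional: signal that no more items are coming. GetConsumingEnumerable
 // then ends once the collection drains, and the worker threads exit.
 queue.CompleteAdding();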

Note that a more traditional approach would be to use Tasks instead of the Thread class. The main reason I didn't suggest it first is that you specifically asked for an exact number of threads running (rather than a maximum), and you just don't have that much control over how Task objects are executed (which is a good thing; they can be optimized on your behalf). If that control isn't as important as you stated, the following may be preferable:

 var queue = new BlockingCollection<Action>();

 int numWorkers = 5;
 for (int i = 0; i < numWorkers; i++)
 {
     // LongRunning hints to the scheduler that this task will occupy
     // a thread for a long time, so it typically gets a dedicated one.
     Task.Factory.StartNew(() =>
     {
         foreach (var action in queue.GetConsumingEnumerable())
         {
             action();
         }
     }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
 }
+4

I like model number 3, and have used it before; it minimizes thread starts and stops and makes the main thread a true "supervisor", reducing the work it has to do.

As Servy points out, the System.Collections.Concurrent namespace has several constructs that are extremely valuable here. ConcurrentQueue is a thread-safe FIFO collection intended for use in exactly this model: one or more producer threads add items at the input end of the queue, while one or more consumers take items off the other end. If there is nothing in the queue, the call to get an item (TryDequeue) simply returns false; you can react to that by exiting the task method (the supervisor can then decide whether to start another task, perhaps by monitoring the queue's input and ramping back up when more items arrive).
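A worker's task method might look something like this (a sketch; WorkItem and Process are placeholders for your own types):

 var queue = new ConcurrentQueue<WorkItem>();

 void WorkLoop()
 {
     // TryDequeue returns false when the queue is empty.
     WorkItem item;
     while (queue.TryDequeue(out item))
     {
         Process(item);
     }
     // Returning here ends the task; the supervisor can decide
     // whether to start a replacement when more work arrives.
 }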

BlockingCollection adds the behavior of making threads wait when they try to take a value from an empty queue. It can also be given a maximum capacity, beyond which it blocks "producer" threads that try to add more items until capacity becomes available. BlockingCollection uses a ConcurrentQueue by default, but you can configure it to use a stack or a bag instead. Using this model, you can keep the tasks running indefinitely; when they have nothing to do, they simply block until there is something for at least one of them to work on, so all the supervisor has to check for is tasks that have faulted (a critical element of any robust worker pattern).
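For example, a sketch of a bounded collection backed by a stack instead of the default queue (WorkItem is again a placeholder):

 // Bounded at 100 items; Add blocks producers once the cap is reached.
 // Backing it with a ConcurrentStack gives LIFO order instead of FIFO.
 var pending = new BlockingCollection<WorkItem>(
     new ConcurrentStack<WorkItem>(), 100);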

0

This is easily achieved with the TPL Dataflow library.

First, suppose you have a BufferBlock<T>; this is your queue:

 var queue = new BufferBlock<T>(); 

Then you need something to perform the action on the items in the block; this is represented by the ActionBlock<TInput> class:

 var action = new ActionBlock<T>(t =>
 {
     /* Process t here */
 },
 new ExecutionDataflowBlockOptions
 {
     // Number of concurrent tasks.
     MaxDegreeOfParallelism = ...,
 });

Note the constructor call above: it takes an instance of ExecutionDataflowBlockOptions and sets the MaxDegreeOfParallelism property to however many items you want to process concurrently.

Under the surface, the Task Parallel Library is used to handle allocating threads to tasks, etc. TPL Dataflow is meant to be a higher-level abstraction which lets you tune exactly how much parallelism/throttling/etc. you want.

For example, if you don't want the ActionBlock<TInput> to buffer any items (preferring them to stay in the BufferBlock<T>), you can also set the BoundedCapacity property, which limits the number of items the ActionBlock<TInput> will hold at any one time (this includes the items being processed as well as reserved items):

 var action = new ActionBlock<T>(t =>
 {
     /* Process t here */
 },
 new ExecutionDataflowBlockOptions
 {
     // Number of concurrent tasks.
     MaxDegreeOfParallelism = ...,

     // Set to MaxDegreeOfParallelism to not buffer.
     BoundedCapacity = ...,
 });

Additionally, if you want a new, fresh Task<TResult> instance to process each item, you can set the MaxMessagesPerTask property to one, indicating that each Task<TResult> will process exactly one item:

 var action = new ActionBlock<T>(t =>
 {
     /* Process t here */
 },
 new ExecutionDataflowBlockOptions
 {
     // Number of concurrent tasks.
     MaxDegreeOfParallelism = ...,

     // Set to MaxDegreeOfParallelism to not buffer.
     BoundedCapacity = ...,

     // Process one item per task.
     MaxMessagesPerTask = 1,
 });

Note that depending on how many other tasks your application is running, this may or may not be optimal for you, and you may also want to weigh the cost of spinning up a new task for every item that comes through the ActionBlock<TInput>.

From there, it's a simple matter to link the BufferBlock<T> to the ActionBlock<TInput> with a call to the LinkTo method:

 IDisposable connection = queue.LinkTo(action,
     new DataflowLinkOptions { PropagateCompletion = true });

You set the PropagateCompletion property to true so that when the BufferBlock<T> completes, completion will be propagated to the ActionBlock<TInput> (if/when there are no more items to process), which you can subsequently wait on.

Note that you can call the Dispose method on the IDisposable implementation returned from the LinkTo call if you want the link between the blocks to be removed.

Finally, you send items to the buffer using the Post method:

 queue.Post(new T()); 

And when you're done (if you're ever done), you call the Complete method:

 queue.Complete(); 

Then, on the action block, you can wait for everything to finish by waiting on the Task instance exposed by the Completion property:

 action.Completion.Wait(); 

Hopefully, you can see the elegance of this:

  • You don't have to manage the creation of new Task instances/threads/etc. to handle the work; the blocks do that for you based on the options you provide (and this is on a per-block basis).
  • There is a cleaner separation of concerns. The buffer is separated from the action, as are all the other blocks. You create the blocks and then link them together.
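Putting it all together, a minimal end-to-end sketch (the Console.WriteLine is a placeholder for real work, and 5 is an arbitrary degree of parallelism):

 using System;
 using System.Threading.Tasks.Dataflow;

 class Program
 {
     static void Main()
     {
         var queue = new BufferBlock<int>();

         var action = new ActionBlock<int>(i =>
         {
             Console.WriteLine("Processing " + i);   // placeholder work
         },
         new ExecutionDataflowBlockOptions
         {
             MaxDegreeOfParallelism = 5               // at most 5 in flight
         });

         // Completion flows from the buffer into the action block.
         queue.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

         for (int i = 0; i < 100; i++)
         {
             queue.Post(i);
         }

         queue.Complete();           // no more items will arrive
         action.Completion.Wait();   // block until all items are processed
     }
 }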
0

I am a VB guy, but you can easily translate:

 Private Async Sub foo()
     Dim n As Integer = 16
     Dim l As New List(Of Task)
     Dim jobs As New Queue(Of Integer)(Enumerable.Range(1, 100))

     ' Start the first n tasks.
     For i = 1 To n
         Dim j = jobs.Dequeue()
         l.Add(Task.Run(Sub()
                            Threading.Thread.Sleep(500)
                            Console.WriteLine(j)
                        End Sub))
     Next

     ' Swap each finished task for a new one while jobs remain.
     While l.Count > 0
         Dim t = Await Task.WhenAny(l)
         If jobs.Count > 0 Then
             Dim j = jobs.Dequeue()
             l(l.IndexOf(t)) = Task.Run(Sub()
                                            Threading.Thread.Sleep(500)
                                            Console.WriteLine(j)
                                        End Sub)
         Else
             l.Remove(t)
         End If
     End While
 End Sub

There is an article by Stephen Toub on why you should not use Task.WhenAny this way... WITH A LARGE LIST OF TASKS; but with only "some" tasks you usually won't run into a problem.

The idea is quite simple: you have a list to which you add as many (running) tasks as you want to run in parallel. Then you (a)wait until the first one finishes. If there are still jobs in the queue, you assign the next job to a new task in the finished one's slot and (a)wait again. If there are no jobs left in the queue, you simply remove the completed task. When both the task list and the queue are empty, you are done.

Stephen Toub's article: http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx
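Since it translates easily, here is a rough C# version of the same pattern (a sketch; the Sleep stands in for real work):

 using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;

 class Program
 {
     static void Main() => RunJobsAsync().Wait();

     static async Task RunJobsAsync()
     {
         const int n = 16;
         var running = new List<Task>();
         var jobs = new Queue<int>(Enumerable.Range(1, 100));

         // Start the first n tasks.
         for (int i = 0; i < n; i++)
         {
             int j = jobs.Dequeue();
             running.Add(Task.Run(() => { Thread.Sleep(500); Console.WriteLine(j); }));
         }

         // Swap each finished task for a new one while jobs remain.
         while (running.Count > 0)
         {
             Task finished = await Task.WhenAny(running);
             if (jobs.Count > 0)
             {
                 int j = jobs.Dequeue();
                 running[running.IndexOf(finished)] =
                     Task.Run(() => { Thread.Sleep(500); Console.WriteLine(j); });
             }
             else
             {
                 running.Remove(finished);
             }
         }
     }
 }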

-1
