Processing only n items at a time using the Task Parallel Library

This all happens in a Windows service.

I have a Queue<T> (actually a ConcurrentQueue<T>) holding items waiting to be processed. I do not want to process them one at a time, though; I want to process n items at a time, where n is a configurable integer.

How can I do this using the Task Parallel Library?

I know that the TPL will partition a collection on behalf of the developer for parallel processing, but I'm not sure that is the feature I'm after. I am new to multithreading and the TPL.

+8
multithreading concurrency parallel-processing task-parallel-library
3 answers

Use BlockingCollection<T> instead of ConcurrentQueue<T>. You can then start any number of consumer threads and have each one call the Take method of BlockingCollection<T>. If the collection is empty, Take blocks the calling thread until an element is added; otherwise the threads consume the queued elements in parallel.

Since your question mentions the TPL: Parallel.ForEach has some problems when used with BlockingCollection<T> (check this post for more details), so you need to manage the creation of the consumer threads yourself, with new Thread(/*consumer method*/) or new Task() ...
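As a rough illustration of the consumer pattern described above, here is a minimal sketch. The names ConsumerSketch, Run and processItem are hypothetical and not part of the original answer, and the int item type stands in for whatever you actually queue. It starts exactly n consumer tasks that each call Take, and relies on Take throwing InvalidOperationException once CompleteAdding has been called and the collection is empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumerSketch
{
    // Hypothetical helper: processes every item with exactly n consumers
    // and returns how many items were processed.
    public static int Run(int[] items, int n, Action<int> processItem)
    {
        int processed = 0;
        using (var queue = new BlockingCollection<int>())
        {
            var consumers = new Task[n];
            for (int i = 0; i < n; i++)
            {
                consumers[i] = Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        int item;
                        try
                        {
                            // Blocks while the collection is empty.
                            item = queue.Take();
                        }
                        catch (InvalidOperationException)
                        {
                            // Thrown once adding is complete and nothing is left.
                            break;
                        }
                        processItem(item);
                        Interlocked.Increment(ref processed);
                    }
                }, TaskCreationOptions.LongRunning);
            }

            // Producer side: enqueue the work, then signal that no more is coming.
            foreach (int item in items)
            {
                queue.Add(item);
            }
            queue.CompleteAdding();

            Task.WaitAll(consumers);
        }
        return processed;
    }
}
```

In a long-lived service you would probably keep the collection and the consumers alive for the lifetime of the service and call CompleteAdding only on shutdown.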

+4

Here is one idea that involves creating an extension method for TaskFactory .

    public static class TaskFactoryExtension
    {
        public static Task StartNew(this TaskFactory target, Action action, int parallelism)
        {
            var tasks = new Task[parallelism];
            for (int i = 0; i < parallelism; i++)
            {
                tasks[i] = target.StartNew(action);
            }
            return target.StartNew(() => Task.WaitAll(tasks));
        }
    }

Then your calling code would look like this.

    ConcurrentQueue<T> queue = GetQueue();
    int n = GetDegreeOfParallelism();
    var task = Task.Factory.StartNew(
        () =>
        {
            T item;
            while (queue.TryDequeue(out item))
            {
                ProcessItem(item);
            }
        }, n);
    task.Wait(); // Optionally wait for everything to finish.

Here is another idea using Parallel.ForEach. The problem with this approach is that the degree of parallelism you ask for is not guaranteed. You specify only the maximum allowed, not an exact number.

    ConcurrentQueue<T> queue = GetQueue();
    int n = GetDegreeOfParallelism();
    Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
        (item) =>
        {
            ProcessItem(item);
        });
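To make the "maximum, not absolute" point concrete, the sketch below records the highest number of items that were in flight at the same time. ForEachSketch and PeakConcurrency are hypothetical names, and the Thread.Sleep call only stands in for real work. It also illustrates a side effect worth knowing about: enumerating a ConcurrentQueue<T> reads a snapshot and does not dequeue anything, so the items are still in the queue when the loop finishes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachSketch
{
    // Hypothetical helper: returns the highest number of items that were
    // being processed at the same moment.
    public static int PeakConcurrency(ConcurrentQueue<int> queue, int n)
    {
        int inFlight = 0;
        int peak = 0;

        Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n }, item =>
        {
            int now = Interlocked.Increment(ref inFlight);

            // Raise 'peak' to 'now' if it is larger, retrying if another
            // thread updated 'peak' in the meantime.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen)
            {
            }

            Thread.Sleep(10); // Placeholder for the real processing.
            Interlocked.Decrement(ref inFlight);
        });

        return peak;
    }
}
```

The returned peak never exceeds n, but it can be lower, depending on how many worker threads the scheduler decides to use.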
+4

I also recommend using BlockingCollection instead of using ConcurrentQueue directly.

Here is an example:

    public class QueuingRequestProcessor
    {
        private BlockingCollection<MyRequestType> queue;

        public QueuingRequestProcessor(int maxConcurrent)
        {
            // The bounded capacity makes Add() block once maxConcurrent requests are waiting
            this.queue = new BlockingCollection<MyRequestType>(maxConcurrent);

            // One consumer task per request that may be processed concurrently
            Task[] consumers = new Task[maxConcurrent];
            for (int i = 0; i < maxConcurrent; i++)
            {
                consumers[i] = Task.Factory.StartNew(() =>
                {
                    // Will wait when queue is empty, until CompleteAdding() is called
                    foreach (var request in this.queue.GetConsumingEnumerable())
                    {
                        Process(request);
                    }
                });
            }
        }

        public void Add(MyRequestType request)
        {
            this.queue.Add(request);
        }

        public void Stop()
        {
            this.queue.CompleteAdding();
        }

        private void Process(MyRequestType request)
        {
            // Do your processing here
        }
    }

Note that maxConcurrent in the constructor determines how many requests will be processed simultaneously.

+1