If you can use .Net 4.5, I would suggest looking at Dataflow from a parallel task library (TPL) .
This page leads to many example walkthroughs, such as a Practical Guide. Introducing a Consumer-Producer Flow Template and Walkthrough. Using data flow in a Windows Forms application .
Take a look at this documentation to see if it helps you. This is quite a lot to get involved, but I think this is likely to be your best approach.
Alternatively, you can examine the BlockingCollection along with your GetConsumingEnumerable() method to access the items in the queue.
What you do is divide the work into the objects you want to process and use the BlockingCollection to manage the queue.
Some code examples that use ints rather than objects as work items will help demonstrate this:
When the workflow completes with the current item, it will remove the new item from the work queue, process that item, and add it to the output queue.
A separate consumer stream removes completed items from the output queue and does something with them.
In the end, we must wait for all workers to finish (Task.WaitAll (workers)) before we can mark the output queue as completed (outputQueue.CompleteAdding ()).
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace Demo { class Program { static void Main(string[] args) { new Program().run(); } void run() { int threadCount = 4; Task[] workers = new Task[threadCount]; Task.Factory.StartNew(consumer); for (int i = 0; i < threadCount; ++i) { int workerId = i; Task task = new Task(() => worker(workerId)); workers[i] = task; task.Start(); } for (int i = 0; i < 100; ++i) { Console.WriteLine("Queueing work item {0}", i); inputQueue.Add(i); Thread.Sleep(50); } Console.WriteLine("Stopping adding."); inputQueue.CompleteAdding(); Task.WaitAll(workers); outputQueue.CompleteAdding(); Console.WriteLine("Done."); Console.ReadLine(); } void worker(int workerId) { Console.WriteLine("Worker {0} is starting.", workerId); foreach (var workItem in inputQueue.GetConsumingEnumerable()) { Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem); Thread.Sleep(100); // Simulate work. outputQueue.Add(workItem); // Output completed item. } Console.WriteLine("Worker {0} is stopping.", workerId); } void consumer() { Console.WriteLine("Consumer is starting."); foreach (var workItem in outputQueue.GetConsumingEnumerable()) { Console.WriteLine("Consumer is using item {0}", workItem); Thread.Sleep(25); } Console.WriteLine("Consumer is finished."); } BlockingCollection<int> inputQueue = new BlockingCollection<int>(); BlockingCollection<int> outputQueue = new BlockingCollection<int>(); } }
Matthew watson
source share