Multi-threaded process or task queue

I have a telephony messaging application with a lot of messages that need to be processed. Since telephone ports are limited, the messages are processed first in, first out. Each message has an "Acknowledge" flag that indicates whether it has been processed. It is, of course, initialized to false.

I want to put all messages in a queue, then process them with multiple threads or tasks.

    public class MessageQueue
    {
        public Queue MessageWorkItem { get; set; }
        public Messages Message { get; set; }

        public MessageQueue()
        {
            MessageWorkItem = new Queue();
            Message = new Messages();
        }

        public void GetMessageMetaData()
        {
            try
            {
                // It is just a test, add only one item into the queue
                Message.MessageID = Guid.NewGuid();
                Message.NumberToCall = "1111111111";
                Message.FacilityID = "3333";
                Message.NumberToDial = "2222222222";
                Message.CountryCode = "1";
                Message.Acknowledge = false;
            }
            catch (Exception ex)
            {
            }
        }

        public void AddingItemToQueue()
        {
            GetMessageMetaData();
            if (!Message.Acknowledge)
            {
                lock (MessageWorkItem)
                {
                    MessageWorkItem.Enqueue(Message);
                }
            }
        }
    }

    public class Messages
    {
        public Guid MessageID { get; set; }
        public string NumberToCall { get; set; }
        public string FacilityID { get; set; }
        public string NumberToDial { get; set; }
        public string CountryCode { get; set; }
        public bool Acknowledge { get; set; }
    }

My question now is how to dequeue the items from the queue using multiple threads. For each item in the queue I want to run a script.

    public void RunScript(Message item)
    {
        try
        {
            PlayMessage(item);
            return;
        }
        catch (HangupException hex)
        {
            Log.WriteWithId("Caller Hungup!", hex.Message);
        }
        catch (Exception ex)
        {
            Log.WriteException(ex, "Unexpected exception: {0}");
        }
    }

I was thinking of something like

    if (MessageWorkItem.Count >= 1)

and then doing something, but I need help with the code.

multithreading c# task-parallel-library
2 answers

If you can use .NET 4.5, I would suggest looking at Dataflow from the Task Parallel Library (TPL).

That page links to many example walkthroughs, such as "How to: Implement a Producer-Consumer Dataflow Pattern" and "Walkthrough: Using Dataflow in a Windows Forms Application".

Take a look at that documentation to see if it helps you. It is quite a lot to take in, but I think it is likely to be your best approach.
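As a rough illustration of the Dataflow suggestion, the sketch below posts work items to an ActionBlock, which buffers them internally and runs a delegate for each one on up to a fixed number of threads. It assumes the System.Threading.Tasks.Dataflow NuGet package is referenced. The WorkItem class, the ProcessAll helper and the limit of 4 are placeholders invented for this example and are not part of the question's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's message type.
public class WorkItem
{
    public Guid Id = Guid.NewGuid();
    public bool Acknowledge;
}

public static class DataflowSketch
{
    // Processes every item with at most maxParallel concurrent handlers
    // and returns how many items were handled.
    public static int ProcessAll(IEnumerable<WorkItem> items, int maxParallel)
    {
        int processed = 0;
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallel
        };

        var block = new ActionBlock<WorkItem>(item =>
        {
            Thread.Sleep(10);            // Simulate the call.
            item.Acknowledge = true;     // Mark the item as processed.
            Interlocked.Increment(ref processed);
        }, options);

        foreach (var item in items)
            block.Post(item);            // Items wait in the block's input buffer.

        block.Complete();                // Signal that no more items will be posted.
        block.Completion.Wait();         // Wait until every buffered item is handled.
        return processed;
    }

    public static void Main()
    {
        var items = new List<WorkItem>();
        for (int i = 0; i < 20; i++) items.Add(new WorkItem());
        Console.WriteLine("Processed {0} items", ProcessAll(items, 4));
    }
}
```

With this approach there is no explicit queue or lock in your own code; the block owns the buffering, and MaxDegreeOfParallelism could be set to the number of available telephone ports.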

Alternatively, you could look into using a BlockingCollection along with its GetConsumingEnumerable() method to access the items in the queue.

What you do is split the work into objects that you want to process in some way, and use a BlockingCollection to manage the queue.

Some code examples that use ints rather than objects as work items will help demonstrate this:

When a worker thread has finished with its current item, it will remove a new item from the work queue, process that item, and then add it to the output queue.

A separate consumer thread removes completed items from the output queue and does something with them.

At the end, we must wait for all the workers to finish (Task.WaitAll(workers)) before we can mark the output queue as complete (outputQueue.CompleteAdding()).

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Demo
    {
        class Program
        {
            static void Main(string[] args)
            {
                new Program().run();
            }

            void run()
            {
                int threadCount = 4;
                Task[] workers = new Task[threadCount];

                Task.Factory.StartNew(consumer);

                for (int i = 0; i < threadCount; ++i)
                {
                    int workerId = i;
                    Task task = new Task(() => worker(workerId));
                    workers[i] = task;
                    task.Start();
                }

                for (int i = 0; i < 100; ++i)
                {
                    Console.WriteLine("Queueing work item {0}", i);
                    inputQueue.Add(i);
                    Thread.Sleep(50);
                }

                Console.WriteLine("Stopping adding.");
                inputQueue.CompleteAdding();
                Task.WaitAll(workers);
                outputQueue.CompleteAdding();
                Console.WriteLine("Done.");

                Console.ReadLine();
            }

            void worker(int workerId)
            {
                Console.WriteLine("Worker {0} is starting.", workerId);

                foreach (var workItem in inputQueue.GetConsumingEnumerable())
                {
                    Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
                    Thread.Sleep(100);          // Simulate work.
                    outputQueue.Add(workItem);  // Output completed item.
                }

                Console.WriteLine("Worker {0} is stopping.", workerId);
            }

            void consumer()
            {
                Console.WriteLine("Consumer is starting.");

                foreach (var workItem in outputQueue.GetConsumingEnumerable())
                {
                    Console.WriteLine("Consumer is using item {0}", workItem);
                    Thread.Sleep(25);
                }

                Console.WriteLine("Consumer is finished.");
            }

            BlockingCollection<int> inputQueue = new BlockingCollection<int>();
            BlockingCollection<int> outputQueue = new BlockingCollection<int>();
        }
    }
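The same pattern carries over to the question's message objects. The sketch below is one possible adaptation, not the asker's actual code: it trims Message down to two properties, treats the number of workers as the number of free telephone ports (portCount is an assumed parameter), and replaces the call to PlayMessage with a placeholder that only sets Acknowledge.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Reduced version of the question's message class, for illustration only.
public class Message
{
    public Guid MessageID { get; set; }
    public bool Acknowledge { get; set; }
}

public static class PortWorkers
{
    // Drains the queue with one worker per port and returns once the queue
    // has been marked complete and every worker has finished.
    public static void Drain(BlockingCollection<Message> queue, int portCount)
    {
        Task[] workers = Enumerable.Range(0, portCount)
            .Select(port => Task.Factory.StartNew(() =>
            {
                // GetConsumingEnumerable removes each item as it hands it out,
                // so no two workers ever receive the same message.
                foreach (Message msg in queue.GetConsumingEnumerable())
                {
                    // Placeholder for the real work (RunScript in the question).
                    msg.Acknowledge = true;
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        Task.WaitAll(workers);
    }

    public static void Main()
    {
        var queue = new BlockingCollection<Message>();
        var all = Enumerable.Range(0, 50)
            .Select(i => new Message { MessageID = Guid.NewGuid() })
            .ToList();

        foreach (var m in all) queue.Add(m);
        queue.CompleteAdding();   // Without this, the workers would wait forever.

        Drain(queue, 4);
        Console.WriteLine("Unacknowledged: {0}", all.Count(m => !m.Acknowledge));
    }
}
```

Because the collection does its own synchronization, the lock around Enqueue in the question's code is no longer needed with this design.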

Use Parallel.ForEach from the TPL. It is a parallel for-each loop.

Example (MessageWorkItem changed to a generic Queue):

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    public class MessageQueue
    {
        public Queue<Message> MessageWorkItem { get; set; }

        public MessageQueue()
        {
            MessageWorkItem = new Queue<Message>();
        }

        public Message GetMessageMetaData()
        {
            try
            {
                // It is just a test, add only one item into the queue
                return new Message()
                {
                    MessageID = Guid.NewGuid(),
                    NumberToCall = "1111111111",
                    FacilityID = "3333",
                    NumberToDial = "2222222222",
                    CountryCode = "1",
                    Acknowledge = false
                };
            }
            catch (Exception ex)
            {
                return null;
            }
        }

        public void AddingItemToQueue()
        {
            var message = GetMessageMetaData();
            if (!message.Acknowledge)
            {
                lock (MessageWorkItem)
                {
                    MessageWorkItem.Enqueue(message);
                }
            }
        }
    }

    public class Message
    {
        public Guid MessageID { get; set; }
        public string NumberToCall { get; set; }
        public string FacilityID { get; set; }
        public string NumberToDial { get; set; }
        public string CountryCode { get; set; }
        public bool Acknowledge { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            MessageQueue me = new MessageQueue();
            for (int i = 0; i < 10000; i++)
                me.AddingItemToQueue();

            Console.WriteLine(me.MessageWorkItem.Count);

            // Enumerates the queue in parallel; items are not dequeued.
            Parallel.ForEach(me.MessageWorkItem, RunScript);
        }

        static void RunScript(Message item)
        {
            // todo: ...
            Console.WriteLine(item.MessageID);
            Thread.Sleep(300);
        }
    }
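Since the question mentions a limited number of telephone ports, it may be worth capping how many items Parallel.ForEach handles at once. The sketch below shows one way to do that with ParallelOptions.MaxDegreeOfParallelism. The Run helper, its maxPorts parameter and the peak counter are invented for this illustration; the counter exists only to show that the cap is respected.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class LimitedParallel
{
    // Runs handler over items with at most maxPorts handlers active at once.
    // Returns the highest number of handlers observed running simultaneously.
    public static int Run<T>(IEnumerable<T> items, int maxPorts, Action<T> handler)
    {
        int running = 0, peak = 0;
        object gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxPorts };

        Parallel.ForEach(items, options, item =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }
            try
            {
                handler(item);
            }
            finally
            {
                lock (gate) { running--; }
            }
        });

        return peak;
    }

    public static void Main()
    {
        var queue = new Queue<int>();
        for (int i = 0; i < 20; i++) queue.Enqueue(i);

        // Each "call" is simulated with a short sleep.
        int peak = Run(queue, 3, item => Thread.Sleep(20));
        Console.WriteLine("Peak concurrency: {0} (limit 3)", peak);
    }
}
```

Note that Parallel.ForEach only enumerates the Queue; the items stay in it afterwards, so this fits a batch that is already fully loaded rather than a queue that keeps receiving messages while it is being processed.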