How to implement BlockingCollection to fix this producer / consumer problem?

I currently have an application that receives packets from a socket, processes them, and adds them to a ConcurrentQueue. A separate thread then processes these elements.

The problem I am facing is the classic producer / consumer problem: the consumer keeps trying to take elements even when the queue is empty, which leads to significant CPU utilization.

ProcessPackets runs on its own thread:

    private ConcurrentQueue<PrimaryPacket> Waiting = new ConcurrentQueue<PrimaryPacket>();

    private void ProcessPackets()
    {
        PrimaryPacket e;
        while (true)
        {
            if (Waiting.TryDequeue(out e))
            {
                Packets.TryAdd(((ulong)e.IPAddress << 32 | e.RequestID), e);
            }
        }
    }

    public void AddPacket(PrimaryPacket e)
    {
        Waiting.Enqueue(e);
    }

What would be the best way to use BlockingCollection<T> to solve this problem? Or is there another solution?

It should also be noted that about 30,000 items are added to the queue per second.

1 answer

You do not need to implement BlockingCollection<T>; you can just use it. As the documentation says, it is a wrapper around an IProducerConsumerCollection<T>, such as ConcurrentQueue<T> (which is used by default).

    private BlockingCollection<PrimaryPacket> Waiting = new BlockingCollection<PrimaryPacket>();

    private void ProcessPackets()
    {
        while (true)
        {
            PrimaryPacket e = Waiting.Take();
            Packets.TryAdd(((ulong)e.IPAddress << 32 | e.RequestID), e);
        }
    }

    public void AddPacket(PrimaryPacket e)
    {
        Waiting.Add(e);
    }
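To make the "wrapper" point concrete, here is a minimal sketch that passes the underlying ConcurrentQueue<T> explicitly instead of relying on the default. The class and method names (WrapperSketch, DrainInOrder) are made up for illustration, and int stands in for PrimaryPacket.

```csharp
using System;
using System.Collections.Concurrent;

public static class WrapperSketch
{
    // Hypothetical helper: adds three items and returns them
    // in the order Take() hands them back.
    public static int[] DrainInOrder()
    {
        // Explicitly wrap a ConcurrentQueue<int>; the parameterless
        // constructor uses a ConcurrentQueue<T> as well.
        using (var waiting = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            waiting.Add(1);
            waiting.Add(2);
            waiting.Add(3);

            var result = new int[3];
            for (int i = 0; i < result.Length; i++)
            {
                // Take() returns immediately here because items are available.
                result[i] = waiting.Take();
            }
            return result;
        }
    }
}
```

Because the backing collection is a queue, the items come out in the order they were added.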

Take() blocks while the queue is empty, so the consumer thread does not spin and burn CPU unnecessarily. You should also think about how the consumer loop should end once no more packets will arrive.
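One possible way to handle the end of processing, sketched under assumptions: the producer calls CompleteAdding() when it is done, and the consumer iterates GetConsumingEnumerable(), which blocks while the collection is empty and finishes once adding is complete and the collection has been drained. The names ShutdownSketch and Run are hypothetical, and int again stands in for PrimaryPacket.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class ShutdownSketch
{
    // Hypothetical helper: pushes itemCount items through the collection
    // and returns how many the consumer thread processed.
    public static int Run(int itemCount)
    {
        int processed = 0;

        using (var waiting = new BlockingCollection<int>())
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the collection is empty. The loop ends after
                // CompleteAdding() has been called and all items were taken.
                foreach (int item in waiting.GetConsumingEnumerable())
                {
                    processed++;
                }
            });
            consumer.Start();

            // Producer side: add the items, then signal that no more will come.
            for (int i = 0; i < itemCount; i++)
            {
                waiting.Add(i);
            }
            waiting.CompleteAdding();

            // Wait for the consumer to drain the collection and exit.
            consumer.Join();
        }

        return processed;
    }
}
```

With this shape the while (true) loop is no longer needed; calling Take() after the collection is completed and empty would instead throw an InvalidOperationException.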
