Thread-Safe Message Queuing

We queue messages using a C# Queue. There is exactly one consumer, which receives available messages from the queue in a while loop, and exactly one producer, which sends messages to the queue.

We take a lock on the message queue to ensure that the consumer and the producer cannot access the queue at the same time.

My question is: why is the lock needed? If the Queue increases its Count property only AFTER the element is actually added, and the consumer checks Count before retrieving, then the consumer should always receive a complete message element, even without the lock. Correct? In that case we would never see a partially written message, so could we get rid of the lock?

The lock slows the system down, and we sometimes see the message flow stall for a while, because our producer is very busy.
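For concreteness, the pattern described above might look something like this (a minimal sketch; the class and method names are illustrative, not from the original code):

```csharp
using System.Collections.Generic;

// Sketch of a single-producer / single-consumer queue guarded by a lock.
class MessagePump
{
    private readonly Queue<string> _queue = new Queue<string>();
    private readonly object _sync = new object();

    // Producer thread: adds a message under the lock.
    public void Send(string message)
    {
        lock (_sync)
        {
            _queue.Enqueue(message);
        }
    }

    // Consumer thread: polls in a while loop, taking the same lock
    // for the Count check and the Dequeue together.
    public string TryReceive()
    {
        lock (_sync)
        {
            return _queue.Count > 0 ? _queue.Dequeue() : null;
        }
    }
}
```

Note that the Count check and the Dequeue happen under one lock acquisition; that check-then-act pair is exactly what the question proposes to run unlocked.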

EDIT:

Unfortunately, we are using .NET 3.5.

+5
7 answers

The lock is not just about receiving a partial message; it is about the internal consistency of the queue itself.

Even if Count is updated after the element is stored, that ordering is an implementation detail you cannot rely on: the compiler, the JIT and the CPU are all free to reorder those writes, and without a lock (or some other memory barrier) the consumer may observe the new Count before the new element is visible.

In other words, enqueue/dequeue are not atomic operations, and you cannot depend on the order in which their internal writes become visible to the other thread.

ConcurrentQueue in .NET 4.0 was written for exactly this scenario; it is lock-free and safe for concurrent producers and consumers, so if you can upgrade, use it instead.

+10

Don't try to run a bare Queue<T> without the lock. Replace it with ConcurrentQueue<T>, or better still, with BlockingCollection<T>.

With BlockingCollection<T>, the consumer simply writes a foreach over collection.GetConsumingEnumerable(). The enumeration blocks while the collection is empty, so the polling loop and the "is a message available yet" check disappear entirely.
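A sketch of that consumer loop might look like the following. Note that BlockingCollection<T> requires .NET 4.0 or later, so this does not help on 3.5; the structure of the demo (collecting into a list) is my own addition for illustration:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

class ConsumerDemo
{
    public static List<int> Run()
    {
        var collection = new BlockingCollection<int>();

        // Producer: add a few items, then signal that no more are coming.
        var producer = new Thread(() =>
        {
            for (int i = 0; i < 3; i++)
                collection.Add(i);
            collection.CompleteAdding();
        });
        producer.Start();

        // Consumer: GetConsumingEnumerable blocks while the collection is
        // empty and ends the loop once CompleteAdding has been called and
        // the remaining items are drained.
        var received = new List<int>();
        foreach (int item in collection.GetConsumingEnumerable())
            received.Add(item);

        producer.Join();
        return received;
    }
}
```

The consumer never checks a Count property and never spins; it simply blocks until work arrives or the producer declares completion.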

+4

Why do you think the lock is unnecessary... have you looked at what Enqueue and Dequeue actually do?

Here is the decompiled Queue<T> implementation (note how many fields each method touches):

public T Dequeue()
{
    if (this._size == 0)
    {
        ThrowHelper.ThrowInvalidOperationException(ExceptionResource.InvalidOperation_EmptyQueue);
    }
    T local = this._array[this._head];
    this._array[this._head] = default(T);
    this._head = (this._head + 1) % this._array.Length;
    this._size--;
    this._version++;
    return local;
}

public void Enqueue(T item)
{
    if (this._size == this._array.Length)
    {
        int capacity = (int) ((this._array.Length * 200L) / 100L);
        if (capacity < (this._array.Length + 4))
        {
            capacity = this._array.Length + 4;
        }
        this.SetCapacity(capacity);
    }
    this._array[this._tail] = item;
    this._tail = (this._tail + 1) % this._array.Length;
    this._size++;
    this._version++;
}

As you can see, neither method is a single (atomic) operation. Dequeue and Enqueue both read and write _array, _head and _tail, so the two threads can observe each other's half-finished updates, especially while Enqueue is in the middle of resizing the array.

On top of that, without a lock() both threads mutate _size and _version concurrently. The ++ and -- operators are not atomic read-modify-write operations, so the producer's _version++ can race with the consumer's, and _size can simply end up with the wrong value.

+1

BTW, if you really want to avoid the lock, it is possible for exactly one producer thread and one consumer thread. Here is a sketch of such a lock-free queue:

class LocklessQueue<T>
{
    class Item
    {
        public Item Next;
        bool _valid;
        T _value;

        public Item(bool valid, T value)
        {
            _valid = valid;
            _value = value;
            Next = null;
        }

        public bool IsValid { get { return _valid; } }

        public T TakeValue()
        {
            // Only the consumer calls this, so no other thread
            // reads _valid/_value while they are being cleared.
            T value = _value;
            _valid = false;
            _value = default(T);
            return value;
        }
    }

    // _first is touched only by the consumer, _last only by the producer.
    // (On hardware with a weaker memory model these fields would also
    // need volatile/memory barriers.)
    Item _first;
    Item _last;

    public LocklessQueue()
    {
        // Start with a shared dummy node so the producer and consumer
        // never write to the same node.
        _first = _last = new Item(false, default(T));
    }

    public bool IsEmpty
    {
        get
        {
            // Consumer-side cleanup: skip over already-consumed nodes.
            while (!_first.IsValid && _first.Next != null)
                _first = _first.Next;
            return false == _first.IsValid;
        }
    }

    public void Enqueue(T value)
    {
        // Producer: publish the new node via Next before advancing _last.
        Item i = new Item(true, value);
        _last.Next = i;
        _last = i;
    }

    public T Dequeue()
    {
        while (!_first.IsValid && _first.Next != null)
            _first = _first.Next;

        if (IsEmpty)
            throw new InvalidOperationException(); // queue is empty

        return _first.TakeValue();
    }
}
+1

Locking is necessary here; the queue's internal state must never be mutated by two threads at once. If you were using the non-generic Queue from System.Collections, you could get a thread-safe wrapper from Queue.Synchronized(queue). Queue<T> has no such wrapper, but you can lock on its ICollection.SyncRoot:

using System.Collections;
using System.Collections.Generic;

public class Q_Example
{
    private readonly Queue<int> q = new Queue<int>();
    private readonly object sync;

    public Q_Example()
    {
        // Queue<T> implements ICollection explicitly, so the cast is required.
        sync = ((ICollection)q).SyncRoot;
    }

    public void Method1(int val)
    {
        lock (sync)
        {
            q.Enqueue(val);
        }
    }

    public int Method2()
    {
        lock (sync)
        {
            return q.Dequeue();
        }
    }
}
0

ConcurrentQueue is available even if you are using .NET 3.5. The Reactive Extensions download includes what used to be the .NET 3.5 parallel extensions, the forerunner of the Task Parallel Library that shipped with .NET 4.0.
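With ConcurrentQueue, the consumer needs neither a lock nor a separate Count check; a small sketch (the helper method name is mine, not from the thread):

```csharp
using System.Collections.Concurrent;

class QueueDemo
{
    // Returns the next message, or null if the queue is currently empty.
    public static string Poll(ConcurrentQueue<string> queue)
    {
        string message;
        // TryDequeue returns false on an empty queue instead of throwing,
        // so the check-then-act race from the question cannot occur here.
        return queue.TryDequeue(out message) ? message : null;
    }
}
```

The check and the removal are one atomic operation inside TryDequeue, which is precisely what the lock was providing around Count and Dequeue.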

0