Multipoint Queue Atomic Operations

I play with std :: atomic structures and wrote this multi-user non-locking multi-user queue, which I am attaching here. The idea of ​​a queue consists of two stacks - a stack of producers and consumers, which are essentially connected by list structures. List nodes contain indexes in an array in which the actual data is stored, where you will read or write.

The idea would be that the nodes for the lists are mutually exclusive, i.e. a pointer to a node can exist only in the list of producers or consumers. The manufacturer will try to get the node from the list of manufacturers, the consumer from the list of consumers, and whenever the pointer to the node is purchased by the manufacturer or consumer, it must be from both lists so that no one else can buy it. I use the std :: atomic_compare_exchange functions to rotate until the node pops up.

The problem is that there must be something wrong with the logic, or the operations are not atomic, as I assume, because even with 1 producer and 1 consumer, given enough time, the queue will live and that I noticed that if you claim that cell! = cell-> m_next, the statement will be amazed! So he probably has something looking at me in the face, and I just don’t see him, so I wonder if anyone can intervene.

thank

#ifndef MTQueue_h
#define MTQueue_h

#include <atomic>

template<typename Data, uint64_t queueSize>
class MTQueue
{
public:

    MTQueue() : m_produceHead(0), m_consumeHead(0)
    {
        for(int i=0; i<queueSize-1; ++i)
        {
            m_nodes[i].m_idx = i;
            m_nodes[i].m_next = &m_nodes[i+1];
        }
        m_nodes[queueSize-1].m_idx = queueSize - 1;
        m_nodes[queueSize-1].m_next = NULL;

        m_produceHead = m_nodes;
        m_consumeHead = NULL;
    }

    struct CellNode
    {
        uint64_t m_idx;
        CellNode* m_next;
    };

    bool push(const Data& data)
    {
        if(m_produceHead == NULL)
            return false;

        // Pop the producer list.
        CellNode* cell = m_produceHead;
        while(!std::atomic_compare_exchange_strong(&m_produceHead,
                                                   &cell, cell->m_next))
        {
            cell = m_produceHead;
            if(!cell)
                return false;
        }

        // At this point cell should point to a node that is not in any of the lists
        m_data[cell->m_idx] = data;

        // Push that node as the new head of the consumer list
        cell->m_next = m_consumeHead;
        while (!std::atomic_compare_exchange_strong(&m_consumeHead,
                                                    &cell->m_next, cell))
        {
            cell->m_next = m_consumeHead;
        }
        return true;
    }

    bool pop(Data& data)
    {
        if(m_consumeHead == NULL)
            return false;

        // Pop the consumer list
        CellNode* cell = m_consumeHead;
        while(!std::atomic_compare_exchange_strong(&m_consumeHead,
                                                   &cell, cell->m_next))
        {
            cell = m_consumeHead;
            if(!cell)
                return false;
        }

        // At this point cell should point to a node that is not in any of the lists
        data = m_data[cell->m_idx];

        // Push that node as the new head of the producer list
        cell->m_next = m_produceHead;
        while(!std::atomic_compare_exchange_strong(&m_produceHead,
                                                   &cell->m_next, cell))
        {
            cell->m_next = m_produceHead;
        }
        return true;
    };

private:

    Data m_data[queueSize];

    // The nodes for the two lists
    CellNode m_nodes[queueSize];

    volatile std::atomic<CellNode*> m_produceHead;
    volatile std::atomic<CellNode*> m_consumeHead;
};

#endif
+3
source share
2 answers

I see several problems with your queue implementation:

  • , : , , - . , - , . : , , , , .

  • push pop CellNode::m_next (, , .. node node ). , . pop m_consumeHead. 1 data. Thread 1 m_produceHead cell->m_next, Thread 2 cell->m_next, std::atomic_compare_exchange_strong_explicit. cell->m_next - .

    , "" concurrency: / , . , , , , m_next memory_order_relaxed , .

  • ABA

    . , (, m_produceHead m_consumeHead), , , , - . , , - , . :

    • 1 pop m_consumeHead m_consumeHead->m_next, -.
    • 2 node m_consumeHead .
    • 3 m_consumeHead.
    • Thread 2 node m_produceHead.
    • 3 node m_produceHead m_consumeHead.
    • Thread 1, , , , m_consumeHead . node - , m_consumeHead stale m_next, 1. , Thread 3 , .
+1

, . livelock 1000000 / 2 1024 1 1 100 /100 .

. , cell- > m_next (, ) :

, -, , . :

bool push(const TData& data)
{
    CellNode* cell = m_produceHead.load(std::memory_order_acquire);
    if(cell == NULL)
        return false;

    while(!std::atomic_compare_exchange_strong_explicit(&m_produceHead,
                                                        &cell,
                                                        cell->m_next,
                                                        std::memory_order_acquire,
                                                        std::memory_order_release))
    {
        if(!cell)
            return false;
    }

    m_data[cell->m_idx] = data;

    CellNode* curHead = m_consumeHead;
    cell->m_next = curHead;
    while (!std::atomic_compare_exchange_strong_explicit(&m_consumeHead,
                                                         &curHead,
                                                         cell,
                                                         std::memory_order_acquire,
                                                         std::memory_order_release))
    {
        cell->m_next = curHead;
    }

    return true;
}

bool pop(TData& data)
{
    CellNode* cell = m_consumeHead.load(std::memory_order_acquire);
    if(cell == NULL)
        return false;

    while(!std::atomic_compare_exchange_strong_explicit(&m_consumeHead,
                                                        &cell,
                                                        cell->m_next,
                                                        std::memory_order_acquire,
                                                        std::memory_order_release))
    {
        if(!cell)
            return false;
    }

    data = m_data[cell->m_idx];

    CellNode* curHead = m_produceHead;
    cell->m_next = curHead;
    while(!std::atomic_compare_exchange_strong_explicit(&m_produceHead,
                                                        &curHead,
                                                        cell,
                                                        std::memory_order_acquire,
                                                        std::memory_order_release))
    {
        cell->m_next = curHead;
    }

    return true;
};
+1

All Articles