Stack overflow on attempt to implement lock without queue


I performed a free queue based on the algorithm specified by Maged M. Michael and Michael L. Scott. Simple, fast and practical locking and locking Parallel queue algorithms (for the algorithm, go to page 4)

I used atomic operation on shared_ptrfor example std::atomic_load_explicitetc.

when using a queue in only one thread, everything is fine, but when using it from another thread, I get an exception.

Unfortunately, I could not trace the source of the problem. it seems that when one shared_ptrgoes out of scope, it reduces the number of links to the next ConcurrentQueueNodeand causes infinite recursion, but I don’t understand why ..

The code:

Node queue:

template<class T>
struct ConcurrentQueueNode {
    T m_Data;
    std::shared_ptr<ConcurrentQueueNode> m_Next;

    template<class ... Args>
    ConcurrentQueueNode(Args&& ... args) :
        m_Data(std::forward<Args>(args)...) {}

    std::shared_ptr<ConcurrentQueueNode>& getNext() {
        return m_Next;
    }

    T getValue() {
        return std::move(m_Data);
    }

};

(: ):

template<class T>
class ConcurrentQueue {
    std::shared_ptr<ConcurrentQueueNode<T>> m_Head, m_Tail;

public:

ConcurrentQueue(){
    m_Head = m_Tail = std::make_shared<ConcurrentQueueNode<T>>();
}

template<class ... Args>
void push(Args&& ... args) {
    auto node = std::make_shared<ConcurrentQueueNode<T>>(std::forward<Args>(args)...);
    std::shared_ptr<ConcurrentQueueNode<T>> tail;

    for (;;) {
        tail = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire);
        std::shared_ptr<ConcurrentQueueNode<T>> next = 
            std::atomic_load_explicit(&tail->getNext(),std::memory_order_acquire);

        if (tail == std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)) {
            if (next.get() == nullptr) {
                auto currentNext = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)->getNext();
                auto res = std::atomic_compare_exchange_weak(&tail->getNext(), &next, node);
                if (res) {
                    break;
                }
            }
            else {
                std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
            }
        }
    }

    std::atomic_compare_exchange_strong(&m_Tail, &tail, node);
}

bool tryPop(T& dest) {
    std::shared_ptr<ConcurrentQueueNode<T>> head;
    for (;;) {
        head = std::atomic_load_explicit(&m_Head, std::memory_order_acquire);
        auto tail = std::atomic_load_explicit(&m_Tail,std::memory_order_acquire);
        auto next = std::atomic_load_explicit(&head->getNext(), std::memory_order_acquire);

        if (head == std::atomic_load_explicit(&m_Head, std::memory_order_acquire)) {
            if (head.get() == tail.get()) {
                if (next.get() == nullptr) {
                    return false;
                }
                std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
            }
            else {
                dest = next->getValue();
                auto res = std::atomic_compare_exchange_weak(&m_Head, &head, next);
                if (res) {
                    break;
                }
            }
        }
    }

    return true;
}
};

, :

int main(){
    ConcurrentQueue<int> queue;
    std::thread threads[4];

for (auto& thread : threads) {
    thread = std::thread([&queue] {

        for (auto i = 0; i < 100'000; i++) {
            queue.push(i);
            int y;
            queue.tryPop(y);
        }
    });
}

for (auto& thread : threads) {
    thread.join();
}
return 0;
}
+4
1

- , node , , .

, , .

for (auto i = 1; i < 100000; i++) {
  queue.push(i);
  //int y;
  //queue.tryPop(y);
}

unrecursive-ize :

__forceinline ~ConcurrentQueueNode() {
    if (!m_Next || m_Next.use_count() > 1)
        return;
    KillChainOfDeath();
}
void KillChainOfDeath() {
    auto pThis = this;
    std::shared_ptr<ConcurrentQueueNode> Next, Prev;
    while (1) {
        if (pThis->m_Next.use_count() > 1)
          break;
        Next.swap(pThis->m_Next); // unwire node
        Prev = NULL; // free previous node that we unwired in previous loop
        if (!(pThis = Next.get())) // move to next node
            break;
        Prev.swap(Next); // else Next.swap will free before unwire.
    }
}

shared_ptr, , . , shared_ptr, , ABA. shared_ptr - , ABA, , , CAS. , , .

+2

All Articles