C/C++ lock-free (or non-blocking) ring buffer that overwrites the oldest data?

I am trying to find a lock-free or non-blocking way to build a ring buffer for a single producer / single consumer that overwrites the oldest data in the buffer. I have read about many lock-free algorithms that work when you "return false" if the buffer is full, i.e. do not add; but I can't find even pseudocode that shows how to do it when you need to overwrite the oldest data.

I am using GCC 4.1.2 (a workplace restriction; I can't upgrade the version...), and I have the Boost libraries. In the past I wrote my own Atomic<T> type, which follows the upcoming specification fairly closely (it's not perfect, but it is thread-safe and does what I need).

When I thought about it, I figured that using these atomics should take care of the problem. Some rough pseudocode for what I had in mind:

    template <typename T, unsigned int Size>
    class RingBuffer
    {
    private:
        Atomic<unsigned int> readIndex;
        Atomic<unsigned int> writeIndex;
        enum Capacity { size = Size };
        T* buf;

        unsigned int getNextIndex(unsigned int i)
        {
            return (i + 1) % size;
        }

    public:
        RingBuffer()
        {
            // create array of size, set readIndex = writeIndex = 0
        }

        ~RingBuffer()
        {
            // delete data
        }

        void produce(const T& t)
        {
            if (getNextIndex(writeIndex) == readIndex) // 1 (buffer is full)
            {
                readIndex = getNextIndex(readIndex); // 2
            }
            buf[writeIndex] = t;
            writeIndex = getNextIndex(writeIndex); // 3
        }

        bool consume(T& t)
        {
            if (readIndex == writeIndex) // 4 (buffer is empty)
                return false;
            t = buf[readIndex];
            readIndex = getNextIndex(readIndex); // 5
            return true;
        }
    };

As far as I can tell there are no deadlock situations here, so we are safe from that (if my implementation above is wrong even at the pseudocode level, constructive criticism is always appreciated). However, the BIG race condition I can find is:

Assume the buffer is full, i.e. writeIndex + 1 == readIndex. (1) occurs just as consume() is being called, and evaluates to true. (4) is false, so we move on to read from the buffer. (5) occurs, and readIndex is advanced by one (so there is now, in effect, space in the buffer). Then (2) occurs, advancing readIndex AGAIN, thus LOSING a value.

Basically, it is the classic problem of the writer having to modify the reader's index, which causes a race condition. Short of locking the entire buffer every time I access it, I cannot figure out how to prevent this. What am I missing?

+7
c++ c nonblocking circular-buffer
3 answers
  • Start with a single-producer / multiple-consumer queue with appropriate progress guarantees.
  • If the queue is full and the push would fail, pop one value. Then there is space to push the new value.
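
The two steps above can be sketched as a small producer-side wrapper. This is only an illustration: `BoundedQueue` is a hypothetical, non-thread-safe stand-in for whatever single-producer / multiple-consumer queue you actually use, and `try_push`, `try_pop` and `push_overwriting` are assumed names, not part of any particular library.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for a bounded queue; NOT thread-safe.
// A real single-producer / multiple-consumer lock-free queue would
// expose the same two operations.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);  // a zero-capacity queue could never accept a push
    }

    // Returns false instead of inserting when the queue is full.
    bool try_push(const T& value) {
        if (items_.size() == capacity_) return false;
        items_.push_back(value);
        return true;
    }

    // Returns false when the queue is empty.
    bool try_pop(T& out) {
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::size_t capacity_;
    std::deque<T> items_;
};

// Producer-side wrapper: if the push fails because the queue is full,
// pop (and discard) the oldest value to make room, then try again.
template <typename Queue, typename T>
void push_overwriting(Queue& queue, const T& value) {
    while (!queue.try_push(value)) {
        T dropped;
        queue.try_pop(dropped);  // may fail if a consumer got there first
    }
}
```

Because the producer itself pops here, it acts as a second consumer, which is presumably why the answer asks for a multiple-consumer queue rather than a strict single-consumer one.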
+7

What am I missing?

Lots:

  • say you consume t while it is being overwritten by the producer - how do you detect / handle this?
    • many options - e.g. do { copy the value; verify that the copy is consistent using a modification sequence number, etc. } while ( corrupt )
  • using atomic integers alone is not enough - you also need CAS-style loops to perform the index increments (although I assume you know that, given that you have already read a lot about this)
  • memory barriers
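
The "copy, then verify with a sequence number" option from the list above is often called a seqlock. A minimal sketch follows, assuming C++11 `std::atomic` is available (GCC 4.1.2 does not ship it, so the question's own `Atomic<T>` would have to stand in) and assuming a single writer. `VersionedSlot` and its fields are hypothetical names for illustration.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical two-field record guarded by a sequence counter.
// The fields are atomics accessed with relaxed ordering so that a
// read racing with a write is well-defined rather than a data race.
struct VersionedSlot {
    std::atomic<unsigned> seq{0};  // odd while a write is in progress
    std::atomic<int> a{0};
    std::atomic<int> b{0};

    // Single writer only.
    void write(int new_a, int new_b) {
        unsigned s = seq.load(std::memory_order_relaxed);
        assert((s & 1u) == 0);                        // no write in flight
        seq.store(s + 1, std::memory_order_relaxed);  // mark "being modified"
        std::atomic_thread_fence(std::memory_order_release);
        a.store(new_a, std::memory_order_relaxed);
        b.store(new_b, std::memory_order_relaxed);
        seq.store(s + 2, std::memory_order_release);  // publish
    }

    // Any number of readers; loops until it sees a consistent snapshot.
    void read(int& out_a, int& out_b) const {
        unsigned before, after;
        do {
            before = seq.load(std::memory_order_acquire);
            out_a = a.load(std::memory_order_relaxed);
            out_b = b.load(std::memory_order_relaxed);
            std::atomic_thread_fence(std::memory_order_acquire);
            after = seq.load(std::memory_order_relaxed);
        } while (before != after || (before & 1u));  // retry if "corrupt"
    }
};
```

The reader never blocks the writer; it only repeats its copy when the counter changed or was odd, which is the `while ( corrupt )` condition described above.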

But let's write those off as being below the level of your pseudocode and consider your explicit question:

  • point (5) will actually require a CAS operation. If readIndex was correctly sampled / copied at the top of consume() - before the (possibly corrupt) t is copied - then the CAS instruction will fail if readIndex has already been incremented by the producer. Instead of the usual resample-and-retry of the CAS, just carry on.
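
A rough sketch of that idea, under several assumptions that are not in the question: C++11 `std::atomic` instead of the custom `Atomic<T>`, an `int` payload stored in atomic slots (so a racing read is well-defined), exactly one producer thread and one consumer thread, and monotonic counters that are reduced modulo `N` only when indexing (they are assumed never to wrap, or `N` is a power of two). The answer does not say what to do with the copied value when the CAS fails; this sketch assumes it should be discarded and the read restarted at the new index. `OverwritingRing` is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>

// Sketch only: one producer thread, one consumer thread, int payload.
template <unsigned N>
class OverwritingRing {
public:
    OverwritingRing() : read_(0), write_(0) {
        for (unsigned i = 0; i < N; ++i) buf_[i].store(0);
    }

    // Producer thread only. Never fails: drops the oldest value when full.
    void produce(int value) {
        unsigned w = write_.load();
        unsigned r = read_.load();
        assert(w - r <= N);  // invariant: at most N values are stored
        if (w - r == N) {
            // Full: try to retire the oldest slot. If the CAS fails, the
            // consumer has just taken that slot, so there is room anyway.
            read_.compare_exchange_strong(r, r + 1);
        }
        buf_[w % N].store(value);
        write_.store(w + 1);
    }

    // Consumer thread only. Returns false when the buffer is empty.
    bool consume(int& out) {
        for (;;) {
            unsigned r = read_.load();       // sample the index first
            if (r == write_.load()) return false;
            int copy = buf_[r % N].load();   // may be stale if overwritten
            if (read_.compare_exchange_strong(r, r + 1)) {
                out = copy;                  // index unchanged: copy is valid
                return true;
            }
            // CAS failed: the producer already advanced read_ past this
            // slot. Do not redo the increment; discard the copy and re-read.
        }
    }

private:
    std::atomic<unsigned> read_;   // monotonic counters; slot = counter % N
    std::atomic<unsigned> write_;
    std::atomic<int> buf_[N];
};
```

Using counters that only grow, rather than indices that wrap at `N`, is a deliberate choice here: a successful CAS then implies the read counter never moved during the copy, which avoids the ABA case where a wrapped index returns to the same value.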
+1

Here is the code for a circular buffer built on atomic variables that I recently created. I modified it to "overwrite" data instead of returning false. Disclaimer - it has not been tested to production quality.

    #include <atomic>

    template<int capacity, int gap, typename T>
    class nonblockigcircular {
        /*
         * capacity - size of circular buffer
         * gap - minimum safety distance between head and tail to start insertion operation
         *       generally gap should exceed number of threads attempting insertion concurrently
         *       capacity should be gap plus desired buffer size
         * T - is a data type for buffer to keep
         */
        volatile T buf[capacity]; // buffer
        std::atomic<int> t, h, ph, wh;
        /*
         * t to h   - data available for reading
         * h to ph  - zone where data is likely written but h is not updated yet
         *            to make sure data is written check if ph==wh
         * ph to wh - zone where data changes in progress
         */

    public:
        bool pop(T &pwk) {
            int td, tnd;
            do {
                int hd = h.load() % capacity;
                td = t.load() % capacity;
                if (hd == td) return false;
                tnd = (td + 1) % capacity;
            } while (!t.compare_exchange_weak(td, tnd));
            pwk = buf[td];
            return true;
        }

        const int count() {
            return (h.load() + capacity - t.load()) % capacity;
        }

        bool push(const T &pwk) {
            const int tt = t.load();
            int hd = h.load();
            if (capacity - (hd + capacity - tt) % capacity < gap) {
                // Buffer is too full to insert
                // return false;
                // or delete last record as below
                int nt = t.fetch_add(1);
                if (nt == capacity - 1) t.fetch_sub(capacity);
            }
            int nwh = wh.fetch_add(1);
            if (nwh == capacity - 1) wh.fetch_sub(capacity);
            buf[nwh % capacity] = pwk;
            int nph = ph.fetch_add(1);
            if (nph == capacity - 1) ph.fetch_sub(capacity);
            if (nwh == nph) {
                int ohd = hd;
                while (!h.compare_exchange_weak(hd, nwh)) {
                    hd = h.load();
                    if ((ohd + capacity - hd) % capacity > (ohd + capacity - nwh) % capacity) break;
                }
            }
            return true;
        }
    };
0