My attempt at atomic reference counting ends in a deadlock. Is this the right approach?

So, I'm trying to create a copy-on-write map that uses atomic reference counting on the read side so that reads do not block.

Something is not quite right. I see some reference counts overflowing and some going negative, so something is not atomic. In my tests I have 10 reader threads, each looping 100 times and doing a get(), and 1 writer thread doing 100 writes.

It gets stuck in the writer because some reference counts never drop to zero, even though they should.

I am trying to use the 128-bit DCAS technique described in this blog post.

Is there something egregiously wrong with this approach, or is there an easier way to debug it than playing with it in the debugger?

    typedef std::unordered_map<std::string, std::string> StringMap;

    static const int zero = 0; //provides an l-value for asm code

    class NonBlockingReadMapCAS {
    public:
        class OctaWordMapWrapper {
        public:
            StringMap* fStringMap;
            //std::atomic<int> fCounter;
            int64_t fCounter;

            OctaWordMapWrapper(OctaWordMapWrapper* copy)
                : fStringMap(new StringMap(*copy->fStringMap)), fCounter(0) { }

            OctaWordMapWrapper() : fStringMap(new StringMap), fCounter(0) { }

            ~OctaWordMapWrapper() { delete fStringMap; }

            /**
             * Does a compare and swap on an octa-word - in this case, our two adjacent class members fStringMap
             * pointer and fCounter.
             */
            static bool inline doubleCAS(OctaWordMapWrapper* target, StringMap* compareMap, int64_t compareCounter,
                                         StringMap* swapMap, int64_t swapCounter )
            {
                bool cas_result;
                __asm__ __volatile__
                (
                    "lock cmpxchg16b %0;"  // cmpxchg16b sets ZF on success
                    "setz %3;"             // if ZF set, set cas_result to 1
                    : "+m" (*target),
                      "+a" (compareMap),       //compare target stringmap pointer to compareMap
                      "+d" (compareCounter),   //compare target counter to compareCounter
                      "=q" (cas_result)        //results
                    : "b" (swapMap),           //swap target stringmap pointer with swapMap
                      "c" (swapCounter)        //swap target counter with swapCounter
                    : "cc", "memory"
                );
                return cas_result;
            }

            OctaWordMapWrapper* atomicIncrementAndGetPointer()
            {
                if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
                    return this;
                else
                    return NULL;
            }

            OctaWordMapWrapper* atomicDecrement()
            {
                while(true) {
                    if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter -1))
                        break;
                }
                return this;
            }

            bool atomicSwapWhenNotReferenced(StringMap* newMap)
            {
                return doubleCAS(this, this->fStringMap, zero, newMap, 0);
            }
        }
        __attribute__((aligned(16)));

        std::atomic<OctaWordMapWrapper*> fReadMapReference;
        pthread_mutex_t fMutex;

        NonBlockingReadMapCAS() {
            fReadMapReference = new OctaWordMapWrapper();
        }

        ~NonBlockingReadMapCAS() {
            delete fReadMapReference;
        }

        bool contains(const char* key) {
            std::string keyStr(key);
            return contains(keyStr);
        }

        bool contains(std::string &key) {
            OctaWordMapWrapper *map;
            do {
                map = fReadMapReference.load()->atomicIncrementAndGetPointer();
            } while (!map);
            bool result = map->fStringMap->count(key) != 0;
            map->atomicDecrement();
            return result;
        }

        std::string get(const char* key) {
            std::string keyStr(key);
            return get(keyStr);
        }

        std::string get(std::string &key) {
            OctaWordMapWrapper *map;
            do {
                map = fReadMapReference.load()->atomicIncrementAndGetPointer();
            } while (!map);
            //std::cout << "inc " << map->fStringMap << " cnt " << map->fCounter << "\n";
            std::string value = map->fStringMap->at(key);
            map->atomicDecrement();
            return value;
        }

        void put(const char* key, const char* value) {
            std::string keyStr(key);
            std::string valueStr(value);
            put(keyStr, valueStr);
        }

        void put(std::string &key, std::string &value) {
            pthread_mutex_lock(&fMutex);
            OctaWordMapWrapper *oldWrapper = fReadMapReference;
            OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
            std::pair<std::string, std::string> kvPair(key, value);
            newWrapper->fStringMap->insert(kvPair);
            fReadMapReference.store(newWrapper);
            std::cout << oldWrapper->fCounter << "\n";
            while (oldWrapper->fCounter > 0);
            delete oldWrapper;
            pthread_mutex_unlock(&fMutex);
        }

        void clear() {
            pthread_mutex_lock(&fMutex);
            OctaWordMapWrapper *oldWrapper = fReadMapReference;
            OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
            fReadMapReference.store(newWrapper);
            while (oldWrapper->fCounter > 0);
            delete oldWrapper;
            pthread_mutex_unlock(&fMutex);
        }
    };
+7
c++ multithreading concurrency atomic
4 answers

Maybe not the answer, but it looks suspicious to me:

    while (oldWrapper->fCounter > 0);
    delete oldWrapper;

A reader thread could be just entering atomicIncrementAndGetPointer() while the counter is still 0, so deleting the wrapper at that moment pulls the rug out from under that reader.

Edit to summarize the comments below for a potential solution:

The best implementation I know of is to move fCounter from OctaWordMapWrapper to fReadMapReference (you don't need the OctaWordMapWrapper class at all). When the counter is zero, swap the pointer in your writer. Since heavy contention from reader threads could essentially block the writer indefinitely, you can reserve the most significant bit of fCounter as a reader lock: while this bit is set, readers spin until it is cleared. The writer sets this bit ( __sync_fetch_and_or() ) when it is about to change the pointer, waits for the counter to drop to zero (i.e., for existing readers to finish their work), then swaps the pointer and clears the bit.

This approach should be watertight, although it obviously blocks readers during writes. I do not know whether that is acceptable in your situation; ideally you would want it to be non-blocking.

The code will look something like this (not verified!):

    class NonBlockingReadMapCAS
    {
    public:
        NonBlockingReadMapCAS() :m_ptr(0), m_counter(0) {}

    private:
        StringMap *acquire_read()
        {
            while(1)
            {
                uint32_t counter=atom_inc(m_counter);
                if(!(counter&0x80000000))
                    return m_ptr;
                atom_dec(m_counter);
                while(m_counter&0x80000000);
            }
            return 0;
        }

        void release_read()
        {
            atom_dec(m_counter);
        }

        void acquire_write()
        {
            uint32_t counter=atom_or(m_counter, 0x80000000);
            assert(!(counter&0x80000000));
            while(m_counter&0x7fffffff);
        }

        void release_write()
        {
            atom_and(m_counter, uint32_t(0x7fffffff));
        }

        StringMap *volatile m_ptr;
        volatile uint32_t m_counter;
    };

Just call the acquire/release_read/write() methods before and after accessing the pointer for reading/writing. Replace atom_inc/dec/or/and() with __sync_fetch_and_add() , __sync_fetch_and_sub() , __sync_fetch_and_or() and __sync_fetch_and_and() respectively (passing 1 as the operand for the increment and decrement). You do not need doubleCAS() .

As @Quuxplusone pointed out in a comment below, this is a single-producer, multiple-consumer implementation. I edited the code to add an assert that enforces this.
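For readers who prefer standard C++11 atomics over the __sync builtins, the same scheme could be sketched roughly as follows. This is only an illustration under assumptions: the class name GuardedMapPointer, the constant kWriterBit and the put() method are hypothetical and not part of the answer's code, and the sketch still supports only a single writer.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

typedef std::unordered_map<std::string, std::string> StringMap;

// Hypothetical constant: the top bit of the counter marks "writer active".
static const uint32_t kWriterBit = 0x80000000u;

// Hypothetical class name. Single writer, many readers.
class GuardedMapPointer {
public:
    GuardedMapPointer() : m_ptr(new StringMap), m_counter(0) {}
    ~GuardedMapPointer() { delete m_ptr.load(); }

    // Reader side: register in the counter, back off while a writer is active.
    const StringMap* acquire_read() {
        for (;;) {
            uint32_t old = m_counter.fetch_add(1);
            if (!(old & kWriterBit))
                return m_ptr.load();
            m_counter.fetch_sub(1);                  // undo our registration
            while (m_counter.load() & kWriterBit) {} // spin until the writer is done
        }
    }

    void release_read() { m_counter.fetch_sub(1); }

    // Writer side: copy, modify the copy, then swap while no reader is inside.
    void put(const std::string& key, const std::string& value) {
        StringMap* next = new StringMap(*m_ptr.load());
        (*next)[key] = value;
        acquire_write();
        StringMap* old = m_ptr.exchange(next);
        release_write();
        delete old; // no reader could still hold it: the count was zero at swap time
    }

private:
    void acquire_write() {
        uint32_t old = m_counter.fetch_or(kWriterBit);
        assert(!(old & kWriterBit));                  // enforces the single-writer rule
        while (m_counter.load() & ~kWriterBit) {}     // wait for readers to drain
    }

    void release_write() { m_counter.fetch_and(~kWriterBit); }

    std::atomic<StringMap*> m_ptr;
    std::atomic<uint32_t> m_counter;
};
```

A reader would bracket each lookup with acquire_read() and release_read(); holding a read across a call to put() on the same thread would make the writer spin forever.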

+3

Well, there are probably a lot of problems, but here are the two obvious ones.

The most trivial bug is in atomicIncrementAndGetPointer . You wrote:

 if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1)) 

That is, you are trying to increment this->fCounter in a lock-free manner. But this does not work, because you fetch the old value twice with no guarantee that the same value is read each time. Consider the following sequence of events:

  • Thread A fetches this->fCounter (with value 0) and evaluates argument 5 as this->fCounter +1 = 1 .
  • Thread B successfully increments the counter.
  • Thread A fetches this->fCounter (with value 1) and evaluates argument 3 as this->fCounter = 1 .
  • Thread A executes doubleCAS(this, this->fStringMap, 1, this->fStringMap, 1) . Of course this succeeds, but we have lost the increment we were trying to make.

What you wanted is more like

    StringMap* oldMap = this->fStringMap;
    int64_t oldCounter = this->fCounter;
    if (doubleCAS(this, oldMap, oldCounter, oldMap, oldCounter+1))
        ...

The other obvious problem is that there is a data race between get and put . Consider the following sequence of events:
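The "read once, then compare against that snapshot" pattern is easier to see on a plain counter. The following is a minimal sketch using std::atomic<int64_t> rather than the 128-bit doubleCAS from the question; the helper name incrementWithCas is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper: increments `counter` exactly once, retrying until the
// compare-and-swap wins. Returns the value the counter held just before our
// increment.
inline int64_t incrementWithCas(std::atomic<int64_t>& counter) {
    int64_t expected = counter.load(); // snapshot the old value once
    // On failure, compare_exchange_weak stores the current value into
    // `expected`, so `expected + 1` is always derived from the very value
    // that the next attempt compares against.
    while (!counter.compare_exchange_weak(expected, expected + 1)) {
    }
    return expected;
}
```

Because both the comparand and the new value come from the same local variable, another thread's increment between the read and the CAS makes the CAS fail and retry instead of silently swallowing an increment.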

  • Thread A starts executing get : it fetches fReadMapReference.load() and prepares to execute atomicIncrementAndGetPointer on that memory address.
  • Thread B finishes executing put : it deletes that memory address. (It is within its rights to do so, because the wrapper's reference count is still zero.)
  • Thread A executes atomicIncrementAndGetPointer on the deleted memory address. If you're lucky, you segfault, but of course in practice you probably won't.

As explained in the blog post:

The garbage collection interface is omitted, but in real applications you will need to scan the hazard pointers before deleting the node.
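Hazard pointers are one way to close that race. A simpler, different technique is to let std::shared_ptr own each map snapshot, so the old map is freed only when the last reader drops its reference. The sketch below is an assumption-laden illustration, not the blog post's method: the class name SnapshotMap is hypothetical, and readers do take a short mutex just long enough to copy the pointer, so it is not lock-free.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

typedef std::unordered_map<std::string, std::string> StringMap;

// Hypothetical class name: a copy-on-write map whose snapshots are owned by
// shared_ptr, so reclamation happens when the last reference goes away.
class SnapshotMap {
public:
    SnapshotMap() : m_current(std::make_shared<StringMap>()) {}

    // Readers hold the lock only while copying the pointer, not while reading.
    std::shared_ptr<const StringMap> snapshot() const {
        std::lock_guard<std::mutex> lock(m_pointerMutex);
        return m_current;
    }

    void put(const std::string& key, const std::string& value) {
        std::lock_guard<std::mutex> writer(m_writeMutex); // serialize writers
        std::shared_ptr<StringMap> next = std::make_shared<StringMap>(*snapshot());
        (*next)[key] = value;
        std::shared_ptr<const StringMap> retired(next);
        {
            std::lock_guard<std::mutex> lock(m_pointerMutex);
            m_current.swap(retired); // publish the new map
        }
        // `retired` now refers to the old map; it is destroyed here, or later
        // when the last reader still holding a snapshot releases it.
    }

private:
    mutable std::mutex m_pointerMutex;
    std::mutex m_writeMutex;
    std::shared_ptr<const StringMap> m_current;
};
```

A reader that obtained a snapshot before a put() keeps seeing the old contents safely, which is exactly the case that crashes in the question's code.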

+2

Another user has suggested a similar approach, but if you are compiling with gcc (and possibly clang), you can use the intrinsic __sync_add_and_fetch_4, which does something similar to what your assembly code does and is likely much more portable. I used it when I implemented reference counting in an Ada library (but the algorithm remains the same).

    // increments the value pointed to by ptr by value, and returns the new value
    int __sync_add_and_fetch_4 (int* ptr, int value);
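As a rough illustration of reference counting with these builtins, here is a minimal sketch. It uses the generic spellings __sync_add_and_fetch and __sync_sub_and_fetch, which GCC documents as overloaded on the operand type; the struct RefCounted and the helpers retainRef and releaseRef are hypothetical names, and the code assumes GCC or a compatible compiler.

```cpp
#include <cassert>

// Hypothetical reference-counted object; a new object starts with one owner.
struct RefCounted {
    int refs;
    RefCounted() : refs(1) {}
};

// Atomically adds one reference and returns the new count.
inline int retainRef(RefCounted* obj) {
    return __sync_add_and_fetch(&obj->refs, 1);
}

// Atomically drops one reference and returns the new count; the caller that
// sees 0 is the one responsible for destroying the object.
inline int releaseRef(RefCounted* obj) {
    return __sync_sub_and_fetch(&obj->refs, 1);
}
```

Note that this only makes the count itself atomic; it does not solve the problem of a reader touching a wrapper that has already been deleted.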
+2

Although I'm not sure how your reader threads work, I suspect your problem is that you are not catching and handling the out_of_range exceptions that can be thrown in your get() method by this line: std::string value = map->fStringMap->at(key); . Note that if key is not found in the map, this will throw and exit the function without decrementing the counter, which would lead to the condition you described (getting stuck in the while loop in the writer thread while waiting for the counters to decrease).

In any case, whether or not this is the cause of the problems you are seeing, you definitely need to either handle this exception (and any others) or change the code so that there is no risk of a throw. Instead of at() , I would probably just use find() and then check the returned iterator. More generally, though, I would suggest using the RAII pattern to make sure an unexpected exception cannot escape without unlocking / decrementing. For example, you could look at boost::scoped_lock to wrap your fMutex, and then write something simple like this for incrementing / decrementing the OctaWordMapWrapper:

    class ScopedAtomicMapReader {
    public:
        explicit ScopedAtomicMapReader(std::atomic<OctaWordMapWrapper*>& map) : fMap(NULL) {
            do {
                fMap = map.load()->atomicIncrementAndGetPointer();
            } while (NULL == fMap);
        }

        ~ScopedAtomicMapReader() {
            if (NULL != fMap)
                fMap->atomicDecrement();
        }

        OctaWordMapWrapper* map(void) { return fMap; }

    private:
        OctaWordMapWrapper* fMap;
    }; // class ScopedAtomicMapReader

With something like that in place, your contains() and get() methods, for example, would simplify to the following (and be exception-safe):

    bool contains(std::string &key) {
        ScopedAtomicMapReader mapWrapper(fReadMapReference);
        return (mapWrapper.map()->fStringMap->count(key) != 0);
    }

    std::string get(std::string &key) {
        ScopedAtomicMapReader mapWrapper(fReadMapReference);
        return mapWrapper.map()->fStringMap->at(key); // Now it's fine if this throws...
    }

Finally, although I don't think you should need to do this, you could also try declaring fCounter as volatile (given that your access to it in the while loop in the put() method is on a different thread than the writes to it in the reader threads).
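The find() alternative mentioned earlier avoids the throw altogether. A minimal sketch, assuming a hypothetical free function tryGet that reports a missing key through its return value instead of an exception:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

typedef std::unordered_map<std::string, std::string> StringMap;

// Hypothetical helper: copies the value for `key` into `out` and returns true,
// or returns false and leaves `out` untouched when the key is absent.
inline bool tryGet(const StringMap& map, const std::string& key, std::string& out) {
    StringMap::const_iterator it = map.find(key);
    if (it == map.end())
        return false;
    out = it->second;
    return true;
}
```

Inside get() this would be called on the wrapped fStringMap between the increment and the decrement, so a missing key can no longer skip the decrement.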

Hope this helps!

By the way, another minor thing: fReadMapReference looks like it is leaked. I think you should delete it in your destructor.

0
