So, I'm trying to create a copy-to-write map that uses an attempt to count atomic links on the read side so as not to block.
Something is not quite right. I see that some links are overflowing and some are dropping negatively, so something is not atomic. In my tests, I have 10 read threads, the loop of which is 100 times, each of which does the get () thread and 1 writer, which executes 100 records.
He is stuck in the writer because some links never drop to zero, although they should.
I am trying to use the 128-bit DCAS methodology described in this blog post .
Is there something egregious in this regard, or is there an easier way to debug this rather than playing with it in the debugger?
typedef std::unordered_map<std::string, std::string> StringMap; static const int zero = 0; //provides an l-value for asm code class NonBlockingReadMapCAS { public: class OctaWordMapWrapper { public: StringMap* fStringMap; //std::atomic<int> fCounter; int64_t fCounter; OctaWordMapWrapper(OctaWordMapWrapper* copy) : fStringMap(new StringMap(*copy->fStringMap)), fCounter(0) { } OctaWordMapWrapper() : fStringMap(new StringMap), fCounter(0) { } ~OctaWordMapWrapper() { delete fStringMap; } /** * Does a compare and swap on an octa-word - in this case, our two adjacent class members fStringMap * pointer and fCounter. */ static bool inline doubleCAS(OctaWordMapWrapper* target, StringMap* compareMap, int64_t compareCounter, StringMap* swapMap, int64_t swapCounter ) { bool cas_result; __asm__ __volatile__ ( "lock cmpxchg16b %0;" // cmpxchg16b sets ZF on success "setz %3;" // if ZF set, set cas_result to 1 : "+m" (*target), "+a" (compareMap), //compare target stringmap pointer to compareMap "+d" (compareCounter), //compare target counter to compareCounter "=q" (cas_result) //results : "b" (swapMap), //swap target stringmap pointer with swapMap "c" (swapCounter) //swap target counter with swapCounter : "cc", "memory" ); return cas_result; } OctaWordMapWrapper* atomicIncrementAndGetPointer() { if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1)) return this; else return NULL; } OctaWordMapWrapper* atomicDecrement() { while(true) { if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter -1)) break; } return this; } bool atomicSwapWhenNotReferenced(StringMap* newMap) { return doubleCAS(this, this->fStringMap, zero, newMap, 0); } } __attribute__((aligned(16))); std::atomic<OctaWordMapWrapper*> fReadMapReference; pthread_mutex_t fMutex; NonBlockingReadMapCAS() { fReadMapReference = new OctaWordMapWrapper(); } ~NonBlockingReadMapCAS() { delete fReadMapReference; } bool contains(const char* key) { std::string keyStr(key); return contains(keyStr); } bool contains(std::string &key) { OctaWordMapWrapper *map; do { map = fReadMapReference.load()->atomicIncrementAndGetPointer(); } while (!map); bool result = map->fStringMap->count(key) != 0; map->atomicDecrement(); return result; } std::string get(const char* key) { std::string keyStr(key); return get(keyStr); } std::string get(std::string &key) { OctaWordMapWrapper *map; do { map = fReadMapReference.load()->atomicIncrementAndGetPointer(); } while (!map); //std::cout << "inc " << map->fStringMap << " cnt " << map->fCounter << "\n"; std::string value = map->fStringMap->at(key); map->atomicDecrement(); return value; } void put(const char* key, const char* value) { std::string keyStr(key); std::string valueStr(value); put(keyStr, valueStr); } void put(std::string &key, std::string &value) { pthread_mutex_lock(&fMutex); OctaWordMapWrapper *oldWrapper = fReadMapReference; OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper); std::pair<std::string, std::string> kvPair(key, value); newWrapper->fStringMap->insert(kvPair); fReadMapReference.store(newWrapper); std::cout << oldWrapper->fCounter << "\n"; while (oldWrapper->fCounter > 0); delete oldWrapper; pthread_mutex_unlock(&fMutex); } void clear() { pthread_mutex_lock(&fMutex); OctaWordMapWrapper *oldWrapper = fReadMapReference; OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper); fReadMapReference.store(newWrapper); while (oldWrapper->fCounter > 0); delete oldWrapper; pthread_mutex_unlock(&fMutex); } };