Ring allocator for lock-free member variable update?

I have a class that stores the latest value of some incoming real-time data (about 150 million events per second).

Suppose it looks like this:

    class DataState {
        Event latest_event;
    public:
        // pushes an event atomically
        void push_event(const Event* __restrict__ e);
        // pulls the latest event atomically
        Event pull_event();
    };

I need to be able to push events and pull them out atomically, with strict ordering guarantees. I know I could use a spinlock, but given the sheer event rate (more than 100 million per second) and the high degree of concurrency, I would prefer to use lock-free operations.

The problem is that Event is 64 bytes in size. There is no CMPXCHG64B instruction on any current x86 CPU (as of August '16). So if I use std::atomic<Event>, I'd have to link against libatomic, which uses mutexes under the hood (too slow).
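
For reference, here is a minimal sketch of how to confirm that on a given toolchain (Event here is just a stand-in 64-byte struct; with gcc/clang this typically needs -latomic at link time):

    #include <atomic>
    #include <cstdio>

    // Stand-in 64-byte payload, in place of the real Event type.
    struct Event {
        char payload[64];
    };

    int main()
    {
        std::atomic<Event> e;
        // Typically prints 0 on current x86-64 implementations for a 64-byte
        // type, i.e. the operations go through libatomic and take a lock.
        std::printf("lock-free: %d\n", static_cast<int>(e.is_lock_free()));
    }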

So my solution was to instead atomically swap pointers to the value. The problem is that dynamic memory allocation becomes a serious bottleneck at these event rates. So... I defined what I call a "ring allocator":

    /// @brief Lockfree Static short-lived allocator used for a ringbuffer
    /// Elements are guaranteed to persist only for "size" calls to get_next()
    template<typename T>
    class RingAllocator {
        T *arena;
        std::atomic_size_t arena_idx;
        const std::size_t arena_size;
    public:
        /// @brief Creates a new RingAllocator
        /// @param size The number of elements in the underlying arena. Make this large enough to avoid overwriting fresh data
        RingAllocator<T>(std::size_t size) : arena_size(size)
        {
            // allocate pool
            arena = new T[size];
            // zero out pool
            std::memset(arena, 0, sizeof(T) * size);
            arena_idx = 0;
        }

        ~RingAllocator()
        {
            delete[] arena;
        }

        /// @brief Return next element pointer. Thread-safe
        /// @return pointer to next available element
        T *get_next()
        {
            return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
        }
    };

Then I could have a DataState class as follows:

    class DataState {
        std::atomic<Event*> latest_event;
        RingAllocator<Event> event_allocator;
    public:
        // pushes an event atomically
        void push_event(const Event* __restrict__ e)
        {
            // store the event
            Event *new_ptr = event_allocator.get_next();
            *new_ptr = *e;
            // swap the event pointers
            latest_event.store(new_ptr, std::memory_order_release);
        }

        // pulls the latest event atomically
        Event pull_event()
        {
            return *(latest_event.load(std::memory_order_acquire));
        }
    };

As long as I size my ring allocator to the maximum number of threads that may call the functions simultaneously, there's no risk of overwriting data that pull_event could return. Plus, everything is super-local, so the indirection won't cause poor cache performance. Any potential pitfalls with this approach?

3 answers

DataState Class:

I assumed it was going to be a stack or a queue, but it isn't, so push / pull don't seem like good names for the methods. (Or else the implementation is totally bogus.)

It's just a latch that lets you read the last event that any thread stored.

There is nothing to stop two writes in a row from overwriting an element that has never been read. There is also nothing stopping you from reading the same element twice.

If you just need somewhere to copy small blocks of data into, a ring buffer does look like a decent approach. But if you don't want to lose events, I don't think you can use it this way. Instead, just get a ring buffer entry, then copy into it and use it there. So the only atomic operation should be incrementing the ring buffer position index.
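
A rough sketch of that pattern (names and the capacity are placeholders, and it still assumes the buffer is large enough that a slot isn't recycled while a reader is using it): the writer claims a slot with a single atomic increment, then copies into it and uses it in place.

    #include <atomic>
    #include <cstddef>

    struct Event { char payload[64]; };

    constexpr std::size_t kRingSize = 1024;          // placeholder capacity
    Event ring[kRingSize];
    std::atomic<std::size_t> ring_pos{0};

    // Writer side: the only atomic operation is the index increment.
    Event *write_event(const Event &incoming)
    {
        std::size_t idx = ring_pos.fetch_add(1, std::memory_order_relaxed) % kRingSize;
        Event *slot = &ring[idx];
        *slot = incoming;        // copy into the ring buffer entry
        return slot;             // ...and use it there
    }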


Ring buffer

You can make get_next() much more efficient. This line performs both an atomic post-increment (fetch_add) and an atomic exchange:

 return &arena[arena_idx.exchange(arena_idx++ % arena_size)]; 

I'm not even sure that's safe, because the xchg can step on the fetch_add from another thread. In any case, even if it is safe, it's not ideal.

You don't need both. Make sure arena_size is always a power of 2, and then you don't need to take the modulo of the shared counter at all. You can just let it run free and have each thread take the modulo for its own use. It will eventually wrap, but it's a binary integer, so it wraps at a power of 2, which is a multiple of your arena size.

I'd suggest storing an AND-mask instead of a size, so there's no risk of the % compiling to anything other than an and instruction, even if it's not a compile-time constant. This avoids a 64-bit integer div instruction.

    template<typename T>
    class RingAllocator {
        T *arena;
        std::atomic_size_t arena_idx;
        const std::size_t size_mask;   // maybe even make this a template parameter?
    public:
        RingAllocator<T>(std::size_t size)
            : arena_idx(0), size_mask(size - 1)
        {
            // verify that size is actually a power of two, so the mask is
            // all-ones in the low bits and all-zeros in the high bits,
            // so that i % size == i & size_mask for all i
            ...
        }

        ...

        T *get_next()
        {
            size_t idx = arena_idx.fetch_add(1, std::memory_order_relaxed);  // still atomic, but we don't care which order different threads take blocks in
            idx &= size_mask;   // modulo our local copy of the idx
            return &arena[idx];
        }
    };
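
The power-of-two check elided above could be a simple bit test, for example (just a sketch; an assert is only one way to handle it):

    #include <cassert>
    #include <cstddef>

    // A size is a power of two exactly when it is non-zero and has a single
    // bit set, i.e. clearing its lowest set bit leaves zero.
    inline bool is_pow2(std::size_t size)
    {
        return size != 0 && (size & (size - 1)) == 0;
    }

    // e.g. in the constructor body:
    //   assert(is_pow2(size) && "RingAllocator size must be a power of 2");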

Allocating the arena would be more efficient if you used calloc instead of new + memset. The OS already zeros pages before handing them to user-space processes (to prevent information leakage), so writing to them all yourself is just wasted work.

    arena = new T[size];
    std::memset(arena, 0, sizeof(T) * size);

    // vs.

    arena = (T*)calloc(size, sizeof(T));

Writing the pages yourself does fault them in, so they're wired to real physical pages instead of just copy-on-write mappings of a system-wide shared physical zero page (as they are right after new/malloc/calloc). On a NUMA system, the physical page chosen can depend on which thread actually touched the page, rather than which thread did the allocation. But since you're reusing the pool, the first core to write a page may not be the one that ends up using it the most.

Maybe something to watch for in microbenchmarks / perf counters.


As long as I size my ring allocator to the maximum number of threads that may call the functions simultaneously, there's no risk of overwriting data that pull_event could return ... Any potential pitfalls with this approach?

The pitfall, IIUC, is that this statement is false.

If I have just 2 threads and 10 elements in the ring buffer, the first thread could call pull_event once and be mid-pull, and then the second thread could call push 10 times, overwriting what thread 1 was pulling.

Again, assuming I understand your code correctly.

Also, as mentioned above,

 return &arena[arena_idx.exchange(arena_idx++ % arena_size)]; 

that arena_idx++ inside the exchange on the same variable just looks wrong. And it actually is wrong. Two threads can both increment it: thread A increments to 8 and thread B increments to 9, then thread B exchanges it to 9, then thread A exchanges it to 8. Whoops.

atomic(op1) @ atomic(op2) != atomic(op1 @ op2)
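
A quick sketch of that principle: two individually atomic operations on the same counter don't compose into one atomic operation, while a single fetch_add does (thread and iteration counts here are arbitrary):

    #include <atomic>
    #include <cstdio>
    #include <thread>

    std::atomic<int> composed{0};   // updated with two separate atomic ops
    std::atomic<int> single{0};     // updated with one atomic RMW

    void hammer()
    {
        for (int i = 0; i < 100000; ++i) {
            // atomic load, then atomic store: another thread can slip in
            // between them and its increment gets lost.
            composed.store(composed.load() + 1);
            // one atomic read-modify-write: no increments are ever lost.
            single.fetch_add(1);
        }
    }

    int main()
    {
        std::thread a(hammer), b(hammer);
        a.join();
        b.join();
        // "single" is always 200000; "composed" usually comes up short.
        std::printf("composed=%d single=%d\n", composed.load(), single.load());
    }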

I would also worry about what else is wrong in the code not shown. I don't mean that as an insult; lock-free is just not easy.


Have you looked at any of the C++ ports of the Disruptor (Java) that are available?

disruptor--

disruptor

Although they are not complete ports, they may offer all that you need. I am currently working on a more fully featured port, but it's not quite ready.

