The complete Idea1 solution code, revised with the producer spin-lock, is here (single queue).
The complete Idea2 solution code is here (one queue per bee).
The original Idea1 solution source code is here (the race condition with several producers is not fixed there yet).
Fix of the race condition with several producers in the original Idea1 solution (two producers can interleave, so the second one's notification may wake a bee before the first producer has written its element):
size_t i = w1++;                          // reserve the slot
....
size_t expected = i;
while(unlikely(                           // producer spin-lock
      !w2.compare_exchange_strong(expected, i+1))) {  // preserve reservation order
    expected = i;                         // the CAS overwrites 'expected' on failure
    this_thread::yield();                 // pass the CPU to another thread
}
....
atomic<size_t> w1;                        // production slot reservation
atomic<size_t> w2;                        // production signal synchronization
The new code passed a test with 5,000 produced events from 100 producers, consumed by 20 bees (100,000 consumed in total). With test_point() empty there were no spins at all (everything went through without a single loop iteration on one core); the same result with yield, but 106 spins were recorded with zero sleeps (see the concept) and 14 spin notifications (after adding stuck=false where it belongs). Buzz me in the comments if you want the code explained.
Introduction
We can use semaphores + boost::lockfree::queue (one per bee - Idea2). This should not slow things down too much (raising the semaphore is cheap, the queue itself needs no extra synchronization; we raise the semaphore after pushing to a bee's queue and try to acquire it before reading from the queue). In fact, any queue guarded by a mutex, one per bee, should also be fine (the time spent under the lock is short). I think it would be possible to write a queue that is both time- and space-efficient for this scenario (no unnecessary blocking and no memory reallocation), but the question is: is it worth it?
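For illustration only, here is a minimal sketch of that idea, assuming C++20's std::counting_semaphore is available (the Option 2 code below builds the same mechanism from a mutex and condition_variable); Bee, Event and the capacity are illustrative names, not taken from the original code:

#include <boost/lockfree/queue.hpp>
#include <chrono>
#include <semaphore>

struct Event { int id; };                     // hypothetical payload (trivially copyable, as boost::lockfree::queue requires)

struct Bee {
    boost::lockfree::queue<Event> q{128};     // per-bee queue
    std::counting_semaphore<>     items{0};   // counts events pushed but not yet consumed

    void push(const Event& e) {               // called by any producer
        q.push(e);
        items.release();                      // wakes the bee if it is sleeping
    }
    bool pop(Event& e, std::chrono::milliseconds timeout) {
        if(!items.try_acquire_for(timeout))   // sleep until an event arrives or the timeout expires
            return false;
        return q.pop(e);                      // a matching element is guaranteed to be queued
    }
};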
Using futures (Idea3) is not a good solution (each future carries a mutex, condition_variable, once_flag and atomic_flag), but (Idea1) a vector with a shared_mutex for growth and bee management (shared_lock to access the queue, unique_lock when we need to grow it or spawn new bees) plus a per-bee semaphore (for the bee to sleep on) can work (boost::lockfree::spsc_queue is actually built around a vector-like structure with one read pointer and one write pointer - we could use several read pointers, one per bee). The solution offered below omits the push_full implementation (needed when the queue must grow), but it passed tests with 15 producers, 10 consumers/bees and 1,000 events produced (10 * 1000 consumed) on a quad-core processor.
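Before the full multi-threaded implementation (Option 4 below), a single-threaded illustration of the "several read pointers" idea may help; broadcast_buffer and its members are illustrative names, and all synchronization, growth and sleeping are left out:

#include <cstddef>
#include <vector>

template<class E>
struct broadcast_buffer {
    std::vector<E>      buf;        // ring buffer shared by all bees
    std::vector<size_t> remaining;  // per slot: how many bees still have to read it
    std::vector<size_t> ri;         // one read index per bee
    size_t w = 0, lr = 0;           // write index, lowest read index

    broadcast_buffer(size_t nbees, size_t size)
        : buf(size), remaining(size), ri(nbees) {}

    bool push(const E& e) {
        if(w - lr == buf.size()) return false;   // full (this is where push_full would grow the buffer)
        buf[w % buf.size()] = e;
        remaining[w % buf.size()] = ri.size();   // every bee must read this slot
        ++w;
        return true;
    }
    bool pop(size_t bee, E& e) {
        if(ri[bee] == w) return false;           // nothing new for this bee
        size_t slot = ri[bee]++ % buf.size();
        e = buf[slot];
        if(--remaining[slot] == 0) ++lr;         // the last reader frees the slot
        return true;
    }
};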
Summary of proposed solutions:
Option 1
- Pros: easy to implement, best for heavy loads.
- Cons: latency, active waiting, many queues.

Option 2
- Pros: no active waiting (event driven).
- Cons: harder to implement, many queues.

Option 3
- Pros: growing never blocks the producers, easier bee management.
- Cons: the man in the middle introduces some delay (but minimal).

Option 4
- Pros: optimal memory usage; a central read-write lock (shared_mutex) for control.
- Cons: genuinely hard to implement (reallocation in particular, several read pointers).
NOTE: This is not my initial answer. Opt2 in particular was rewritten; it was first designed to balance the load across a single queue. Rumburak's comments may refer to that original meaning (and sometimes to misunderstandings after the first revision). Opt3 & Opt4 were added with the first revision, the full code for Opt4 last.
boost::lockfree::queue is an ideal candidate for this scenario (a multi-producer, multi-consumer queue without locking), however we must solve the problem of the empty queue(s) (when the consumers/bees have nothing to do) and the fact that every bee should receive every event (which I missed the first time I wrote my answer). I take the second idea from the OP and use a separate queue for each bee (pushing like this: for(bee& b : bees) b.push(e); ).
Option 1: the minimum time between events is 0.2 seconds
We can use this information for sleep-if-no-work in bees:
while (!done) {
    if (time_to_do_regular())
        do_regular();
    if (queue.pop(value))
        do_work(value);
    else
        this_thread::sleep_for(milliseconds(200));
}
We can shorten the timeout to reduce the latency this introduces. This solution seems fine if we balance the timeout against the number of bees/consumers, but it burns CPU unnecessarily - this is active waiting and can slow down the producers (or any other process). I prefer not to do this if we can avoid it with some technique of waiting for the event.
Option 2: Semaphore in each queue (can be implemented using mutex and condition_variable).
Regular work should be performed in the same way as in option 1, but instead of active waiting, we can use timed try_lock / wait_for / wait_until (with time_point for the next scheduled execution).
COMMENT: There were two incomplete versions; both worked when used as a queue for a single bee (multi-producer, single consumer). Both were originally designed for load balancing (which does not fit this scenario - unfortunately, Rumburak keeps referring to that single-queue design). This is the third version, which should be complete (all of them were actually correct for a single consumer, and the approach with a negative counter could be used to adapt it for load balancing):
Full code here
using the_clock = chrono::steady_clock;
using stamp = chrono::time_point<the_clock>;

template<class E> class event_queue {
public:
    event_queue(int nbees = 64) {
        bees = new handle[this->nbees = nbees];
        for(int i = 0; i < nbees; i++)
            new(bees + i) handle();
    }
    void push(E e) {
        for(int i = 0; i < nbees; i++)
            bees[i].push(e);
    }
    bool pop(int id, E& e, stamp until) {
        return bees[id].pop(e, until);
    }
private:
    struct handle {
        atomic<int> n;                  // -1 if bee is sleeping
        boost::lockfree::queue<E> q;
        mutex mx;
        condition_variable cv;
        bool wakeup, was_empty;

        void push(E e) {
            q.push(e);
            if(++n == 0) {
                { lock_guard<mutex> lock(mx); wakeup = true; }
                cv.notify_one();
            }
        }
        bool pop(E& e, stamp until) {
            if(the_clock::now() >= until) return false;
            if(was_empty || --n < 0) {
                was_empty = true;
                unique_lock<mutex> lock(mx);
                if(!cv.wait_until(lock, until, [this] { return wakeup; }))
                    return false;
                wakeup = false;
                was_empty = false;
            }
            q.pop(e);
            return true;
        }
    } *bees;
    int nbees;
};
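A bee's main loop can combine the timed pop above with the regular work from Option 1; here is a sketch, assuming an Event type, the do_regular()/do_work() routines from Option 1 and a 200 ms schedule (all of these names are illustrative):

void bee_main(event_queue<Event>& q, int id, atomic<bool>& done)
{
    stamp next_regular = the_clock::now() + chrono::milliseconds(200);
    while(!done) {
        Event e;
        if(q.pop(id, e, next_regular))       // sleeps at most until the next scheduled run
            do_work(e);
        if(the_clock::now() >= next_regular) {
            do_regular();
            next_regular += chrono::milliseconds(200);
        }
    }
}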
Option 3: a distributor thread in the middle

A dedicated thread sits between the producers and the bees: the producers push into one central queue, and the thread in the middle distributes every event to every bee with for (auto&& bee : bees) bee.push(event). The event is held in a shared_ptr (boost::lockfree::queue cannot store a shared_ptr - it requires a trivial assignment and destructor - but boost::lockfree::spsc_queue can, and here the distributor is its single producer and the bee its single consumer). Growing the per-bee queues and adding or removing bees is done only by the distributor, so the producers never block. Sleeping and waking of the bees works the same way as in Option 2.
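A rough sketch of how such a distributor might look, under my reading of this option (Event, Bee and middleman are illustrative names; the producers' side and the Option 2 wake-up machinery are omitted):

#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <memory>
#include <thread>
#include <vector>

struct Event { int id; };                    // hypothetical payload (trivially copyable, as boost::lockfree::queue requires)

struct Bee {
    boost::lockfree::spsc_queue<std::shared_ptr<Event>> q{1024};  // single producer: the distributor
    void push(const std::shared_ptr<Event>& e) {
        q.push(e);                           // + wake the bee as in Option 2
    }
};

void middleman(boost::lockfree::queue<Event>& central,
               std::vector<Bee>& bees,
               std::atomic<bool>& done)
{
    Event e;
    while(!done) {
        while(central.pop(e)) {                           // drain whatever the producers queued
            auto sp = std::make_shared<Event>(e);         // one allocation, shared by all bees
            for(auto&& bee : bees) bee.push(sp);          // every bee receives every event
        }
        std::this_thread::yield();                        // or sleep/wait as in Option 2
    }
}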
Option 4: a single shared buffer (optimal memory, central shared_mutex)

This elaborates Idea1: every event is stored only once in one growable buffer (optimal memory usage), a shared_mutex guards growth and bee management (shared_lock for ordinary pushes and pops, unique_lock when the buffer has to grow or bees are added or removed), and each bee has its own read pointer. Sleeping and waking of the bees is done the same way as in Idea2/Opt2.
The complete Idea1 code, revised with the producer spin-lock, is here (single queue).
The original Idea1 source code is here.
NOTE. notify_all() ( ).
The public part (when the deadline passes, pop() returns false):
template<class E> class event_queue {
public:
    event_queue(size_t nbees = P, size_t bits = 10) {
        // minimal capacity: 256, default: 1024
        // must be a power of 2 (we use an and-mask rather than modulo)
        if(bits <= 8) bits = 8;
        size_t size = 1 << bits;
        mask = size-1;
        buf = a.allocate(size);
        bees = b.allocate(this->nbees = nbees);
        for(int i = 0; i < nbees; i++)
            b.construct(bees + i, *this);
    }
    // EDIT: Updated with spin-lock after construction
    void push(const E& e) {
        shared lock(sync);                        // quick shared lock
        size_t i = w1++;                          // reserve the slot
        if(unlikely(i > lr+mask)) {               // check full
            push_full(i, e); return;              // not implemented yet
        }
        a.construct(buf+(i&mask), nbees, e);      // emplace
        // EDIT: This synchronization was added
        size_t expected = i;
        while(unlikely(                           // producer spin-lock
              !w2.compare_exchange_strong(expected, i+1))) {  // preserve reservation order
            expected = i;                         // the CAS overwrites 'expected' on failure
            this_thread::yield();                 // pass the CPU to another thread
        }
        // EDIT: SEE NEW CODE FOR BIG CHANGE HERE
        for(int i = 0; i < nbees; i++)
            bees[i].notify();
    }
    bool pop(int id, E& e, stamp until) {
        return bees[id].pop(e, until);
    }
The private part:
private:
    // global synchronization (fast-locking unless we need to grow or manage bees)
    shared_mutex sync;
    typedef shared_lock_guard<shared_mutex> shared;
    typedef lock_guard<shared_mutex> unique;
    // consumed counting
    struct item {
        atomic<size_t> c;       // consumed countdown
        E e;                    // the event/element
        item(int nbees, const E& e) : c(nbees), e(e) {}
    };
    // event buffer
    atomic<size_t> w1;          // production slot reservation
    atomic<size_t> w2;          // production signal synchronization
    atomic<size_t> lr;          // lowest read index (for each bee: bee.ri >= lr)
    size_t mask;                // element at buf[index&mask]
    item *buf;                  // the buffer
    int nbees;                  // number of bees

    struct handle {
        handle(event_queue<E>& q)
            : n(0), ri(0), q(q), mx(), cv(), wakeup(false), was_empty(false) {}
        atomic<int> n;          // number of elements available for consumption
        int ri;                 // read index
        event_queue<E>& q;      // master queue
        mutex mx;
        condition_variable cv;
        bool wakeup, was_empty;

        void notify() {
            if(++n == 0) {
                { lock_guard<mutex> lock(mx); wakeup = true; }
                cv.notify_one();
            }
        }
        bool pop(E& e, stamp until) {
            if(the_clock::now() >= until) return false;
            if(was_empty || --n < 0) {
                was_empty = true;
                unique_lock<mutex> lock(mx);
                if(!cv.wait_until(lock, until, [this] { return wakeup; }))
                    return false;
                wakeup = false;
                was_empty = false;
            }
            // get the item
            shared lock(q.sync);
            item& i = q.buf[ri++ & q.mask];
            e = i.e;
            // destroy and advance the lowest read index if we were the last reader
            if(--i.c == 0) { ++q.lr; q.a.destroy(&i); }
            return true;
        }
    } *bees;

    allocator<item> a;
    allocator<handle> b;

    void push_full(int i, const E& e) {  // not implemented
        assert(false);
        // unique lock(sync); ...lot of work
        throw -1;
    }
};
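A rough usage sketch, assuming the full linked code (which defines P and unlikely), a trivially copyable Event and a do_work() handler (all three are stand-ins); the event count is kept below the default capacity of 1024 because push_full() is not implemented:

struct Event { int id; };
void do_work(const Event&) { /* handle the event */ }

int main()
{
    event_queue<Event> q(4);                     // 4 bees, default capacity 1024
    atomic<bool> done{false};
    vector<thread> bee_threads, producer_threads;

    for(int id = 0; id < 4; id++)                // consumers / bees
        bee_threads.emplace_back([&, id] {
            Event e;
            while(!done)
                if(q.pop(id, e, the_clock::now() + chrono::milliseconds(200)))
                    do_work(e);
        });

    for(int p = 0; p < 8; p++)                   // producers: 8 * 100 = 800 events in total
        producer_threads.emplace_back([&] {
            for(int k = 0; k < 100; k++)
                q.push(Event{k});
        });

    for(auto& t : producer_threads) t.join();
    this_thread::sleep_for(chrono::seconds(1));  // give the bees time to drain (sketch only)
    done = true;                                 // each bee exits after its current timed pop
    for(auto& t : bee_threads) t.join();
}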