Efficient C++11 design for an event listener with regular timed processing?

Problem

I have a system (using C++11) with a producer of irregularly timed events, P (for example, it could be producing user-interface events, or receiving financial trade data over a TCP/IP socket, etc.). Each event is delivered with a small data packet.

Then I have a number of "worker bees": B1, B2, B3, ... Each of them does its own processing of the events that P feeds it. That processing may be quick, but it may also take a long time, so the plan is to run each worker bee in its own thread. In addition, each worker bee must run a different function every N seconds (for example N=60, though again it can vary per worker bee). This regular processing must always run serially with the event processing, never in another thread.

Finally, some worker bees may also receive events from other producers (P2, P3, etc.). However, if that complicates things, I can always have P1, P2, etc. feed into a central P whose job is to pass all events on to the worker bees.
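To make the shape of the problem concrete, here is roughly the interface I have in mind (the names are purely illustrative, not part of any existing code):

struct Event { /* small data packet */ };

struct Bee {
    virtual void consume(const Event& e) = 0;  // called for every event P delivers
    virtual void regular() = 0;                // called every N seconds, in the same thread
    virtual ~Bee() {}
};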

Question

What is the best design for such a system? Low latency and efficiency are the main criteria. Reliability matters too, though: every B must receive every event (even if it only gets to process them late because it was busy at the time), and if one B fails, it should not affect the others.

If it matters: assume there are 1-64 worker bees, 4-8 hardware threads, the average time between events is 10 seconds, the minimum time between events is 0.2 seconds, and the typical regular function runs every N=60 seconds. But if the ideal design is sensitive to any of these numbers, I would like to understand how.

NOTE: If the worker bees can guarantee they never throw an exception, would that change the choice of best design? (It seems like it should be unrelated, but I thought I would mention it.)

NOTE: There may be more bees than hardware threads; assume that is a problem for another day. (For example, latency may matter for some worker bees, which get their own threads, while others may be told to share a thread.)

Idea 1. Wait for an event or timeout

Each P has a mutex and a condition variable. When it receives new data, it signals the condition.

Each worker bee calls condition.wait_until(lock, timeout), where timeout is the time at which it needs to wake up to do its regular processing. It checks the return value to see whether it was signalled or timed out.

The disadvantage here is that this is just a signal, with no data. So I would need every B to take another lock for read access to the data queue. And they will usually all want to do that at the same time, which gets ugly.

I am also not sure what happens if one B spends a long time processing something and misses several events before it gets back to calling wait_until.
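Roughly, each worker's loop would look something like this (just a sketch of the idea; shared_mx, shared_cv and my_next_regular are placeholders of mine, not real code):

#include <mutex>
#include <condition_variable>
#include <chrono>

extern std::mutex shared_mx;                 // shared with P (placeholder)
extern std::condition_variable shared_cv;    // signalled by P (placeholder)

void worker_iteration(std::chrono::steady_clock::time_point my_next_regular) {
    std::unique_lock<std::mutex> lock(shared_mx);
    if (shared_cv.wait_until(lock, my_next_regular) == std::cv_status::timeout) {
        lock.unlock();
        // time to do my regular processing
    } else {
        lock.unlock();
        // signalled: now take the *separate* lock on the shared data queue
        // and drain whatever events have accumulated (this is the ugly part)
    }
}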

Idea 2: A data queue per worker bee

Here each B has its own queue, protected by a lock. P takes a write lock and appends a data item; B takes a read lock to pull items off when it is ready. I still need a way for B to know it should wake up because new data has arrived.

The downside seems to be that the P thread has to loop over every B to hand them the data. That introduces latency, and it also feels fragile (for example, if one of the worker bees misbehaves).
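Sketched out, each worker bee's queue might look like this (again purely illustrative, not existing code):

#include <deque>
#include <mutex>
#include <condition_variable>
#include <chrono>

struct Event { /* small data packet, as above */ };

struct BeeQueue {
    std::mutex mx;
    std::condition_variable cv;
    std::deque<Event> q;

    void push(const Event& e) {                         // called from P's thread
        { std::lock_guard<std::mutex> lock(mx); q.push_back(e); }
        cv.notify_one();                                // wake this bee
    }

    // Returns false if 'until' (the bee's next regular-processing time) passes
    // before an event arrives.
    bool pop(Event& e, std::chrono::steady_clock::time_point until) {
        std::unique_lock<std::mutex> lock(mx);
        if (!cv.wait_until(lock, until, [this]{ return !q.empty(); }))
            return false;
        e = q.front();
        q.pop_front();
        return true;
    }
};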

Idea 3: Futures

This problem seems like a good fit for futures. P creates a std::promise, and each B gets a std::future (a std::shared_future, I assume). When P receives a new event, it calls set_value() on the promise. Each B calls wait_until on its future.

This is appealing because the signal and the data are combined. There is also no locking, so it should be resilient.

The bit I'm stuck on is that a promise/future pair is a one-shot device. I would need to create a new promise/shared_future pair immediately after each new event. How could this work? (Could I pass the next shared_future as part of the data sent by the set_value call?) And is there any chance a worker could miss an event if two arrive in quick succession?
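To illustrate the chaining I have in mind (very rough, and quite possibly not the right way to do it):

#include <future>

struct Packet {
    int data;                            // stand-in for the event's payload
    std::shared_future<Packet> next;     // future for the event after this one
};

void consumer_loop(std::shared_future<Packet> f) {
    for (;;) {
        Packet p = f.get();              // block until the producer broadcasts
        // ... process p.data ...
        f = p.next;                      // follow the chain to the next event
    }
}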

+7
Tags: multithreading, events, c++11, latency, future
7 answers

It looks like you could use a producer-consumer pattern. Here is an example of it, using the boost libraries and boost's lock-free queue; just change the type it works on:

#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
#include <iostream>

boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::lockfree::queue<int> queue(128);

const int iterations = 10000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;

void producer(void)
{
    for (int i = 0; i != iterations; ++i) {
        int value = ++producer_count;
        while (!queue.push(value))
            ;
    }
}

boost::atomic<bool> done(false);

void consumer(void)
{
    int value;
    while (!done) {
        while (queue.pop(value))
            ++consumer_count;
    }
    while (queue.pop(value))
        ++consumer_count;
}

int main(int argc, char* argv[])
{
    using namespace std;
    cout << "boost::lockfree::queue is ";
    if (!queue.is_lock_free())
        cout << "not ";
    cout << "lockfree" << endl;

    boost::thread_group producer_threads, consumer_threads;
    for (int i = 0; i != producer_thread_count; ++i)
        producer_threads.create_thread(producer);
    for (int i = 0; i != consumer_thread_count; ++i)
        consumer_threads.create_thread(consumer);

    producer_threads.join_all();
    done = true;
    consumer_threads.join_all();

    cout << "produced " << producer_count << " objects." << endl;
    cout << "consumed " << consumer_count << " objects." << endl;
}
+3

I will start with some criticism of the options proposed so far, and then make my own suggestion below:

Your Idea 1:

My main problem with it is that it implies a single data queue. Since each bee needs to process each event, you have to track somehow how many bees have seen each event before you can finally drop it from the queue.

Of course, you can attach an atomic countdown counter to each element in the queue; the bee that brings the counter to zero also removes the event from the queue. But atomic counters are not free of cost. And you have to be *really* careful about how you decrement the counter and check for zero, so that exactly one bee removes the event from the queue; there is a potential race condition lurking there.
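To make that concrete, the "exactly one bee deletes it" step has to look roughly like this (a sketch with names of my own, not taken from any of the posted code):

#include <atomic>

struct counted_event {
    // ... the event payload would go here ...
    std::atomic<int> remaining;   // initialised to the number of bees
};

// Every bee calls this once it has finished processing the event.
void done_with(counted_event* ce) {
    // fetch_sub returns the previous value, so exactly one bee observes 1,
    // and only that bee is allowed to delete the event.
    if (ce->remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
        delete ce;
}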

And you cannot really use an ordinary queue, because if one bee falls behind, events are not removed, and the other bees cannot start on newly arriving events unless they somehow know how many items to skip. And how would they know that? They would have to keep some sort of pointer or index to the last event they processed.

And you cannot add or remove bees at run time, as that would break the counter logic for removing events from the queue.

And the bees simply must not crash, because that would break the counter logic again.

Note: it would be different, by the way, if your program does not run for very long and you can afford to store all events until the end of the program's life, in a huge vector for example. Then each bee would only have to keep the index of the last event it processed. In what follows I assume you need to delete event data once it has been processed.

Your Idea 2:

Of course there is overhead in copying each event into each queue, but you wrote that the events are small. Compared to the problems of a single queue, that seems a small price.

Regarding the locks you mention, a lock-free queue (as suggested by wbennett and firda) would let you get rid of them.

You still need to solve how to schedule the regular heartbeat call.

Your Idea 3:

IMO, promise/future does not help here at all, because you still have to be able to queue them somehow if a bee falls behind. So you are back to queues, only now of more complex objects that bring their own extra overhead.

firda's Option 1:

My main objection is the use of a single queue, with all the shortcomings described above.

In addition, to keep latency to a minimum you have to keep the sleep time to a minimum, so you basically end up with a busy wait. Of course, different sleep times *may* spread the load, but there is no guarantee of that.

firda's Option 2:

Sorry if I misinterpreted the earlier versions.

Honestly, I cannot say whether the current version (posted at http://pastebin.com/Y7M56Mtm) is correct. The reason is the sheer number of variables used to decide whether to sleep, pop, or do the regular work.

In addition to the queue (with its own mutex), a condition variable, another mutex, and some timing information, you have:

  • n - basically a counter of the elements in the queue
  • was_empty - a flag indicating that the producer found the queue empty when pushing
  • wakeup - a flag indicating that the producer wants to wake the bee

IMHO, none of these additional variables should be needed; the queue size should be enough. This synchronization code has a complexity I would not want to reason about. Unless you have serious multithreading skills (I certainly don't), I would strongly advise giving up on trying to be clever about it.

So maybe that’s right, maybe not, I don’t know :-)

wbennett's suggestion:

As you can probably tell from the above, my preference is one queue per bee. I do not think lock-free buys much here, though; see below.

However, it does not say how to handle the regular method call.

My suggestion:

I have posted my suggestion as a second answer, because this one has accumulated so many edits and review comments, and the discussion kept referring to things that were going out of date.

+3

With the additional information from Darren's comment (memory is not a concern), we do not need one queue per bee. Instead, we can put the events into a single vector, or (if you are sure the bees can never fall too far behind) a fixed-size array, and let each bee keep its own index into it.

It looks like this:

#include <array>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <chrono>

struct event {};

constexpr size_t size = 65536;
std::array<event, size> queue;
std::mutex m;
std::atomic<size_t> queue_index;
std::condition_variable cv;

struct bee
{
    using clock = std::chrono::steady_clock;
    using timestamp = std::chrono::time_point<clock>;

    int _heartbeat_seconds;
    timestamp _until;
    size_t _index;

    bee(int heartbeat_seconds):
        _heartbeat_seconds(heartbeat_seconds),
        _until(clock::now() + std::chrono::seconds(_heartbeat_seconds)),
        _index(0)
    {}

    void run()
    {
        while (true)
        {
            // Wait until either an event is available or the regular call is due
            {
                std::unique_lock<std::mutex> lock(m);
                while (_index == queue_index and clock::now() < _until)
                {
                    cv.wait_until(lock, _until);
                }
            }
            // Perform the regular call and process events until you're out of work
            if (clock::now() >= _until)
                regular();
            while (_index != queue_index)
            {
                ++_index;
                _index = _index % size;
                const auto& e = queue.at(_index);
                // do something with e
                if (clock::now() >= _until)
                    regular();
            }
        }
    }

private:
    void regular()
    {
        // do something
        _until += std::chrono::seconds(_heartbeat_seconds);
    }
};

struct producer
{
    void run()
    {
        while (true)
        {
            // do something to get/produce an event
            event e;

            // Add the event and notify waiting bees
            std::lock_guard<std::mutex> lock(m);
            ++queue_index;
            queue_index = queue_index % size;
            queue.at(queue_index) = e;
            cv.notify_all();
        }
    }
};

int main()
{
    // create producers and bees in their individual threads and let them run free
}

Where exactly you place the calls to regular() depends on what you want to happen when an event is available and a regular call is due at the same time. In the code above, the regular call takes priority.

Note: if there is a risk that a bee falls too far behind, you can of course increase the size of the array, but with at most one event per 0.2 seconds and 65536 slots, such a bee would already be about three and a half hours behind; one way or another you would be in trouble. Depending on your use case, you might still want to add a check that compares the indices.
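One way to make such a check possible is to keep the counters ever-increasing, unlike the wrapped indices in the code above, and apply the modulo only when indexing into the array (a sketch of mine, with my own counter names):

#include <stdexcept>

// produced_count: the producer's total count; my_count: this bee's own count.
bool next_event(size_t produced_count, size_t& my_count, event& out) {
    size_t lag = produced_count - my_count;   // unsigned arithmetic, wrap is harmless
    if (lag >= size)
        throw std::runtime_error("bee lagging: older events have been overwritten");
    if (lag == 0)
        return false;                          // nothing new yet
    out = queue.at(my_count++ % size);
    return true;
}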

+3

Full code for the Idea1 solution, revised with spin-locking, here (single queue)
Full code for the Idea2 solution here (one queue per bee)
Original Idea1 solution source code - the race condition with several producers is not fixed there


Fix for the race condition with several producers in the original Idea1 solution (the problem: a second producer could notify/wake a bee before the first producer had written its element):

size_t i = w1++;                    // reserve the slot
....
size_t expected = i;
while (unlikely(                    // producer spin-lock
       !w2.compare_exchange_strong(
            expected, i + 1))) {    // preserve reservation order
    expected = i;
    this_thread::yield();           // pass CPU to another thread
}
....
atomic<size_t> w1;                  // production slot reservation
atomic<size_t> w2;                  // production signal synchronization

The new code passed a test with 5,000 events produced by each of 100 producers and consumed by 20 bees (100,000 consumptions in total). With test_point() left empty there were no spins at all (everything went through without a single loop iteration on one core); the same with yield, but 106 recorded spins with a zero-length sleep (just to prove the concept) and 14 spin notifications (after adding stuck=false where it belongs). Buzz me (in the comments) if you want the code explained.


Introduction

We can use semaphores + boost::lockfree::queue (one per bee - Idea2). This will not slow things down too much (the lock-free push should be cheap, the queue itself needs no extra synchronization; we raise the semaphore after pushing to a bee's queue and try to acquire it before popping). In fact, using a mutex per queue, as long as each bee has its own, should also be fine (the time spent holding the lock is short). I think it would be possible to write a queue that is both time- and space-efficient for this scenario (no unnecessary blocking or memory reallocation), but the question is: is it worth it?

Using futures (Idea3) is not a good solution (each future drags along a mutex, condition_variable, once_flag and atomic_flag), but a vector with a shared_mutex for growth and bee management (Idea1: shared_lock to access the queue, unique_lock when we need to grow it or spawn new bees), combined with a per-bee semaphore (for the bee to sleep on), can work (boost::lockfree::spsc_queue is in fact built around a vector-like structure with one read pointer and one write pointer - we could use several read pointers, one per bee). The solution offered below omits the push_full implementation (growing the queue when it is full), but passed tests with 15 producers, 10 consumers/bees and 1000 events produced (10 x 1000 consumed) on a QuadCore processor.


Summary of proposed solutions:

  • Option 1 - Pros: easy to implement, best for heavy load.
    Cons: latency, active waiting, many queues.
  • Option 2 - Pros: no active waiting (event driven).
    Cons: harder to implement, many queues.
  • Option 3 - Pros: producers are never blocked, easier bee management.
    Cons: the man in the middle introduces some latency (but minimal).
  • Option 4 - Pros: optimal memory usage, a central read-write lock (shared_mutex) for management.
    Cons: genuinely hard to implement (reallocation and, in particular, the multiple read pointers).

NOTE: This is not my initial answer; Opt2 in particular was rewritten - it was originally designed for load balancing over a single queue. Rumburak's comments may refer to that original meaning (and sometimes to misunderstandings after the first revision). Opt3 & 4 were added with the first revision, the full code for Opt4 last.


boost::lockfree::queue looks like an ideal candidate for this scenario (a multi-producer, multi-consumer lock-free queue), but we must solve the problem of empty queues (when consumers/bees have nothing to do) and the fact that all bees must receive all events (which I missed the first time I wrote my answer). I take the author's second idea of a separate queue per bee (push like this: for(bee& b : bees) b.push(e);).

Option 1: use the minimum time between events (0.2 seconds)

We can use this fact to make a bee sleep when it has no work:

while (!done) {
    if (time_to_do_regular())
        do_regular();
    if (queue.pop(value))
        do_work(value);
    else
        this_thread::sleep_for(milliseconds(200));
}

We can shorten the sleep to reduce the latency this introduces. This solution works fine if we balance the timeout against the number of bees/consumers, but it burns CPU unnecessarily - it is an active (busy) wait and can slow down the producers (or any other process). I prefer not to do this if we can avoid it with some event-waiting technique.

Option 2: a semaphore on each queue (can be implemented with a mutex and a condition_variable)

The regular work is done the same way as in Option 1, but instead of active waiting we can use a timed try_lock / wait_for / wait_until (with the time_point of the next scheduled regular run).
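Such a semaphore with a timed wait could be sketched like this (C++11 has no std::semaphore, so it is built from a mutex and a condition_variable; the class below is my illustration, not the linked code):

#include <mutex>
#include <condition_variable>
#include <chrono>

class semaphore {
    std::mutex mx;
    std::condition_variable cv;
    int count = 0;
public:
    void post() {                                   // producer: one more item available
        { std::lock_guard<std::mutex> lock(mx); ++count; }
        cv.notify_one();
    }
    // Consumer: wait for an item, but give up at 'until' (the next regular run).
    // Returns false on timeout.
    bool wait_until(std::chrono::steady_clock::time_point until) {
        std::unique_lock<std::mutex> lock(mx);
        if (!cv.wait_until(lock, until, [this] { return count > 0; }))
            return false;
        --count;
        return true;
    }
};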

COMMENT: There were two earlier, incomplete versions, both of which worked when used as a queue for one bee (multi-producer, single consumer). Both were originally designed for load balancing (which does not fit this scenario; unfortunately Rumburak keeps talking about a single queue). This is the third version, which should be complete (all of them were correct for a single consumer, and the negative-counter approach could be used to adapt it for load balancing):

Full code here

using the_clock = chrono::steady_clock;
using stamp = chrono::time_point<the_clock>;

template<class E> class event_queue {
public:
    event_queue(int nbees = 64) {
        bees = new handle[this->nbees = nbees];
        for(int i = 0; i < nbees; i++)
            new(bees + i) handle();
    }
    void push(E e) {
        for(int i = 0; i < nbees; i++)
            bees[i].push(e);
    }
    bool pop(int id, E& e, stamp until) {
        return bees[id].pop(e, until);
    }
private:
    struct handle {
        atomic<int> n;              // -1 if bee is sleeping
        boost::lockfree::queue<E> q;
        mutex mx;
        condition_variable cv;
        bool wakeup, was_empty;

        void push(E e) {
            q.push(e);
            if(++n == 0) {
                { lock_guard<mutex> lock(mx); wakeup = true; }
                cv.notify_one();
            }
        }
        bool pop(E& e, stamp until) {
            if(the_clock::now() >= until)
                return false;
            if(was_empty || --n < 0) {
                was_empty = true;
                unique_lock<mutex> lock(mx);
                if(!cv.wait_until(lock, until, [this] { return wakeup; }))
                    return false;
                wakeup = false;
                was_empty = false;
            }
            q.pop(e);
            return true;
        }
    } *bees;
    int nbees;
};
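A bee could then drive both the events and its regular call from a single loop like this (a sketch of mine; event_type, process() and do_regular() are placeholders, the_clock and stamp are the aliases from the code above):

struct event_type { int payload; };          // placeholder event (trivially copyable)
void process(const event_type&);              // placeholder: per-event work
void do_regular();                            // placeholder: regular work

void bee_thread(event_queue<event_type>& q, int id, std::chrono::seconds period) {
    stamp next = the_clock::now() + period;   // time of the next regular call
    event_type e;
    for (;;) {
        if (q.pop(id, e, next)) {
            process(e);
        } else {                              // pop() timed out: the regular call is due
            do_regular();
            next += period;
        }
    }
}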

Option 3: a dispatcher in the middle

A dedicated dispatcher thread sits between the producers and the bees: the producers push into one central queue (boost::lockfree::queue, multi-producer), and the dispatcher forwards each event to every bee with for (auto&& bee : bees) bee.push(event) (the event could be a shared_ptr), where each per-bee queue can be a boost::lockfree::spsc_queue (single producer, single consumer). Sleeping and waking the bees works as in Option 2.


Option 4: a single shared buffer with a read-write lock (memory-optimal, hard to implement)

This follows Idea1: a single buffer shared by all bees, each bee keeping its own read index, with the buffer protected by a shared_mutex (shared lock for normal access, unique lock when we need to grow it or manage the bees). Sleeping, waking and the regular call work the same way as in Idea2/Opt2.

Idea1 solution with spin-locking (single queue) - full code linked at the top of this answer
Original Idea1 solution - also linked above
NOTE: notify_all() is not used here; each bee is notified individually on its own condition variable (see the code).

The interface (producers call push(); a bee's pop() returns false when the deadline for its regular call has passed):

template<class E> class event_queue {
public:
    event_queue(size_t nbees = P, size_t bits = 10) {
        // minimal capacity: 256, default: 1024
        // must be power of 2 (we rather use and-mask instead of modulo)
        if(bits <= 8) bits = 8;
        size_t size = 1 << bits;
        mask = size-1;
        buf = a.allocate(size);
        bees = b.allocate(this->nbees = nbees);
        for(int i = 0; i < nbees; i++)
            b.construct(bees + i, *this);
    }

    // EDIT: Updated with spin-lock after construction
    void push(const E& e) {
        shared lock(sync);                    // quick shared lock
        size_t i = w1++;                      // reserve the slot
        if(unlikely(i > lr+mask)) {           // check full
            push_full(i, e); return;          // not implemented yet
        }
        a.construct(buf+(i&mask), nbees, e);  // emplace
        // EDIT: This synchronization was added
        size_t expected = i;
        while(unlikely(                       // producer spin-lock
              !w2.compare_exchange_strong(
                  expected, i+1))) {          // preserve reservation order
            expected = i;
            this_thread::yield();             // pass CPU to other thread
        }
        // EDIT: SEE NEW CODE FOR BIG CHANGE HERE
        for(int i = 0; i < nbees; i++)
            bees[i].notify();
    }

    bool pop(int id, E& e, stamp until) {
        return bees[id].pop(e, until);
    }

And the internals:

private:
    // global synchronization (fast-locking unless we need to grow or manage bees)
    shared_mutex sync;
    typedef shared_lock_guard<shared_mutex> shared;
    typedef lock_guard<shared_mutex> unique;

    // consumed counting
    struct item {
        atomic<size_t> c;    // consumed countdown
        E e;                 // the event/element
        item(int nbees, const E& e): c(nbees), e(e) {}
    };

    // event buffer
    atomic<size_t> w1;       // production slot reservation
    atomic<size_t> w2;       // production signal synchronization
    atomic<size_t> lr;       // lowest read index (for each bee: bee.ri >= lr)
    size_t mask;             // element at buf[index & mask]
    item *buf;               // the buffer
    int nbees;               // number of bees

    struct handle {
        handle(event_queue<E>& q):
            n(0), ri(0), q(q), mx(), cv(), wakeup(false), was_empty(false) {}

        atomic<int> n;           // number of elements available for consumption
        int ri;                  // read index
        event_queue<E>& q;       // master queue
        mutex mx;
        condition_variable cv;
        bool wakeup, was_empty;

        void notify() {
            if(++n == 0) {
                { lock_guard<mutex> lock(mx); wakeup = true; }
                cv.notify_one();
            }
        }
        bool pop(E& e, stamp until) {
            if(the_clock::now() >= until)
                return false;
            if(was_empty || --n < 0) {
                was_empty = true;
                unique_lock<mutex> lock(mx);
                if(!cv.wait_until(lock, until, [this] { return wakeup; }))
                    return false;
                wakeup = false;
                was_empty = false;
            }
            // get the item
            shared lock(q.sync);
            item& i = q.buf[ri++ & q.mask];
            e = i.e;
            // destroy and advance the lowest read index if we were last
            if(--i.c == 0) { ++q.lr; q.a.destroy(&i); }
            return true;
        }
    } *bees;

    allocator<item> a;
    allocator<handle> b;

    void push_full(int i, const E& e) {
        // not implemented
        assert(false);
        // unique lock(sync); ...lot of work
        throw -1;
    }
};
+1

The code below is a simpler variant: a single fixed-size circular buffer protected by one global mutex and condition variable (the producer does a locked push followed by notify_all()), an abstract bee class that drives both event consumption and the regular call from one loop (much like Option 2 above), and a watchdog thread that kills bees whose heartbeat shows they are lagging and that also enforces a limit on the production rate. A bee that falls a whole buffer behind throws and thereby removes itself.

#include <vector>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <stdexcept>

using namespace std;
using the_clock = chrono::steady_clock;
using stamp = chrono::time_point<the_clock>;
using ms = chrono::milliseconds;
using seconds = chrono::seconds;

struct event {            // the event
    int id;               // some data
    event(const event&) = default;
    event(): id(-1) {}
    event(int id): id(id) {}
};

const size_t size = 1024;        // capacity of the queue (must be 1<<x)
event queue[size];               // buffer for the queue
size_t produced;                 // number of produced events
const size_t mask = size-1;      // mask for converting counters to index
mutex mx;                        // master lock for everything
condition_variable cv;           // mainly for notify_all()
size_t production_watchdog;      // decrease this at constant frequency
size_t production_limit;         // we can set this on the fly

void push(const event& e) {                  // push the event
    unique_lock<mutex> lock(mx);             // protected by global mutex
    queue[produced++ & mask] = e;            // write the element and count it
    if(production_limit                      // check production limit
       && ++production_watchdog > production_limit)
        throw runtime_error("production too high");
    lock.unlock();
    cv.notify_all();                         // wake any sleeping bee
}

class abee {                        // abstract bee (or interface to queue)
    friend void watchdog();         // watchdog can access internals
    ms timeout;                     // timeout for regular function
    stamp next;                     // next scheduled regular
    stamp heartbeat;                // heartbeat for watchdog
    size_t consumed;                // number of consumed events
public:
    virtual ~abee() {}              // virtual destructor (kill the thread here)
    virtual void regular() = 0;     // regular function
    virtual void consume(const event&) = 0;   // consume event

    abee(): timeout(60000),
        next(the_clock::now() + timeout),
        heartbeat(the_clock::now()),
        consumed(0) {}
    abee(ms timeout): timeout(timeout),
        next(the_clock::now() + timeout),
        heartbeat(the_clock::now()),
        consumed(0) {}

    void run() {
        for(;;) {
            // do regular job if the time is up
            if (the_clock::now() >= next) {
                heartbeat = next;     // signal that we are alive
                regular();            // do regular
                next += timeout;      // and schedule next
            }
            unique_lock<mutex> lock(mx);
            if(!cv.wait_until(lock, next, [this] {
                // 2-complement arithmetic is our friend
                size_t avail = produced-consumed;
                // if we are lagging, commit suicide
                if(avail >= size) throw runtime_error("bee lagging");
                // check if we have at least one to consume
                return avail > 0;
            }))   // nothing to consume
                continue;
            // we better make a copy
            event e = queue[consumed++ & mask];
            // now we can unlock the queue
            lock.unlock();
            // and consume the event
            consume(e);
        }
    }
};

const int maxbees = 64;        // maximum of living bees
abee* bees[maxbees];           // list of active bees (can contain nullptrs)

void watchdog() {              // this will observe bees and kill the lagging
    seconds kill_after(90);          // maximal time we let a zombie bee live
    seconds sleep_timeout(10);       // timeout before we perform our checks
    int production_decrement = 100;  // should do for one event per 0.1s
    for(;;) {
        this_thread::sleep_for(sleep_timeout);
        lock_guard<mutex> lock(mx);
        stamp dead = the_clock::now() - kill_after;
        for(int i = 0; i < maxbees; i++) {
            abee* bee = bees[i];
            if(!bee) continue;               // deleted bee
            if(bee->heartbeat < dead) {      // kill and delete
                // place elsewhere if you don't want to delete it now ;)
                delete bee;
                bees[i] = nullptr;
                // we can spawn a new one here
            }
        }
        // keep production watchdog counter down
        if(production_limit && production_watchdog > production_decrement)
            production_watchdog -= production_decrement;
        else
            production_watchdog = 0;
    }
}

EDIT: regarding delete/~abee(): the destructor is where the bee's thread would have to be stopped (see the comment in the code).

+1

Here is an implementation using a chain of futures, i.e. your Idea 3. IMHO it comes out reasonably clean.

Here is what main() looks like:

int main(int, char**) {
    FutureType F = startProducer(900, 1100);

    std::thread c1 = startConsumer(F,
        [](const Message &result) {
            //process a message
        },
        []() {
            //regular processing
        });

    std::thread c2 = startConsumer(F,
        [](const Message &result) {
            //process a message
        },
        []() {
            //regular processing
        });

    c1.join();
    c2.join();
}

One producer and two consumers are started; each consumer passes in one lambda for processing a message and another for the regular processing. (The sample output at the end of this answer uses three consumers: fast, slow and erratic.)

The 900,1100 arguments make the producer sleep a random time between 0.9 and 1.1 seconds between events. (This is only there to simulate the irregular event source.)

F is the shared future that every consumer starts from. Message is the small data packet delivered with each event, and it also carries the future for the message that will follow it:

class Message {
public:
    std::shared_future<Message> next;
    int a;
    std::string b;
};

typedef std::promise<Message> PromiseType;
typedef std::shared_future<Message> FutureType;

a and b are the actual payload; next is the future for the message after this one. A shared_future<T> can be copied, so each consumer keeps its own copy.

Here is startProducer():

FutureType startProducer() {
    PromiseType *P = new PromiseType;
    FutureType F = P->get_future().share();
    std::thread producer([](PromiseType *P) {
        int cnt = 0;
        while(1) {
            //Wait for data to arrive and fill in msg
            Message msg;
            msg.a = ...;
            msg.b = ...;
            //Prepare for next message
            PromiseType *P2 = new PromiseType;
            msg.next = P2->get_future().share();
            //Broadcast the message
            P->set_value( std::move(msg) );
            //Tidy-up, ready for next iteration
            delete P;
            P = P2;
        }
    }, P);
    producer.detach();    //Run forever
    return F;
}

The key point: just before broadcasting the current message (with set_value), the producer creates the std::promise for the next message and stores its std::shared_future inside the current message. The old std::promise is then deleted and replaced by the new one.

The consumers never see the promises. Here is a first, simple version of a consumer, without the regular processing:

std::thread startConsumer(FutureType F,
        std::function<void(const Message&)> processing) {
    std::thread consumer([=](FutureType F) {
        while(true) {
            Message msg = F.get();
            processing(msg);
            F = std::move( msg.next );
        }
    }, std::move(F));
    return consumer;
}

F.get() blocks until the producer broadcasts the next message. The consumer processes it, then moves on to msg.next, the future for the message after that. A slow consumer does not lose anything: the chain of futures keeps every message reachable until that consumer gets to it.

Now for the regular processing. Say each consumer has to run it every 15 seconds:

 int regularProcessingDeltaInSeconds = 15; 


The full version of startConsumer() then looks like this:

std::thread startConsumer(FutureType F,
        std::function<void(const Message&)> processing,
        std::function<void()> regularProcessing) {
    std::thread consumer([=](FutureType F) {
        //Round up to the next interval
        time_t t2 = ((int)( time(NULL) / regularProcessingDeltaInSeconds) + 1)
                    * regularProcessingDeltaInSeconds;
        auto nextMinute = std::chrono::system_clock::from_time_t(t2);
        while(true) {
            auto waitResult = F.wait_until(nextMinute);
            if(waitResult == std::future_status::timeout) {
                regularProcessing();
                nextMinute += std::chrono::seconds(regularProcessingDeltaInSeconds);
                continue;
            }
            else if(waitResult != std::future_status::ready) {
                //No time-out, but cannot call get() yet either.
                //(Implies a spurious wake-up?)
                continue;
            }
            if(std::chrono::system_clock::now() > nextMinute) {
                //If we've got data from the producer, but it is also time to do
                //regular processing, give the regular processing priority.
                regularProcessing();
                nextMinute += std::chrono::seconds(regularProcessingDeltaInSeconds);
            }
            Message result = F.get();
            processing(result);
            F = std::move( result.next );
        }
    }, std::move(F));
    return consumer;
}

(The rounding-up to the next interval mixes time_t and std::chrono, which is a little clumsy.)

Instead of a blocking get(), the consumer uses wait_until(), so that when the timeout is reached it can run regularProcessing() and go around the loop again.

The code is not quite DRY: regularProcessing() is called, and nextMinute advanced, in two places. The second occurrence handles the case where wait_until() returned ready rather than timeout but the regular processing has also become due; in that case regularProcessing() runs first, and only then does get() fetch the message.


That's all there is to it.



. , "c3-erratic" , , .

Producer: next data will arrive in 900ms...
Producer: new data arrived, so about to set result (cnt=1)
Producer: next data will arrive in 926ms...
c1-fast:result.a=1,b=2014-08-16 12:21:28 (took 0ms to calculate)
Producer: new data arrived, so about to set result (cnt=2)
Producer: next data will arrive in 1051ms...
c1-fast:result.a=2,b=2014-08-16 12:21:29 (took 0ms to calculate)
c2-slow:result.a=1,b=2014-08-16 12:21:28 (took 944ms to calculate)
c3-erratic:result.a=1,b=2014-08-16 12:21:28 (took 1116ms to calculate)
c1-fast:PER-MINUTE PROCESSING, will take 491ms...
c1-fast:per-minute processing, DONE!
c2-slow:result.a=2,b=2014-08-16 12:21:29 (took 979ms to calculate)
c2-slow:PER-MINUTE PROCESSING, will take 553ms...
Producer: new data arrived, so about to set result (cnt=3)
Producer: next data will arrive in 992ms...
c1-fast:result.a=3,b=2014-08-16 12:21:30 (took 0ms to calculate)
c2-slow:per-minute processing, DONE!
Producer: new data arrived, so about to set result (cnt=4)
Producer: next data will arrive in 1007ms...
c1-fast:result.a=4,b=2014-08-16 12:21:31 (took 0ms to calculate)
c3-erratic:result.a=2,b=2014-08-16 12:21:29 (took 2098ms to calculate)
c3-erratic:PER-MINUTE PROCESSING, will take 452ms...
c2-slow:result.a=3,b=2014-08-16 12:21:30 (took 943ms to calculate)
c3-erratic:per-minute processing, DONE!
Producer: new data arrived, so about to set result (cnt=5)
Producer: next data will arrive in 944ms...
c1-fast:result.a=5,b=2014-08-16 12:21:32 (took 0ms to calculate)
+1

Not really an answer, just some benchmark numbers that might help with choosing a queue ;)

Throughput of several queue implementations, pushing and popping a large number of small items (measured on a QuadCore machine under 32-bit Cygwin on Windows XP):

                    single     8 thrs     64 thrs    256 thrs
 lockfree spsc:     3.984s     N/A        N/A        N/A
 lockfree multi:    7.250s     4.672s     6.281s     8.703s
 mutex+deque:       6.531s     3.797s     5.141s     7.859s
 mutex+circular:    6.516s     3.969s     5.125s     7.984s
 mutex+space_cb:    6.938s     4.234s     5.531s     8.109s
 spin+deque:        4.109s     2.000s     3.250s     6.500s
 spin+circular:     4.219s     2.172s     3.344s     6.703s
 spin+space_cb:     4.609s     2.422s     3.672s     6.750s

As the numbers show, boost::lockfree::queue (the multi-producer/multi-consumer one) is actually the slowest here, while boost::lockfree::spsc_queue is the fastest but only fits a single-producer/single-consumer scenario (i.e. one queue per bee fed by a single central P - Idea2 rather than Idea1). The times were collected with boost::timer::cpu_timer; the raw results are below.

The spinlock-with-yield variants do surprisingly well ;)
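The implementation behind the "spinlock with yield" label is not shown in this answer; presumably it is something along these lines (my sketch, not the benchmarked code):

#include <atomic>
#include <thread>

class spinlock {
    std::atomic<bool> locked{false};
public:
    void lock() {
        while (locked.exchange(true, std::memory_order_acquire))
            std::this_thread::yield();        // back off instead of burning the core
    }
    void unlock() {
        locked.store(false, std::memory_order_release);
    }
};
// Used like a mutex (e.g. with std::lock_guard<spinlock>) around the deque /
// circular_buffer operations in the table above.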

Raw results (W = wall-clock time, Total = User + System = CPU time):

 simple push many, pop many:
  boost::lockfree::spsc_queue:
    W: 4.050s, T: 3.984s (U: 3.984s, S: 0.000s, 98.4%)
  boost::lockfree::queue (preallocated):
    W: 7.400s, T: 7.250s (U: 7.156s, S: 0.094s, 98.0%)
  std::deque + std::mutex:
    W: 6.661s, T: 6.531s (U: 6.531s, S: 0.000s, 98.1%)
  boost::circular_buffer + std::mutex:
    W: 6.622s, T: 6.516s (U: 6.516s, S: 0.000s, 98.4%)
  boost::circular_buffer_space_optimized + std::mutex:
    W: 7.026s, T: 6.938s (U: 6.938s, S: 0.000s, 98.7%)
  std::deque + spinlock with yield:
    W: 4.175s, T: 4.109s (U: 4.109s, S: 0.000s, 98.4%)
  boost::circular_buffer + spinlock with yield:
    W: 4.328s, T: 4.219s (U: 4.219s, S: 0.000s, 97.5%)
  boost::circular_buffer_space_optimized + spinlock with yield:
    W: 4.674s, T: 4.609s (U: 4.609s, S: 0.000s, 98.6%)
8 threads push many, pop many:
  boost::lockfree::queue (preallocated):
    W: 4.778s, T: 4.672s (U: 4.250s, S: 0.422s, 97.8%)
  std::deque + std::mutex:
    W: 3.900s, T: 3.797s (U: 3.562s, S: 0.234s, 97.4%)
  boost::circular_buffer + std::mutex:
    W: 4.067s, T: 3.969s (U: 3.734s, S: 0.234s, 97.6%)
  boost::circular_buffer_space_optimized + std::mutex:
    W: 4.364s, T: 4.234s (U: 4.062s, S: 0.172s, 97.0%)
  std::deque + spinlock with yield:
    W: 2.040s, T: 2.000s (U: 1.766s, S: 0.234s, 98.0%)
  boost::circular_buffer + spinlock with yield:
    W: 2.318s, T: 2.172s (U: 1.859s, S: 0.312s, 93.7%)
  boost::circular_buffer_space_optimized + spinlock with yield:
    W: 2.463s, T: 2.422s (U: 2.156s, S: 0.266s, 98.3%)
64 threads push many, pop many:
  boost::lockfree::queue (preallocated):
    W: 6.779s, T: 6.281s (U: 3.953s, S: 2.328s, 92.7%)
  std::deque + std::mutex:
    W: 5.425s, T: 5.141s (U: 3.406s, S: 1.734s, 94.8%)
  boost::circular_buffer + std::mutex:
    W: 5.490s, T: 5.125s (U: 3.422s, S: 1.703s, 93.4%)
  boost::circular_buffer_space_optimized + std::mutex:
    W: 5.860s, T: 5.531s (U: 3.797s, S: 1.734s, 94.4%)
  std::deque + spinlock with yield:
    W: 3.678s, T: 3.250s (U: 1.656s, S: 1.594s, 88.4%)
  boost::circular_buffer + spinlock with yield:
    W: 3.720s, T: 3.344s (U: 1.500s, S: 1.844s, 89.9%)
  boost::circular_buffer_space_optimized + spinlock with yield:
    W: 3.951s, T: 3.672s (U: 2.000s, S: 1.672s, 92.9%)
256 threads push many, pop many:
  boost::lockfree::queue (preallocated):
    W: 9.648s, T: 8.703s (U: 3.688s, S: 5.016s, 90.2%)
  std::deque + std::mutex:
    W: 8.733s, T: 7.859s (U: 3.250s, S: 4.609s, 90.0%)
  boost::circular_buffer + std::mutex:
    W: 8.718s, T: 7.984s (U: 2.922s, S: 5.062s, 91.6%)
  boost::circular_buffer_space_optimized + std::mutex:
    W: 9.056s, T: 8.109s (U: 3.312s, S: 4.797s, 89.5%)
  std::deque + spinlock with yield:
    W: 7.395s, T: 6.500s (U: 1.844s, S: 4.656s, 87.9%)
  boost::circular_buffer + spinlock with yield:
    W: 7.482s, T: 6.703s (U: 1.906s, S: 4.797s, 89.6%)
  boost::circular_buffer_space_optimized + spinlock with yield:
    W: 7.806s, T: 6.750s (U: 1.969s, S: 4.781s, 86.5%)

NOTE: I posted a warning in my other answer about the race condition in Idea1 with several producers. It can be avoided with a spin-lock-like approach (a second write pointer - see the updated answer). Such optimistic solutions can outperform ones with heavy synchronization, but they are not easy to write correctly. I am now running my own benchmarks for my own reasons (on my Linux server) and will try to post relevant results here. Feel free to stop me if this goes off topic ;)

0
