How to apply a parallel solution to the situation "Producer-consumer"

I have an XML file with a sequence of nodes. Each node represents an element that I need to parse and add to the sorted list (the order should be the same from the nodes found in the file).

I am currently using a sequential solution:

struct Graphic { bool parse() { // parsing... return parse_outcome; } }; vector<unique_ptr<Graphic>> graphics; void producer() { for (size_t i = 0; i < N_GRAPHICS; i++) { auto g = new Graphic(); if (g->parse()) graphics.emplace_back(g); else delete g; } } 

So, only if a graphic object (which is actually an instance of a class derived from Graphic , Line, Rectangle, etc., therefore new ) can be correctly analyzed, it will be added to my data structure.

Since I don’t care about the order in which these graphs are added to my list, I at least asynchronously called the parse method, so the producer has the task of reading each node from the file and adding this figure to the data structure, while The consumer has the task of analyzing each graphic when the new graphic is ready for analysis.

Now I have several consumer threads (mostly created), and my code looks like this:

 queue<pair<Graphic*, size_t>> q; mutex m; atomic<size_t> n_elements; void producer() { for (size_t i = 0; i < N_GRAPHICS; i++) { auto g = new Graphic(); graphics.emplace_back(g); q.emplace(make_pair(g, i)); } n_elements = graphics.size(); } void consumer() { pair<Graphic*, size_t> item; while (true) { { std::unique_lock<std::mutex> lk(m); if (n_elements == 0) return; n_elements--; item = q.front(); q.pop(); } if (!item.first->parse()) { // here I should remove the item from the vector assert(graphics[item.second].get() == item.first); delete item.first; graphics[item.second] = nullptr; } } } 

I start the producer first of all in my main, so that when the first consumer starts the queue, it is already completely full.

 int main() { producer(); vector<thread> threads; for (auto i = 0; i < N_THREADS; i++) threads.emplace_back(consumer); for (auto& t : threads) t.join(); return 0; } 

The matching version is apparently at least twice as fast as the original. Full code is downloaded here .

Now I am wondering:

  • Are there any errors (synchronization) in my code?
  • Is there a way to achieve the same result faster (or better)?

In addition, I noticed that on my computer I get a better result (in terms of elapsed time) if I set the number of threads to 8. More (or less) threads give me worse results. Why?

+5
source share
1 answer

Blockquote There are no synchronization errors, but I think memory management might be better, since your code leaked if parse() throws an exception.

There are no synchronization errors, but I think that managing your memory might be better, since you will have leaks if parse() throws an exception.

Blockquote Is there a way to achieve the same result faster (or better)?

Maybe. You can use a simple thread pool implementation and lambda that do parsing () for you.

The code below illustrates this approach. I am using threadpool implementation here

 #include <iostream> #include <stdexcept> #include <vector> #include <memory> #include <chrono> #include <utility> #include <cassert> #include <ThreadPool.h> using namespace std; using namespace std::chrono; #define N_GRAPHICS (1000*1000*1) #define N_THREADS 8 struct Graphic; using GPtr = std::unique_ptr<Graphic>; static vector<GPtr> graphics; struct Graphic { Graphic() : status(false) { } bool parse() { // waste time try { throw runtime_error(""); } catch (runtime_error) { } status = true; //return false; return true; } bool status; }; int main() { auto start = system_clock::now(); auto producer_unit = []()-> GPtr { std::unique_ptr<Graphic> g(new Graphic); if(!g->parse()){ g.reset(); // if g don't parse, return nullptr } return g; }; using ResultPool = std::vector<std::future<GPtr>>; ResultPool results; // ThreadPool pool(thread::hardware_concurrency()); ThreadPool pool(N_THREADS); for(int i = 0; i <N_GRAPHICS; ++i){ // Running async task results.emplace_back(pool.enqueue(producer_unit)); } for(auto &t : results){ auto value = t.get(); if(value){ graphics.emplace_back(std::move(value)); } } auto duration = duration_cast<milliseconds>(system_clock::now() - start); cout << "Elapsed: " << duration.count() << endl; for (size_t i = 0; i < graphics.size(); i++) { if (!graphics[i]->status) { cerr << "Assertion failed! (" << i << ")" << endl; break; } } cin.get(); return 0; } 

This is a little faster (1 s) on my machine, more readable and eliminates the need for shared data (sync evil, avoid it or hide it in a reliable and efficient way).

0
source

All Articles