Increase ASIO and message passing between threads

I am working on creating a websocket server that receives a message and saves it to an embedded database . For reading messages I use boost asio . To save messages in the embedded database, I see several options in front of me:

  • Keep messages synchronous as soon as I receive them on the same stream.
  • Store messages asynchronously in a separate thread.

I am sure that the second answer is what I want. However, I am not sure how to transfer messages from the socket stream to the input / output stream. I see the following options:

  • Use one io service for each thread and use the post function to communicate between threads. Here I have to worry about the conflict. Should I?
  • Use Linux domain sockets to transfer messages between threads. As far as I understand, no disputes about blocking. Here I can probably use the BOOST_ASIO_DISABLE_THREADS macro to improve performance.

In addition, I believe that this will help to have several input / output streams that will receive messages in a cyclic mode for saving to the built-in database.

Which architecture will be most effective? Are there any other alternatives that I mentioned?

A few notes:

  • Messages are exactly 8 bytes long .
  • . .
  • RocksDB .
+4
5

, unix-, . , , , .

API , (, ), boost::asio::io_service . io_service::strand io_service strand::dispatch() ( io_service::post()) . strand , , io_service .

, io_service? , . , strand::dispatch() , (, strand ), .

, , , . , / - , , .

+2

/ , MPMC Queue Facebook 50%. non-blocking write, - . , .

SPSC cond boost . , . - , .

, ( UDP ) , . .

+1

, , io_service - , , io_service, boost::asio::io_service::run . 8- .

. , , . , , boost::lockfree::queue, , io_service , .

? . , , , . - : , , , , , .

0
void Consumer( lockfree::queue<uint64_t> &message_queue ) {
    // Connect to database...
    while (!Finished) {
        message_queue.consume_all( add_to_database ); // add_to_database is a Functor that takes a message
        cond_var.wait_for( ... ); // Use a timed wait to avoid missing a signal.  It OK to consume_all() even if there nothing in the queue.
    }
}

void Producer( lockfree::queue<uint64_t> &message_queue ) {
    while (!Finished) {
        uint64_t m = receive_from_network( );
        message_queue.push( m );
        cond_var.notify_all( );
    }
}
0

, cxx11 , std:: async, .

0

All Articles