How to avoid data race with `asio :: ip :: tcp :: iostream`?

My question

How to avoid data race when using two streams for sending and receiving by asio::ip::tcp::iostream?

Design

I am writing a program that uses asio::ip::tcp::iostreaminput and output. The program receives commands from the (remote) user port 5555 and sends messages over the same TCP connection with the user. Since these events (commands received from the user or messages sent to the user) occur asynchronously, I have separate transmission and reception streams.

In this version of the toys, the teams are "one", "two" and "exit". Of course, quit exits the program. Other commands do nothing, and any unrecognized command forces the server to close the TCP connection.

Sent messages are simple, sequential number messages that are sent once per second.

Both in this version of the toys and in the real code that I am trying to write, the transmission and reception processes use I / O lock, so it is not possible to use a std::mutexdifferent synchronization mechanism. (In my attempt, one process would capture the mutex and then block that it would not work for this.)

Building and testing

To build and test this, I use gcc version 7.2.1 and valgrind 3.13 on a 64-bit Linux machine. Structure:

g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread

, :

valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent 

telnet 127.0.0.1 5555 . , helgrind , , runTx runRx :

== 16188 == 1 0x1FFEFFF1CC # 1

== 16188 == : none

... ,

concurrent.cpp

#include <asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream *in, std::ostream *out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream *in);
    int runRx(std::ostream *out);
    bool want_quit;
    bool want_reset;
};

int Console::runTx(std::istream *in) {
    static const std::array<std::string, 3> cmds{
        "quit", "one", "two", 
    };
    std::string command;
    while (!want_quit && !want_reset && *in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream *out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        (*out) << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        out->flush();
    }
    return 0;
}

int Console::run(std::istream *in, std::ostream *out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, out};
    int status = runTx(in);
    t1.join();
    return status;
}

int main()
{
    Console con;
    asio::io_service ios;
    // IPv4 address, port 5555
    asio::ip::tcp::acceptor acceptor(ios, 
            asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
    while (!con.getQuitValue()) {
        asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());
        con.run(&stream, &stream);
        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}
+6
1

, , ,

Sidenote, , "", :

std::atomic_bool want_quit;
std::atomic_bool want_reset;

, , . : , , .

. , , (filedescriptor). , Asio.

Boost Iostreams:

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

// .... later:

    // HACK: procure a _separate `ostream` to prevent the race, using the same fd
    namespace bio = boost::iostreams;
    bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
    bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

    con.run(stream, hack_ostream);

, ( , - Asio ( s) ).

:

. . , , . .

, - . ( , - ..).

, stackful coroutines (http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/spawn.html)

. , . , . , bind/std::thread, std::ref, .

[ - .]

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream &in, std::ostream &out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream &in);
    int runRx(std::ostream &out);
    std::atomic_bool want_quit;
    std::atomic_bool want_reset;
};

int Console::runTx(std::istream &in) {
    static const std::array<std::string, 3> cmds{
        {"quit", "one", "two"}, 
    };
    std::string command;
    while (!want_quit && !want_reset && in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream &out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        out << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        out.flush();
    }
    return 0;
}

int Console::run(std::istream &in, std::ostream &out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, std::ref(out)};
    int status = runTx(in);
    t1.join();
    return status;
}

#define BOOST_IOSTREAMS_USE_DEPRECATED
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

int main()
{
    Console con;
    boost::asio::io_service ios;

    // IPv4 address, port 5555
    boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});

    while (!con.getQuitValue()) {
        boost::asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());

        {
            // HACK: procure a _separate `ostream` to prevent the race, using the same fd
            namespace bio = boost::iostreams;
            bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
            bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

            con.run(stream, hack_ostream);
        }

        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}

:

netcat localhost 5555 <<<quit
This is message number 0
This is message number 1
This is message number 2

commands=( one two one two one two one two one two one two one two three )
while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands}))]}; done | (while netcat localhost 5555; do sleep 1; done)

, ( "" ).

+1

All Articles