My question
How to avoid data race when using two streams for sending and receiving by asio::ip::tcp::iostream?
Design
I am writing a program that uses asio::ip::tcp::iostreaminput and output. The program receives commands from the (remote) user port 5555 and sends messages over the same TCP connection with the user. Since these events (commands received from the user or messages sent to the user) occur asynchronously, I have separate transmission and reception streams.
In this version of the toys, the teams are "one", "two" and "exit". Of course, quit exits the program. Other commands do nothing, and any unrecognized command forces the server to close the TCP connection.
Sent messages are simple, sequential number messages that are sent once per second.
Both in this version of the toys and in the real code that I am trying to write, the transmission and reception processes use I / O lock, so it is not possible to use a std::mutexdifferent synchronization mechanism. (In my attempt, one process would capture the mutex and then block that it would not work for this.)
Building and testing
To build and test this, I use gcc version 7.2.1 and valgrind 3.13 on a 64-bit Linux machine. Structure:
g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread
, :
valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent
telnet 127.0.0.1 5555 . , helgrind , , runTx runRx :
== 16188 == 1 0x1FFEFFF1CC # 1
== 16188 == : none
... ,
concurrent.cpp
#include <asio.hpp>
#include <iostream>
#include <fstream>
#include <thread>
#include <array>
#include <chrono>
class Console {
public:
Console() :
want_quit{false},
want_reset{false}
{}
bool getQuitValue() const { return want_quit; }
int run(std::istream *in, std::ostream *out);
bool wantReset() const { return want_reset; }
private:
int runTx(std::istream *in);
int runRx(std::ostream *out);
bool want_quit;
bool want_reset;
};
int Console::runTx(std::istream *in) {
static const std::array<std::string, 3> cmds{
"quit", "one", "two",
};
std::string command;
while (!want_quit && !want_reset && *in >> command) {
if (command == cmds.front()) {
want_quit = true;
}
if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
want_reset = true;
std::cout << "unknown command [" << command << "]\n";
} else {
std::cout << command << '\n';
}
}
return 0;
}
int Console::runRx(std::ostream *out) {
for (int i=0; !(want_reset || want_quit); ++i) {
(*out) << "This is message number " << i << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
out->flush();
}
return 0;
}
int Console::run(std::istream *in, std::ostream *out) {
want_reset = false;
std::thread t1{&Console::runRx, this, out};
int status = runTx(in);
t1.join();
return status;
}
int main()
{
Console con;
asio::io_service ios;
asio::ip::tcp::acceptor acceptor(ios,
asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
while (!con.getQuitValue()) {
asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
con.run(&stream, &stream);
if (con.wantReset()) {
std::cout << "resetting\n";
}
}
}