The documentation boost::asiofor async_receive()it claims that it supports "getting multiple buffers at a time", and although I can code it I can not see how (or if) it works.
We have a situation where one of our suppliers sends us many thousands of UDP packets per second, it’s enough for us to see that in some situations there is an “outlier packet with errors”.
It would be ideal if we really could fill several buffers with one call async_receive(), but during testing it seems that even if several buffers are specified, the handler is called for only one datagram.
I have included my test code, sorry this is so verbose, but I need it to be flexible to listen to multiple interfaces / multicasts.
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <memory>
#include <algorithm>
#include <vector>
#include <string>
#include <cstdint>
std::string nic;
std::string mc;
uint16_t port = 0;
uint16_t buffer_size = 0;
uint32_t socket_buffer_size = 0;
uint32_t scat_cnt = 1;
std::vector<uint8_t> buffer;
std::vector<boost::asio::mutable_buffer> gather_buffer;
boost::asio::io_service svc;
std::unique_ptr<boost::asio::ip::udp::socket> socket_;
size_t messages_received = 0;
size_t bytes_received = 0;
bool parse_command_line(std::vector<std::string> command_line);
void on_receive(const boost::system::error_code& ec, size_t bytes)
{
if(!ec)
{
socket_->async_receive(
gather_buffer,
[] (const boost::system::error_code& ec, size_t bytes)
{
on_receive(ec, bytes);
});
++messages_received;
bytes_received += bytes;
if(0 == messages_received % 1000)
{
std::cout << "Received: " << messages_received << " messages, " << bytes_received << " bytes.\n";
}
}
else
{
std::cout << "Error: " << ec.message() << '\n';
}
}
int main(int argc, char** argv)
{
if(parse_command_line(std::vector<std::string>(argv, argv+argc)))
{
try
{
std::cout << "Resizing segment buffer to: " << buffer_size << std::endl;
buffer.resize(buffer_size * scat_cnt);
for(uint32_t x = 0; x < scat_cnt; ++x)
{
gather_buffer.push_back(
boost::asio::buffer(buffer.data() + (buffer_size * x), buffer_size));
}
std::cout << "Setting up receiving socket." << std::endl;
socket_.reset(new boost::asio::ip::udp::socket(svc));
socket_->open(boost::asio::ip::udp::v4());
socket_->set_option(boost::asio::socket_base::reuse_address(true));
std::cout << "Binding to local NIC: " << nic << std::endl;
socket_->bind(boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string(nic), port));
boost::asio::socket_base::non_blocking_io no_block(true);
socket_->io_control(no_block);
std::cout << "Setting socket buffer size to " << socket_buffer_size << std::endl;
boost::asio::socket_base::receive_buffer_size sock_bf_sz(socket_buffer_size);
socket_->set_option(sock_bf_sz);
std::cout << "Joining multicast " << mc << " on " << nic << std::endl;
boost::asio::ip::multicast::join_group jg(boost::asio::ip::address_v4::from_string(mc), boost::asio::ip::address_v4::from_string(nic));
socket_->set_option(jg);
std::cout << "Listening..." << std::endl;
socket_->async_receive(
gather_buffer,
[] (const boost::system::error_code& ec, size_t bytes)
{
on_receive(ec, bytes);
});
std::unique_ptr<boost::asio::io_service::work> w(new boost::asio::io_service::work(svc));
std::cout << "Starting boost proactor..." << std::endl;
boost::thread thread([&] () { svc.run(); });
boost::this_thread::sleep_for(boost::chrono::seconds(60));
w.reset();
thread.join();
}
catch(boost::system::error_code& ec)
{
std::cout << "Boost error: " << ec.message() << '\n';
}
catch(...)
{
std::cout << "Unknown Error!\n";
}
}
return 0;
}
bool parse_command_line(std::vector<std::string> command_line)
{
for(size_t idx = 0, max_switches = command_line.size();
idx < max_switches; ++idx)
{
auto& curr = command_line[idx];
std::transform(curr.begin(), curr.end(), curr.begin(), ::tolower);
if(curr == "-nic" && ++idx < max_switches)
{
nic = command_line[idx];
}
else if(curr == "-multicast" && ++idx < max_switches)
{
mc = command_line[idx];
}
else if(curr == "-port" && ++idx < max_switches)
{
port = boost::lexical_cast<uint16_t>(command_line[idx]);
}
else if(curr == "-bfsz" && ++idx < max_switches)
{
buffer_size = boost::lexical_cast<uint16_t>(command_line[idx]);
}
else if(curr == "-sockbfsz" && ++idx < max_switches)
{
socket_buffer_size = boost::lexical_cast<uint32_t>(command_line[idx]);
}
else if(curr == "-scattercnt" && ++idx < max_switches)
{
scat_cnt = boost::lexical_cast<uint32_t>(command_line[idx]);
}
}
std::cout
<< "NIC: " << nic << '\n'
<< "MC: " << mc << '\n'
<< "Port: " << port << '\n'
<< "Segment Size: " << buffer_size << '\n'
<< "Socket Buffer Size: " << socket_buffer_size << '\n'
<< "Scatter/Gather: " << scat_cnt << std::endl;
return
!nic.empty() &&
!mc.empty() &&
port != 0 &&
buffer_size != 0 &&
socket_buffer_size != 0
;
}
source
share