boost::asio UDP "gather" operation

The boost::asio documentation for async_receive() claims that it supports "receiving into multiple buffers at once", and although I can code it, I cannot see how (or whether) it works.

We have a situation where one of our suppliers sends us many thousands of UDP packets per second — enough that, in some situations, we see an “outlier packet with errors”.

It would be ideal if we really could fill several buffers with one call to async_receive(), but during testing it seems that even if several buffers are specified, the handler is invoked after only one datagram.

I have included my test code. Sorry that it is so verbose, but I need it to be flexible enough to listen on multiple interfaces / multicast groups.

#include <algorithm>
#include <cctype>
#include <cstdint>
#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>

// Configuration options, filled in by parse_command_line().
// Local interface address (switch "-nic"); used for bind() and for the multicast join.
std::string nic;
// Multicast group address to join (switch "-multicast").
std::string mc;
// UDP port to bind to (switch "-port"); 0 means "not configured".
uint16_t port = 0;
// Size in bytes of one segment of the raw buffer (switch "-bfsz").
uint16_t buffer_size = 0;
// Value passed to the socket's receive_buffer_size option (switch "-sockbfsz").
uint32_t socket_buffer_size = 0;
// Number of segments placed in the scatter/gather sequence (switch "-scattercnt").
uint32_t scat_cnt = 1;

// The raw data buffer; main() resizes it to buffer_size * scat_cnt bytes.
std::vector<uint8_t> buffer;

// The scatter/gather buffer: one mutable_buffer per segment of `buffer`,
// handed to every async_receive() call.
std::vector<boost::asio::mutable_buffer> gather_buffer;

// The io_service that dispatches the receive handlers, and the UDP socket
// created on it by main().
boost::asio::io_service svc;
std::unique_ptr<boost::asio::ip::udp::socket> socket_;

// Running totals updated by on_receive(): completed receives and the sum of
// the byte counts they reported.
size_t messages_received = 0;
size_t bytes_received = 0;

// Parses the switches into the globals above; defined after main().
bool parse_command_line(std::vector<std::string> command_line);

// Completion handler for one asynchronous receive on the global socket.
//
// ec    - result of the receive; on failure the message is printed and no
//         further receive is queued.
// bytes - number of bytes reported for this completion.
//
// On success the next receive is queued before the counters are updated, and
// a progress line is printed every 1000 completions.
void on_receive(const boost::system::error_code& ec, size_t bytes)
{
   if(ec)
   {
      std::cout << "Error: " << ec.message() << '\n';
      return;
   }

   // Re-arm the receive first so a read is pending while we do bookkeeping.
   socket_->async_receive(
      gather_buffer,
      [] (const boost::system::error_code& next_ec, size_t next_bytes)
      {
         on_receive(next_ec, next_bytes);
      });

   bytes_received += bytes;
   ++messages_received;

   const bool report_progress = (messages_received % 1000 == 0);
   if(report_progress)
   {
      std::cout << "Received: " << messages_received << " messages, " << bytes_received << " bytes.\n";
   }
}

int main(int argc, char** argv)
{
   if(parse_command_line(std::vector<std::string>(argv, argv+argc)))
   {
      try
      {
         std::cout << "Resizing segment buffer to: " << buffer_size << std::endl;
         buffer.resize(buffer_size * scat_cnt);

         for(uint32_t x = 0; x < scat_cnt; ++x)
         {
            gather_buffer.push_back(
               boost::asio::buffer(buffer.data() + (buffer_size * x), buffer_size));
         }

         std::cout << "Setting up receiving socket." << std::endl;
         socket_.reset(new boost::asio::ip::udp::socket(svc));
         socket_->open(boost::asio::ip::udp::v4());

         socket_->set_option(boost::asio::socket_base::reuse_address(true));

         std::cout << "Binding to local NIC: " << nic << std::endl;
         socket_->bind(boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string(nic), port));

         boost::asio::socket_base::non_blocking_io no_block(true);
         socket_->io_control(no_block);

         std::cout << "Setting socket buffer size to " << socket_buffer_size << std::endl;
         boost::asio::socket_base::receive_buffer_size sock_bf_sz(socket_buffer_size);
         socket_->set_option(sock_bf_sz);

         std::cout << "Joining multicast " << mc << " on " << nic << std::endl;
         boost::asio::ip::multicast::join_group jg(boost::asio::ip::address_v4::from_string(mc), boost::asio::ip::address_v4::from_string(nic));
         socket_->set_option(jg);

         std::cout << "Listening..." << std::endl;

         socket_->async_receive(
            gather_buffer,
            [] (const boost::system::error_code& ec, size_t bytes)
         {
            on_receive(ec, bytes);
         });

         std::unique_ptr<boost::asio::io_service::work> w(new boost::asio::io_service::work(svc));

         std::cout << "Starting boost proactor..." << std::endl;
         boost::thread thread([&] () { svc.run(); });

         boost::this_thread::sleep_for(boost::chrono::seconds(60));
         w.reset();

         thread.join();
      }

      catch(boost::system::error_code& ec)
      {
         std::cout << "Boost error: " << ec.message() << '\n';
      }

      catch(...)
      {
         std::cout << "Unknown Error!\n";
      }
   }

   return 0;
}

bool parse_command_line(std::vector<std::string> command_line)
{
   for(size_t idx = 0, max_switches = command_line.size();
      idx < max_switches; ++idx)
   {
      auto& curr = command_line[idx];
      std::transform(curr.begin(), curr.end(), curr.begin(), ::tolower);

      if(curr == "-nic" && ++idx < max_switches)
      {
         nic = command_line[idx];
      }
      else if(curr == "-multicast" && ++idx < max_switches)
      {
         mc = command_line[idx];
      }
      else if(curr == "-port" && ++idx < max_switches)
      {
         port = boost::lexical_cast<uint16_t>(command_line[idx]);
      }
      else if(curr == "-bfsz" && ++idx < max_switches)
      {
         buffer_size = boost::lexical_cast<uint16_t>(command_line[idx]);
      }
      else if(curr == "-sockbfsz" && ++idx < max_switches)
      {
         socket_buffer_size = boost::lexical_cast<uint32_t>(command_line[idx]);
      }
      else if(curr == "-scattercnt" && ++idx < max_switches)
      {
         scat_cnt = boost::lexical_cast<uint32_t>(command_line[idx]);
      }
   }

   std::cout
      << "NIC:                " << nic << '\n'
      << "MC:                 " << mc << '\n'
      << "Port:               " << port << '\n'
      << "Segment Size:       " << buffer_size << '\n'
      << "Socket Buffer Size: " << socket_buffer_size << '\n'
      << "Scatter/Gather:     " << scat_cnt << std::endl;

   return 
      !nic.empty() &&
      !mc.empty() &&
      port != 0 &&
      buffer_size != 0 &&
      socket_buffer_size != 0
      ;
}
+4
source share
1 answer

It will receive into several buffers, but you will never get more than one datagram at a time. This is how recv works, and I think that people would be very surprised if its behavior suddenly changed. The purpose of the scatter/gather form is to split a single packet across several buffers, which can be useful if you expect to receive data that is conceptually segmented, but it is not for receiving multiple packets. It is basically a wrapper around recvmsg.

Maybe try not using boost::asio?

+2
source

All Articles