Using boost :: asio :: io_service :: post ()

First, I asked about this. Starting a function in the main thread from the boost stream and passing parameters to this function

now i am trying to do this:

The following is a C ++ console project in which I perfectly modeled my large project

TestServicePost.cpp

#include "stdafx.h" #include "SomeClass.h" int _tmain(int argc, _TCHAR* argv[]) { SomeClass* s = new SomeClass(); while(true) { s->update(); } return 0; } 

SomeClass.h

 #include <boost/thread.hpp> #include <boost/asio.hpp> #include <queue> class ServiceNote { public: std::string getType() { std::stringstream typeSS; typeSS << "LamasaTech.MultiWall.PostNote." << (NoteType.compare("Normal") == 0 ? "Node" : "Header") << "." << Shape << "." << Colour; return typeSS.str(); } int Action; int CNoteId; std::string Colour; int NoteId; std::string NoteType; int SessionId; std::string Shape; std::string Style; std::string Text; int X; int Y; }; class SomeClass { public: SomeClass(); ~SomeClass(); void update(); private: std::queue<ServiceNote> pendingNotes; void addToQueue(ServiceNote sn); void pollService(boost::asio::io_service* svc); int getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId); boost::thread servicePoller; }; 

SomeClass.cpp

 #include "stdafx.h" #include "SomeClass.h" #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/json_parser.hpp> #include <boost/asio/signal_set.hpp> #define POLL_SERVICE = 0; #define POLLING_WAIT_TIME 1000 #define SAVE_SESSION_EVERY 1800000 SomeClass::SomeClass() { boost::asio::io_service io_servicePoller; io_servicePoller.run(); servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, &io_servicePoller)); /*boost::asio::io_service io_sessionSaver; boost::asio::signal_set signalsSaver(io_sessionSaver, SIGINT, SIGTERM); signalsSaver.async_wait( boost::bind(&boost::asio::io_service::stop, &io_sessionSaver)); sessionSaver = boost::thread(&SomeClass::saveSessionEvery, io_sessionSaver);*/ } SomeClass::~SomeClass() { } void SomeClass::update() { while(!pendingNotes.empty()) { ServiceNote sn = pendingNotes.front(); pendingNotes.pop(); } } void SomeClass::addToQueue(ServiceNote sn) { pendingNotes.push(sn); } void SomeClass::pollService(boost::asio::io_service* svc) { int messageId = 1; while(true) { if(boost::this_thread::interruption_enabled() && boost::this_thread::interruption_requested()) return; int currentId = messageId; messageId = getMessage(svc, "49", messageId); if(currentId == messageId) boost::this_thread::sleep(boost::posix_time::milliseconds(POLLING_WAIT_TIME)); } } int SomeClass::getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId) { try { boost::asio::io_service io_service; // Get a list of endpoints corresponding to the server name. boost::asio::ip::tcp::resolver resolver(io_service); boost::asio::ip::tcp::resolver::query query("mw.rombus.com", "http"); boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); // Try each endpoint until we successfully establish a connection. boost::asio::ip::tcp::socket socket(io_service); boost::asio::connect(socket, endpoint_iterator); // Form the request. We specify the "Connection: close" header so that the // server will close the socket after transmitting the response. This will // allow us to treat all data up until the EOF as the content. boost::asio::streambuf request; std::ostream request_stream(&request); request_stream << "GET " "/Service.svc/message/" << sessionId << "/" << messageId << " HTTP/1.0\r\n"; request_stream << "Host: " << "mw.rombus.com" << "\r\n"; request_stream << "Accept: */*\r\n"; request_stream << "Connection: close\r\n\r\n"; // Send the request. boost::asio::write(socket, request); // Read the response status line. The response streambuf will automatically // grow to accommodate the entire line. The growth may be limited by passing // a maximum size to the streambuf constructor. boost::asio::streambuf response; boost::asio::read_until(socket, response, "\r\n"); // Check that response is OK. std::istream response_stream(&response); std::string http_version; response_stream >> http_version; unsigned int status_code; response_stream >> status_code; std::string status_message; std::getline(response_stream, status_message); if (!response_stream || http_version.substr(0, 5) != "HTTP/") { //std::cout << "Invalid response\n"; return messageId; } if (status_code != 200) { //std::cout << "Response returned with status code " << status_code << "\n"; return messageId; } // Read the response headers, which are terminated by a blank line. boost::asio::read_until(socket, response, "\r\n\r\n"); // Process the response headers. std::string header; std::string fullHeader = ""; while (std::getline(response_stream, header) && header != "\r") fullHeader.append(header).append("\n"); // Write whatever content we already have to output. std::string fullResponse = ""; if (response.size() > 0) { std::stringstream ss; ss << &response; fullResponse = ss.str(); try { boost::property_tree::ptree pt; boost::property_tree::read_json(ss, pt); ServiceNote sn; sn.Action = pt.get<int>("Action"); sn.CNoteId = pt.get<int>("CNoteId"); sn.Colour = pt.get<std::string>("Colour"); sn.NoteId = pt.get<int>("NoteId"); sn.NoteType = pt.get<std::string>("NoteType"); sn.SessionId = pt.get<int>("SessionId"); sn.Shape = pt.get<std::string>("Shape"); sn.Style = pt.get<std::string>("Style"); sn.Text = pt.get<std::string>("Text"); sn.X = pt.get<int>("X"); sn.Y = pt.get<int>("Y"); svc->post(boost::bind(&SomeClass::addToQueue, this, sn)); //pendingNotes.push(sn); } catch (std::exception const& e) { std::string test = e.what(); //std::cerr << e.what() << std::endl; } messageId++; } // Read until EOF, writing data to output as we go. std::string fullSth = ""; boost::system::error_code error; while (boost::asio::read(socket, response, boost::asio::transfer_at_least(1), error)) { std::ostringstream ss; ss << &response; fullSth = ss.str(); } if (error != boost::asio::error::eof) throw boost::system::system_error(error); } catch (std::exception& e) { std::string test = e.what(); std::cout << "Exception: " << e.what() << "\n"; } return messageId; } 

but I get Unhandled exception at 0x771215de in TestServicePost.exe: 0xC0000005: Access violation writing location 0xcccccce4. right after this line is executed:

 svc->post(boost::bind(&SomeClass::addToQueue, this, sn)); 

I could not define io_service as a member of the class, so I can use it in the ~SomeClass() destructor, it would also be very useful

If io_service.post is not the best solution for me, please recommend something, as you can see that I have a constructor, destructor and update method that every tick is called, I tried to use this and the queue alone, but this wasn 't thread safe, is there a safe FIFO thread to use?

+4
source share
2 answers

I figured out how to declare io_service as a member of a class:

 boost::shared_ptr< boost::asio::io_service > io_servicePoller; 

and in the constructor I did the following:

 SomeClass::SomeClass() { boost::shared_ptr< boost::asio::io_service > io_service( new boost::asio::io_service ); io_servicePoller = io_service; servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, io_servicePoller)); } 

Some cleaning

 SomeClass::~SomeClass() { servicePoller.interrupt(); io_servicePoller->stop(); servicePoller.join(); } 

and in the update, I called run, which adds material to the queue, then reads them in a while loop

 void SomeClass::update() { io_servicePoller->run(); io_servicePoller->reset(); while(!pendingNotes.empty()) { ServiceNote sn = pendingNotes.front(); pendingNotes.pop(); } } 

and changed the signature of my members to void SomeClass::pollService(boost::shared_ptr< boost::asio::io_service > svc)

So what happens:

  • Application launches
  • in my class
  • my class creates a service and starts the thread
  • the stream retrieves items from the service
  • the main thread checks the io service queue and issues it
  • then he uses the queue

Thanks to Igor R., I could not do this without him

as well as http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting-started-with-boostasio?pg=4 , where I got a way to make a general pointer

+2
source

In the SomeClass constructor SomeClass you really do the following:

  • Define a local io_service instance.
  • Call its run() member function, which returns immediately since io_service is not working.
  • Pass the address of the local object to another stream.

This, of course, will not work.

Note that io_service::run() is a kind of "message loop", so it should block the calling thread. Do not call it in the constructor of the object.

+4
source

All Articles