First, I asked about this. Starting a function in the main thread from the boost stream and passing parameters to this function
now i am trying to do this:
The following is a C ++ console project in which I perfectly modeled my large project
TestServicePost.cpp
#include "stdafx.h" #include "SomeClass.h" int _tmain(int argc, _TCHAR* argv[]) { SomeClass* s = new SomeClass(); while(true) { s->update(); } return 0; }
SomeClass.h
#include <boost/thread.hpp> #include <boost/asio.hpp> #include <queue> class ServiceNote { public: std::string getType() { std::stringstream typeSS; typeSS << "LamasaTech.MultiWall.PostNote." << (NoteType.compare("Normal") == 0 ? "Node" : "Header") << "." << Shape << "." << Colour; return typeSS.str(); } int Action; int CNoteId; std::string Colour; int NoteId; std::string NoteType; int SessionId; std::string Shape; std::string Style; std::string Text; int X; int Y; }; class SomeClass { public: SomeClass(); ~SomeClass(); void update(); private: std::queue<ServiceNote> pendingNotes; void addToQueue(ServiceNote sn); void pollService(boost::asio::io_service* svc); int getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId); boost::thread servicePoller; };
SomeClass.cpp
#include "stdafx.h" #include "SomeClass.h" #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/json_parser.hpp> #include <boost/asio/signal_set.hpp> #define POLL_SERVICE = 0; #define POLLING_WAIT_TIME 1000 #define SAVE_SESSION_EVERY 1800000 SomeClass::SomeClass() { boost::asio::io_service io_servicePoller; io_servicePoller.run(); servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, &io_servicePoller)); /*boost::asio::io_service io_sessionSaver; boost::asio::signal_set signalsSaver(io_sessionSaver, SIGINT, SIGTERM); signalsSaver.async_wait( boost::bind(&boost::asio::io_service::stop, &io_sessionSaver)); sessionSaver = boost::thread(&SomeClass::saveSessionEvery, io_sessionSaver);*/ } SomeClass::~SomeClass() { } void SomeClass::update() { while(!pendingNotes.empty()) { ServiceNote sn = pendingNotes.front(); pendingNotes.pop(); } } void SomeClass::addToQueue(ServiceNote sn) { pendingNotes.push(sn); } void SomeClass::pollService(boost::asio::io_service* svc) { int messageId = 1; while(true) { if(boost::this_thread::interruption_enabled() && boost::this_thread::interruption_requested()) return; int currentId = messageId; messageId = getMessage(svc, "49", messageId); if(currentId == messageId) boost::this_thread::sleep(boost::posix_time::milliseconds(POLLING_WAIT_TIME)); } } int SomeClass::getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId) { try { boost::asio::io_service io_service; // Get a list of endpoints corresponding to the server name. boost::asio::ip::tcp::resolver resolver(io_service); boost::asio::ip::tcp::resolver::query query("mw.rombus.com", "http"); boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); // Try each endpoint until we successfully establish a connection. boost::asio::ip::tcp::socket socket(io_service); boost::asio::connect(socket, endpoint_iterator); // Form the request. We specify the "Connection: close" header so that the // server will close the socket after transmitting the response. This will // allow us to treat all data up until the EOF as the content. boost::asio::streambuf request; std::ostream request_stream(&request); request_stream << "GET " "/Service.svc/message/" << sessionId << "/" << messageId << " HTTP/1.0\r\n"; request_stream << "Host: " << "mw.rombus.com" << "\r\n"; request_stream << "Accept: */*\r\n"; request_stream << "Connection: close\r\n\r\n"; // Send the request. boost::asio::write(socket, request); // Read the response status line. The response streambuf will automatically // grow to accommodate the entire line. The growth may be limited by passing // a maximum size to the streambuf constructor. boost::asio::streambuf response; boost::asio::read_until(socket, response, "\r\n"); // Check that response is OK. std::istream response_stream(&response); std::string http_version; response_stream >> http_version; unsigned int status_code; response_stream >> status_code; std::string status_message; std::getline(response_stream, status_message); if (!response_stream || http_version.substr(0, 5) != "HTTP/") { //std::cout << "Invalid response\n"; return messageId; } if (status_code != 200) { //std::cout << "Response returned with status code " << status_code << "\n"; return messageId; } // Read the response headers, which are terminated by a blank line. boost::asio::read_until(socket, response, "\r\n\r\n"); // Process the response headers. std::string header; std::string fullHeader = ""; while (std::getline(response_stream, header) && header != "\r") fullHeader.append(header).append("\n"); // Write whatever content we already have to output. std::string fullResponse = ""; if (response.size() > 0) { std::stringstream ss; ss << &response; fullResponse = ss.str(); try { boost::property_tree::ptree pt; boost::property_tree::read_json(ss, pt); ServiceNote sn; sn.Action = pt.get<int>("Action"); sn.CNoteId = pt.get<int>("CNoteId"); sn.Colour = pt.get<std::string>("Colour"); sn.NoteId = pt.get<int>("NoteId"); sn.NoteType = pt.get<std::string>("NoteType"); sn.SessionId = pt.get<int>("SessionId"); sn.Shape = pt.get<std::string>("Shape"); sn.Style = pt.get<std::string>("Style"); sn.Text = pt.get<std::string>("Text"); sn.X = pt.get<int>("X"); sn.Y = pt.get<int>("Y"); svc->post(boost::bind(&SomeClass::addToQueue, this, sn)); //pendingNotes.push(sn); } catch (std::exception const& e) { std::string test = e.what(); //std::cerr << e.what() << std::endl; } messageId++; } // Read until EOF, writing data to output as we go. std::string fullSth = ""; boost::system::error_code error; while (boost::asio::read(socket, response, boost::asio::transfer_at_least(1), error)) { std::ostringstream ss; ss << &response; fullSth = ss.str(); } if (error != boost::asio::error::eof) throw boost::system::system_error(error); } catch (std::exception& e) { std::string test = e.what(); std::cout << "Exception: " << e.what() << "\n"; } return messageId; }
but I get Unhandled exception at 0x771215de in TestServicePost.exe: 0xC0000005: Access violation writing location 0xcccccce4. right after this line is executed:
svc->post(boost::bind(&SomeClass::addToQueue, this, sn));
I could not define io_service as a member of the class, so I can use it in the ~SomeClass() destructor, it would also be very useful
If io_service.post is not the best solution for me, please recommend something, as you can see that I have a constructor, destructor and update method that every tick is called, I tried to use this and the queue alone, but this wasn 't thread safe, is there a safe FIFO thread to use?