I use ZeroMQ for networking in my application. The manual states that passing the ZMQ_DONTWAIT flag in the flags parameter of send or recv makes the call non-blocking, so it should not block the thread. But in my case, this does not work:
std::cout << "a"; if(ToSend.try_pop(send)) { std::cout << "b"; local.send(send.data(),send.size(),ZMQ_DONTWAIT); } std::cout << "c"; if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT)) std::cout << "Received: " << (char*)recv.data() << std::endl; std::cout << "d" << std::endl;
this prints:
abcdab
I made a small class to make things easier:
Client class (stripped of all the unused parts, for simplicity):
class client { public: client() { } inline bool init(unsigned short threads = 1) { Running = true; context = zmq_init (threads); if(context == NULL) return false; socket = zmq_socket (context, ZMQ_REQ); if(socket == NULL) return false; return true; } inline int connect(const char * address, unsigned short port) { return zmq_connect(socket,string_format("tcp://%s:%d",address,port).c_str()); } inline bool send (void *data, size_t len_, int flags_ = 0) { message_t request (len_); memcpy ((void *) request.data (), data, len_); int rc = zmq_send (socket, request.data(), request.size(), flags_); if (rc >= 0) return true; if (rc == -1 && zmq_errno () == EAGAIN) return false; throw error_t (); } inline bool recv (void * data, size_t len_, int flags_) { message_t reply(len_); int rc = zmq_recv (socket, reply.data(), len_, flags_); if (rc >= 0) { memcpy (data,(void *)reply.data(), reply.size()); return true; } if (rc == -1 && zmq_errno () == EAGAIN)return false; throw error_t (); } inline bool IsRunning() { return Running; } private: void * context; void * socket; std::atomic<bool> Running; };
and here is the workflow:
namespace Data { Concurrency::concurrent_queue <message_t> ToSend; void Processor(char * address, unsigned short port, unsigned short threads) { client local; if(!local.init(threads))return; if(local.connect(address,port) != 0)return; message_t recv(Networking::max_packet); message_t send(Networking::max_packet); while(local.IsRunning()) { std::cout << "a"; if(ToSend.try_pop(send)) { std::cout << "b"; local.send(send.data(),send.size(),ZMQ_DONTWAIT); } std::cout << "c"; if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT)) std::cout << "Received: " << (char*)recv.data() << std::endl; std::cout << "d" << std::endl; } } };
The problem is somewhere in here. I just don't know why it is not working.
This is how I start the workflow:
/**
 * Starts Data::Processor on a detached thread and waits until Data::status
 * becomes non-zero.
 *
 * NOTE(review): Data::status is not defined in the code shown here; this
 * presumably relies on the worker setting it -- confirm.
 *
 * @param address Host the worker connects to. The pointer is handed to the
 *                detached thread, so the string must outlive that thread.
 * @param port    TCP port the worker connects to.
 * @param threads Number of ZeroMQ I/O threads for the worker's client.
 * @return The value of Data::status once it is non-zero.
 */
int Thread(char * address, unsigned short port, unsigned short threads)
{
    std::thread data(Data::Processor, address, port, threads);
    data.detach();

    // Yield while waiting so the worker thread is not starved of CPU time.
    while (!Data::status)
        std::this_thread::yield();

    return Data::status;
}

/**
 * Starts the server thread, queues one greeting message and launches the
 * client worker against 127.0.0.1:5555.
 */
int main(int argc, char* argv[])
{
    std::thread s(Server::RUN);

    Client::message_t tosend(14);
    memcpy((void*)tosend.data(), "Hello World !\0", 14);
    Client::Data::ToSend.push(tosend);

    // A string literal cannot be converted to `char *` since C++11, so pass
    // a writable array. It is static because the detached worker keeps the
    // pointer.
    static char address[] = "127.0.0.1";
    std::cout << Client::Thread(address, 5555, 1) << std::endl;

    s.join();
    return 0;
}
This all seems correct, so why does recv / send block my thread? Why does the flag not work?