Does the ZMQ_DONTWAIT flag not work?

I use ZeroMQ for networking in my application. The manual states that passing the ZMQ_DONTWAIT flag in the flags parameter of send or recv keeps the function from blocking the thread. But in my case, this does not work:

    std::cout << "a";
    if(ToSend.try_pop(send))
    {
        std::cout << "b";
        local.send(send.data(),send.size(),ZMQ_DONTWAIT);
    }
    std::cout << "c";
    if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT))
        std::cout << "Received: " << (char*)recv.data() << std::endl;
    std::cout << "d" << std::endl;

this prints:

 abcdab 

I made a small class to make things easier:

Client class (stripped of all unused parts, for simplicity):

    class client
    {
    public:
        client() { }

        inline bool init(unsigned short threads = 1)
        {
            Running = true;
            context = zmq_init (threads);
            if(context == NULL)
                return false;
            socket = zmq_socket (context, ZMQ_REQ);
            if(socket == NULL)
                return false;
            return true;
        }

        inline int connect(const char * address, unsigned short port)
        {
            return zmq_connect(socket,string_format("tcp://%s:%d",address,port).c_str());
        }

        inline bool send (void *data, size_t len_, int flags_ = 0)
        {
            message_t request (len_);
            memcpy ((void *) request.data (), data, len_);
            int rc = zmq_send (socket, request.data(), request.size(), flags_);
            if (rc >= 0)
                return true;
            if (rc == -1 && zmq_errno () == EAGAIN)
                return false;
            throw error_t ();
        }

        inline bool recv (void * data, size_t len_, int flags_)
        {
            message_t reply(len_);
            int rc = zmq_recv (socket, reply.data(), len_, flags_);
            if (rc >= 0)
            {
                memcpy (data,(void *)reply.data(), reply.size());
                return true;
            }
            if (rc == -1 && zmq_errno () == EAGAIN)
                return false;
            throw error_t ();
        }

        inline bool IsRunning()
        {
            return Running;
        }

    private:
        void * context;
        void * socket;
        std::atomic<bool> Running;
    };

and here is the workflow:

    namespace Data
    {
        Concurrency::concurrent_queue <message_t> ToSend;

        void Processor(char * address, unsigned short port, unsigned short threads)
        {
            client local;
            if(!local.init(threads))return;
            if(local.connect(address,port) != 0)return;

            message_t recv(Networking::max_packet);
            message_t send(Networking::max_packet);

            while(local.IsRunning())
            {
                std::cout << "a";
                if(ToSend.try_pop(send))
                {
                    std::cout << "b";
                    local.send(send.data(),send.size(),ZMQ_DONTWAIT);
                }
                std::cout << "c";
                if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT))
                    std::cout << "Received: " << (char*)recv.data() << std::endl;
                std::cout << "d" << std::endl;
            }
        }
    };

The problem exists here somehow. I just don’t know why it is not working.

This is how I start the workflow:

    int Thread(char * address, unsigned short port, unsigned short threads)
    {
        std::thread data(Data::Processor,address,port,threads);
        data.detach();
        while(!Data::status){}
        return Data::status;
    }

    int main(int argc, char* argv[])
    {
        std::thread s(Server::RUN);
        Client::message_t tosend(14);
        memcpy((void*)tosend.data(),"Hello World !\0",14);
        Client::Data::ToSend.push(tosend);
        std::cout << Client::Thread("127.0.0.1",5555,1) << std::endl;
        s.join();
        return 0;
    }

This all seems correct, so why does recv / send block my thread? Why does the flag not work?

1 answer

It printed "abcd" and then "ab". This means that the code

    std::cout << "a";
    if(ToSend.try_pop(send))
    {
        std::cout << "b";

was executed twice, although the code you provided at the end of the question implies that you only called ToSend.push() once.

Did you attach a debugger to see which thread was hanging and what was causing it?

  • Put std::cout behind a mutex / critical section so that only one thread writes at a time, and have each message report which thread wrote it.
  • Put each print on its own line so you don't miss a letter due to buffering.
  • If you never block on receive, how will you know when data has arrived? Do you have a select / poll / WaitForSingleObject call somewhere?