ZeroMQ: how to prevent endless waiting?

I just started working with ZMQ. I am developing an application whose workflow is:

  • one of many clients (each of which has a random PULL address) PUSHes a request to the server on port 5555
  • the server is always waiting for client PUSHes. When one arrives, a worker process is spawned for that particular request. Yes, worker processes can exist concurrently.
  • when the process completes its task, it returns the result to the client.

I assume the PUSH/PULL architecture is suitable for this. Please correct me if I'm wrong.
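
Roughly, the setup I have in mind looks like this (a sketch; socket names and ports are only illustrative):

    import zmq

    context = zmq.Context()

    # server side: wait for requests from any client
    server_receiver = context.socket(zmq.PULL)
    server_receiver.bind("tcp://*:5555")

    # client side: push a request, then wait for the result on a random port
    client_sender = context.socket(zmq.PUSH)
    client_sender.connect("tcp://localhost:5555")
    client_receiver = context.socket(zmq.PULL)
    reply_port = client_receiver.bind_to_random_port("tcp://*")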




But how do I handle these scenarios?

  • client_receiver.recv() will wait forever if the server fails to respond.
  • the client may send a request but die immediately afterwards, in which case the worker process will remain stuck at server_sender.send() forever.

So, how do I configure something like a timeout in the PUSH/PULL model?




EDIT: Thanks to user938949's suggestions, I got a working answer, and I have shared it below for posterity.

+63
python zeromq
Sep 24 '11 at 12:25
4 answers

If you are using zeromq >= 3.0, you can set the RCVTIMEO socket option:

    client_receiver.RCVTIMEO = 1000  # in milliseconds
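
With that set, recv() raises a ZMQError whose errno is zmq.EAGAIN once the timeout expires, e.g. (a sketch, where client_receiver is the socket configured above):

    import zmq  # for zmq.ZMQError / zmq.EAGAIN

    try:
        msg = client_receiver.recv()
    except zmq.ZMQError as e:
        if e.errno == zmq.EAGAIN:
            print("no message arrived within the timeout")
        else:
            raise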

But in general, you can use pollers:

    poller = zmq.Poller()
    poller.register(client_receiver, zmq.POLLIN)  # POLLIN for recv, POLLOUT for send

And poller.poll() takes a timeout:

    evts = poller.poll(1000)  # wait *up to* one second for a message to arrive

evts will be an empty list if nothing is received.
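
Turning the result into a dict keyed by socket makes the check easy; a sketch that builds on the poller registered above:

    evts = dict(poller.poll(1000))
    if client_receiver in evts:
        msg = client_receiver.recv()  # a message is ready, so recv() will not block
    else:
        print("timed out waiting for a message")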

You can poll with zmq.POLLOUT to check whether a send will succeed.
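
For example (a sketch; worker and msg stand for the same sending socket and message used in the snippet below):

    out_poller = zmq.Poller()
    out_poller.register(worker, zmq.POLLOUT)

    if dict(out_poller.poll(1000)).get(worker) == zmq.POLLOUT:
        worker.send(msg)  # the socket reported it can accept a message right now
    else:
        print("peer not ready to receive")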

Or, to handle the case of a peer that might have failed, a:

    worker.send(msg, zmq.NOBLOCK)

might be sufficient, which will always return immediately, raising a ZMQError (zmq.EAGAIN) if the send could not complete.
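
A sketch of handling that error:

    try:
        worker.send(msg, zmq.NOBLOCK)
    except zmq.ZMQError as e:
        if e.errno == zmq.EAGAIN:
            print("send could not complete; the peer is gone or its queue is full")
        else:
            raise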

+70
Sep 24 '11 at 16:33

This was a quick hack I put together after reading user938949's answer and http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/ . If you can do better, please post your answer and I will recommend it.

For those who want durability and reliability, see http://zguide.zeromq.org/page:all#toc64

Version 3.0 of zeromq (in beta at the time of writing) supports timeouts via ZMQ_RCVTIMEO and ZMQ_SNDTIMEO: http://api.zeromq.org/3-0:zmq-setsockopt
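
In pyzmq these are exposed as zmq.RCVTIMEO and zmq.SNDTIMEO; a sketch for an arbitrary socket (here simply called sock), assuming a libzmq >= 3.0 build, with values in milliseconds:

    sock.setsockopt(zmq.RCVTIMEO, 1000)  # recv() fails with EAGAIN after 1 second
    sock.setsockopt(zmq.SNDTIMEO, 1000)  # send() fails with EAGAIN after 1 second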

Server

zmq.NOBLOCK ensures that send() does not block when no client is connected.

    import time
    import zmq

    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    i = 0
    while True:
        i = i + 1
        time.sleep(0.5)
        print ">>sending message ", i
        try:
            # NOBLOCK: raise instead of blocking if no client is connected
            ventilator_send.send(repr(i), zmq.NOBLOCK)
            print " succeed"
        except zmq.ZMQError:
            print " failed"

Client

The poller object can listen on many receiving sockets (see "Python Multiprocessing with ZeroMQ", linked above); here I have registered only work_receiver with it. In the infinite loop, the client polls with a timeout of 1000 ms. The socks dict comes back empty if no message was received within that interval.

    import time
    import zmq

    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)

    # Loop and accept messages, acting accordingly
    while True:
        socks = dict(poller.poll(1000))
        if socks:
            if socks.get(work_receiver) == zmq.POLLIN:
                print "got message ", work_receiver.recv(zmq.NOBLOCK)
        else:
            print "error: message timeout"
+15
Sep 26 '11

If you use ZMQ_NOBLOCK the send will not block, but if you then try to close the socket and context, that step can block your program from exiting.

The reason is that the socket waits for a peer to ensure delivery of the outgoing messages. To close the socket immediately and drop the outgoing messages from the buffer, set ZMQ_LINGER to 0.
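
For example (a sketch; sock and context are whatever socket and context you are shutting down):

    sock.setsockopt(zmq.LINGER, 0)  # discard unsent messages instead of waiting for a peer
    sock.close()
    context.term()  # no longer hangs on undelivered messages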

+8
Jun 01 '12 at 7:50

If you are waiting on only one socket and do not want to create a Poller, you can do this:

    if work_receiver.poll(1000, zmq.POLLIN):
        print "got message ", work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"

You can use this if your timeout changes depending on the situation, instead of setting work_receiver.RCVTIMEO.
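
For example, a small helper with a per-call timeout (a sketch; it returns None when nothing arrives in time):

    def recv_or_none(sock, timeout_ms):
        if sock.poll(timeout_ms, zmq.POLLIN):
            return sock.recv(zmq.NOBLOCK)
        return None

    msg = recv_or_none(work_receiver, 250)   # quick check
    msg = recv_or_none(work_receiver, 5000)  # more patient wait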

0
May 02 '19 at 1:41


