Python ZeroMQ PUSH / PULL - Lost Messages?

I am trying to use python with zeroMQ in PUSH / PULL mode, sending messages of size 4 [MB] every few seconds.

For some reason, although it seems that all messages have been sent, ONLY SOME of them seem to have been received by the server. What am I missing here?

Here the code for the client is client.py

 import zmq import struct # define a string of size 4[MB] msgToSend = struct.pack('i', 45) * 1000 * 1000 context = zmq.Context() socket = context.socket(zmq.PUSH) socket.connect("tcp://127.0.0.1:5000") # print the message size in bytes print len(msgToSend) socket.send(msgToSend) print "Sent message" 

And here is the code for the server - server.py

 import zmq import struct context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://127.0.0.1:5000") while True: # receive the message msg = socket.recv() print "Message Size is: {0} [MB]".format( len(msg) / (1000 * 1000) ) 

What am I missing? How can I guarantee that messages are always sent and not lost?

In case that matters, I use Ubuntu 10.04 32bit, Core Duo with 2 RAM [RAM].

NOTE. . I tried the same example using RabbitMQ and everything works fine - not a single message is lost. I am puzzled since I often hear zeroMQ praise. Why did this fail if RabbitMQ succeeds?

+8
python zeromq
source share
2 answers

The problem is that when the program crashes, the socket closes immediately, and garbage is collected with an effective LINGER of 0 (i.e. it deletes any unsent messages). This is a problem for larger messages because they take longer to send than it takes to collect garbage.

You can avoid this by placing sleep(0.1) just before the program exits (to delay the socket and context that was garbage collected).

socket.setsockopt(zmq.LINGER, -1) (which by default) should avoid this problem, but for some reason I did not have time to investigate.

+14
source share

We can assume that you have run out of memory (depending on how you send messages, whether they are consumed enough, etc.). You can use socket.setsockopt(zmq.HWM) to set HWM to its normal value and prevent zeromq from storing too many messages in the outgoing buffer. With this in mind, consider a few modified examples:

 # server ... counter = 0 while True: ...receive the message counter += 1 print "Total messages recieved: {0}".format(counter) # client socket.setsockopt(zmq.HWM, 8) for i in range(1000): socket.send(msgToSend) 

And then run 10 test clients:

 for i in {1..10}; do python client.py & done 

From the server you can see all the messages:

 Total messages recieved: 9998 Total messages recieved: 9999 Total messages recieved: 10000 
+1
source share

All Articles