ZeroMQ PUSH / PULL and the lost message

I am using ZeroMQ from .NET and am stuck trying to fix a strange problem. I have one socket of type PUSH and one of type PULL, connected over TCP. When the client disconnects, the server is still able to send one message (note that no flags are passed to the Socket.Send method), and that message is lost completely. Only after that does the send start blocking, waiting for the client to reconnect, at which point the messages I try to send afterwards are delivered.

How can I avoid losing that message (or, in the worst case, test whether a client is connected and, if it is not, send a dummy message that I can afford to lose)?

Thanks in advance!

Edit: further testing shows that if I wait 1 second after sending the first message following the client's disconnect, the second send blocks. If I do not wait at all, I can send as many messages as I want, and they are all lost. This is pretty confusing...

client-server sockets zeromq ipc
1 answer

The ZeroMQ documentation notes that this is a known problem with PUSH/PULL setups and suggests the following pattern: add a REQ/REP pair to coordinate the nodes when you expect a fixed number of subscribers. If you cannot know the number of subscribers in advance, you should consider changing your protocol so that it is more resilient to these conditions.

Synchronized Publisher in C (from ZGuide)

//
//  Synchronized publisher
//
#include "zhelpers.h"

//  We wait for 10 subscribers
#define SUBSCRIBERS_EXPECTED  10

int main (void)
{
    s_version_assert (2, 1);
    void *context = zmq_init (1);

    //  Socket to talk to clients
    void *publisher = zmq_socket (context, ZMQ_PUB);
    zmq_bind (publisher, "tcp://*:5561");

    //  Socket to receive signals
    void *syncservice = zmq_socket (context, ZMQ_REP);
    zmq_bind (syncservice, "tcp://*:5562");

    //  Get synchronization from subscribers
    int subscribers = 0;
    while (subscribers < SUBSCRIBERS_EXPECTED) {
        //  - wait for synchronization request
        char *string = s_recv (syncservice);
        free (string);
        //  - send synchronization reply
        s_send (syncservice, "");
        subscribers++;
    }

    //  Now broadcast exactly 1M updates followed by END
    int update_nbr;
    for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
        s_send (publisher, "Rhubarb");
    s_send (publisher, "END");

    zmq_close (publisher);
    zmq_close (syncservice);
    zmq_term (context);
    return 0;
}
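For the case where the number of peers is not known in advance, one possible way to make the protocol more resilient is to number every message and keep a copy until the receiver acknowledges it, so that anything sent into a dead connection can be sent again after the client reconnects. The following is only a minimal sketch of that bookkeeping, not something taken from the ZGuide. It contains no ZeroMQ calls, and all names in it (resend_buffer, rb_store, rb_ack, rb_pending, rb_pending_at, WINDOW, PAYLOAD_MAX) are hypothetical. It also assumes that acknowledgements travel back over a separate channel, since PUSH/PULL is one-way.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define WINDOW      8    /* hypothetical cap on unacknowledged messages */
#define PAYLOAD_MAX 64   /* hypothetical maximum payload size, including NUL */

typedef struct {
    char     payload[WINDOW][PAYLOAD_MAX];
    uint64_t next_seq;   /* sequence number the next new message will get */
    uint64_t acked;      /* every seq below this value has been acknowledged */
} resend_buffer;

void rb_init(resend_buffer *rb)
{
    memset(rb, 0, sizeof *rb);
}

/* Store a copy of a message before it is handed to the socket.
   Returns 0 and writes the assigned sequence number to *seq, or
   returns -1 if the window is full or the payload is too long. */
int rb_store(resend_buffer *rb, const char *msg, uint64_t *seq)
{
    assert(rb != NULL && msg != NULL && seq != NULL);
    if (rb->next_seq - rb->acked >= WINDOW || strlen(msg) >= PAYLOAD_MAX)
        return -1;
    strcpy(rb->payload[rb->next_seq % WINDOW], msg);
    *seq = rb->next_seq++;
    return 0;
}

/* The receiver reports that it has processed every seq below up_to. */
void rb_ack(resend_buffer *rb, uint64_t up_to)
{
    if (up_to > rb->acked && up_to <= rb->next_seq)
        rb->acked = up_to;
}

/* Number of messages that would have to be sent again after a reconnect. */
uint64_t rb_pending(const resend_buffer *rb)
{
    return rb->next_seq - rb->acked;
}

/* Payload of the i-th unacknowledged message (0 = oldest), or NULL. */
const char *rb_pending_at(const resend_buffer *rb, uint64_t i)
{
    if (i >= rb_pending(rb))
        return NULL;
    return rb->payload[(rb->acked + i) % WINDOW];
}
```

In use, the sender would call rb_store before each send, call rb_ack whenever an acknowledgement arrives, and after a reconnect walk rb_pending_at from 0 up to rb_pending to send the unconfirmed messages again. The receiver then has to tolerate duplicates, for example by ignoring sequence numbers it has already processed.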