Getting subscriber count using zubomq PUB / SUB sockets

Can I get the total number of subscribers from the PUB slot in zeromq?

Thanks!

+8
source share
3 answers

Yes, but, unfortunately, not using any simple property or method.

You need to use the zmq_socket_monitor () function to connect the inproc service socket to the main socket that you want to monitor. From there, you can listen to events related to connection / disconnection, and keep a count of subscribers. This may not be a trivial task, since it seems (at least to me) a little difficult to understand when to consider the subscriber (or any remote connection) on / off (closed / disconnected / repeated, etc.). You will have to play a little.

The link includes samples and descriptions of events.

+7
source

This is a NodeJS implementation for rep, I think for pub it is the same.

As Jakob MΓΆllΓ₯s said, you need to use a monitor.

 const zmq = require('zmq') , rep = zmq.socket('rep'); let counter = 0; rep.bind('tcp://*:5560', function (err) { if (err) { console.log(err); } else { console.log("Listening on 5560…"); rep.monitor(500, 0); } }); // Register to monitoring events rep.on('connect', function (fd, ep) { console.log('connect, endpoint:', ep); }); rep.on('connect_delay', function (fd, ep) { console.log('connect_delay, endpoint:', ep); }); rep.on('connect_retry', function (fd, ep) { console.log('connect_retry, endpoint:', ep); }); rep.on('listen', function (fd, ep) { console.log('listen, endpoint:', ep); }); rep.on('bind_error', function (fd, ep) { console.log('bind_error, endpoint:', ep); }); rep.on('accept', function (fd, ep) { console.log('accept, endpoint:', ep); counter++; }); rep.on('accept_error', function (fd, ep) { console.log('accept_error, endpoint:', ep); }); rep.on('close', function (fd, ep) { console.log('close, endpoint:', ep); }); rep.on('close_error', function (fd, ep) { console.log('close_error, endpoint:', ep); }); rep.on('disconnect', function (fd, ep) { console.log('disconnect, endpoint:', ep); counter--; }); // Handle monitor error rep.on('monitor_error', function(err) { console.log('Error in monitoring: %s, will restart monitoring in 5 seconds', err); setTimeout(function() { rep.monitor(500, 0); }, 5000); }); rep.on('message', function (msg) { console.log(`recieve: `, JSON.parse(msg)); rep.send(JSON.stringify({ "status": FAIL, "code": 3666 })); }); 

Prefixes

 recieve: { method: 'login', login: 'a', password: 'b1' } accept, endpoint: tcp://0.0.0.0:5560 accept, endpoint: tcp://0.0.0.0:5560 login: a, password: b1 recieve: { method: 'login', login: 'a', password: 'b1' } disconnect, endpoint: tcp://0.0.0.0:5560 login: a, password: b1 disconnect, endpoint: tcp://0.0.0.0:5560 
+3
source

There seems to be no direct way. The following is the Python code for tracking socket events that you can use to maintain the number:

 import zmq from zmq.eventloop import ioloop, zmqstream import zmq.utils.monitor class Publication: def start(self, port, host): context = zmq.Context() self._socket = context.socket(zmq.PUB) self._socket.bind("tcp://%s:%d" % (host, port)) self._mon_socket = self._socket.get_monitor_socket(zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED) self._mon_stream = zmqstream.ZMQStream(self._mon_socket) self._mon_stream.on_recv(self._on_mon) def _on_mon(self, msg): ev = zmq.utils.monitor.parse_monitor_message(msg) event = ev['event'] endpoint = ev['endpoint'] if event == zmq.EVENT_CONNECTED: pass # print(endpoint) elif event == zmq.EVENT_DISCONNECTED: pass #print(endpoint) 

One problem is that for some reason, the CONNECTED event does not fire. Another problem is that even when an event occurs, you get only the endpoint identifier, which is similar to the tcp: // ip: port line. Thus, for multiple clients on the same node, you get the same endpoint identifier.

0
source

All Articles