ZeroRPC publish/subscribe

I would like to set up an event-based system between my servers. For example, when a server that wraps my database logic changes state, I would like it to notify my other servers. The publish/subscribe pattern seems perfect for this, and I have heard good things about ZeroRPC.

Some people have mentioned using zerorpc streaming to implement pub/sub, but it isn't obvious to me how events would be triggered using streaming.

2 answers

At dotCloud, we use zerorpc streaming a lot for pub/sub. Let me describe how we do it.

In short

We expose a streaming method decorated with @zerorpc.stream. When called, this method adds a gevent.queue.Queue to a set. It then loops forever, yielding every message that arrives in the queue. When the method terminates (because the client disconnected), the queue is removed from the set.

To publish, simply put the message in every queue registered in the set. At this point, you need to decide what to do with slow consumers (disconnect them, buffer up to a certain limit, and/or drop new messages).

Example implementation with zerorpc-python:

Subscription part

    import gevent.queue
    import zerorpc

    class MyService(object):
        def __init__(self):
            # One queue per connected subscriber.
            self._subscribers = set()

        @zerorpc.stream
        def subscribe(self):
            try:
                queue = gevent.queue.Queue()
                self._subscribers.add(queue)
                # Stream every queued message to the client.
                for msg in queue:
                    yield msg
            finally:
                self._subscribers.remove(queue)

The subscribe method simply adds a message queue to the set. It then consumes the queue forever, until either:
- the queue is terminated by a StopIteration message (see the gevent.queue.Queue documentation)
- the greenlet running subscribe is killed (usually because the client disconnected)

In both cases, the finally clause is executed, and the queue is removed from the set of subscribers.

Note that at this point you can limit the size of the queue: ...Queue(maxsize=42).
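A bounded queue changes what happens when a subscriber falls behind: a plain put() blocks the caller once the queue is full, whereas put_nowait() raises Full immediately. Here is a minimal sketch of that behaviour. It uses the standard-library queue module only so that it runs without dependencies; gevent.queue offers the same maxsize, put_nowait and Full names. The offer helper is hypothetical, not part of either library.

```python
import queue  # standard-library stand-in for gevent.queue


def offer(q, msg):
    """Try to enqueue msg without blocking; return False if q is full."""
    try:
        q.put_nowait(msg)
        return True
    except queue.Full:
        return False


# A queue that holds at most two pending messages.
q = queue.Queue(maxsize=2)

# The third message does not fit and is reported as rejected.
results = [offer(q, n) for n in range(3)]
print(results)
```

Whether a rejected message is dropped, retried, or treated as a reason to disconnect the subscriber is a policy decision left to the publisher.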

Publishing part

    class MyService(object):
        [...]
        def _publish(self, msg):
            for queue in self._subscribers:
                # Drop the message for subscribers that are too far behind.
                if queue.qsize() < 42:
                    queue.put(msg)

Call this method to publish a message. It iterates over all the subscriber queues and puts the message in each of them. In my example, if a queue has reached a certain size, I drop the message. But there is no limit to the policies you can apply there.

You could store the subscriber's greenlet instance in the set and kill it when its queue is full, effectively disconnecting the slow client (you could even try to send a message telling the client that it is too slow). You could also wait for all your consumers to process the message in parallel before returning from _publish, etc. The sky's the limit, my friend!
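As an illustration of the "disconnect the slow client" policy, here is a dependency-free sketch. It is not the dotCloud implementation: it uses the standard-library queue module in place of gevent.queue, and instead of killing a greenlet it removes the slow subscriber and enqueues a sentinel, much like the StopIteration message would end iteration over a gevent queue. Broker, drain and _CLOSED are hypothetical names invented for this example.

```python
import queue  # standard-library stand-in for gevent.queue

_CLOSED = object()  # hypothetical sentinel: "you were disconnected"


class Broker(object):
    def __init__(self, max_pending=2):
        self._max_pending = max_pending
        self._subscribers = set()

    def subscribe(self):
        # Unbounded queue, so the sentinel always fits;
        # the limit is enforced in publish() instead.
        q = queue.Queue()
        self._subscribers.add(q)
        return q

    def publish(self, msg):
        # Iterate over a copy, since slow subscribers are removed on the fly.
        for q in list(self._subscribers):
            if q.qsize() >= self._max_pending:
                self._subscribers.discard(q)
                q.put(_CLOSED)
            else:
                q.put(msg)


def drain(q):
    """Return the pending messages, stopping at the close sentinel."""
    out = []
    while not q.empty():
        item = q.get_nowait()
        if item is _CLOSED:
            break
        out.append(item)
    return out


broker = Broker(max_pending=2)
fast, slow = broker.subscribe(), broker.subscribe()

fast_seen = []
for n in range(4):
    broker.publish(n)
    fast_seen.extend(drain(fast))  # the fast client keeps up

slow_seen = drain(slow)  # the slow client only reads at the end
remaining = len(broker._subscribers)
print(fast_seen, slow_seen, remaining)
```

The fast subscriber receives every message, while the slow one is cut off once two messages are pending. With zerorpc and gevent, the same decision point in _publish is where the subscriber's greenlet would be killed.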

Hope this helps!


ZeroRPC now has a completely separate set of publisher/subscriber features, and it works great over the network!

For more hints on how to use them, in this case the Publisher and Subscriber classes, you may be interested in reading the ZeroRPC tests. Here are the tests.

In addition, there is a lot of good information in the ØMQ docs about the publisher/subscriber pattern, etc. You can find it here.
