In dotCloud, we use a lot of zerorpc streaming for pubs / sub-streams. Let me describe our way of doing this.
Finally
We show a streaming method decorated with @ zerorpc.stream. This method, when called, will add gevent.queue to the set. Then the method will be cyclically forever, which will give all the messages arriving in the queue. When this method completes (because the client is disconnected), the queue is removed from the set.
To publish, simply publish the message that will be published in each queue registered in the set. At this time, you need to decide what you want to do with slow consumers (turn them off, stop them to a certain limit and / or drop new messages).
Example implementation with zerorpc-python:
Signing part
class MyService(object): def __init__(self): self._subscribers = set() @zerorpc.stream def subscribe(self): try: queue = gevent.queue.Queue() self._subscribers.add(queue) for msg in queue: yield msg finally: self._subscribers.remove(queue)
The subscribe method simply adds an event queue to the set. Then use the queue forever until: - the queue ends with a StopIteration message (see the gevent.queue.Queue documentation) - the green subscription function is killed (usually because the client is disconnected)
In both cases, the finally statement is executed, and the queue is removed from the list of subscribers.
Please note that at this point you can limit the size of the queue: ...Queue(maxsize=42) .
Publication part
class MyService(object): [...] def _publish(self, msg): for queue in self._subscribers: if queue.size < 42: queue.put(msg)
Call this method to post a message. He will iterate over all the queues of subscribers to put a message in it. In my example, if the queue reaches a certain size, I discard the message. But there is no limit to which template you want to apply there.
You can save the subscriberβs greenlet instance in the set and then kill it when the queue is full, effectively disconnecting the slow client (you can even try sending a message informing the client that it is too slow). You can also wait for your entire consumer to process messages in parallel before returning from _publish, etc. The sky limits my friend!
Hope this helps!