What is the "twisted" way of consuming messages from rabbitmq and forwarding them to client connections?

I am writing a websocket server in twisted to get to know the framework. It will receive messages from a rabbitmq broker and send updates to connected clients. If I want to broadcast/multicast a lot of messages at the same time over a lot of client connections, is calling (as an example) deferToThread(channel.basic_consume, queue) or callInThread("") a good option for this?

If not, what would be the twisted way to consume messages from rabbitmq and forward them to connected clients?

My strategy so far is:

reactor_thread: listen on port (x) to set up and maintain client connections

other_thread: subscribe to the rabbitmq queue and consume messages as they arrive (runs forever)

1 answer

is calling (as an example) deferToThread(channel.basic_consume, queue) or callInThread("") a good option for this?

Using threads does not bring much benefit in this situation, since the messages are already queued in RabbitMQ. I have been in similar situations in the past, and I can give you a high-level overview of what I did to solve the problem without using threads. Disclaimer: I have not worked with RabbitMQ or websockets for a year or two, so my knowledge may be a little fuzzy.

List of connected clients

Assuming you are using autobahn for websockets, you can add a variable to the factory class ( autobahn.twisted.websocket.WebSocketServerFactory ) that keeps track of the connected clients. Either a list or a dict will work fine.

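    from autobahn.twisted.websocket import WebSocketServerFactory

    factory = WebSocketServerFactory()
    factory.connection_list = []  # protocol objects of connected clients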

The connection_list variable will store the protocol objects ( autobahn.twisted.websocket.WebSocketServerProtocol ) once a connection is established. In the protocol, you will need to override the connectionMade function so that it adds the protocol ( self in this case) to self.factory.connection_list .

    def connectionMade(self):
        super(WSProtocol, self).connectionMade()
        self.factory.connection_list.append(self)

It is probably best to create something like “onConnect deferred” for flexibility, but that is the essence of this. Maybe autobahn provides an interface for this.
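The bookkeeping described above also needs a removal step, otherwise closed connections stay in the list. Here is a minimal, framework-free sketch of that idea. ConnectionRegistry and FakeProtocol are hypothetical names used only for illustration; they are not autobahn or twisted classes, and in a real server the two hooks would live on your protocol subclass.

```python
class ConnectionRegistry:
    """Hypothetical helper: tracks protocol objects of connected clients."""

    def __init__(self):
        self.connections = []

    def register(self, proto):
        # Call from the protocol's connect hook (connectionMade in the answer).
        if proto not in self.connections:
            self.connections.append(proto)

    def unregister(self, proto):
        # Call from the disconnect hook so closed sockets are not written to.
        if proto in self.connections:
            self.connections.remove(proto)


class FakeProtocol:
    """Stand-in for a websocket protocol instance (assumption, not autobahn)."""

    def __init__(self, registry):
        self.registry = registry

    def connectionMade(self):
        self.registry.register(self)

    def connectionLost(self, reason=None):
        self.registry.unregister(self)


registry = ConnectionRegistry()
a, b = FakeProtocol(registry), FakeProtocol(registry)
a.connectionMade()
b.connectionMade()
a.connectionLost()  # client "a" disconnects
remaining = registry.connections
```

After the simulated disconnect, only the second client is left in the registry, so a later broadcast would skip the closed connection.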

RabbitMQ

Using pika , you can consume messages asynchronously by following its Twisted example. Change the names of the channels and exchanges as needed so that they work with your setup. Then we make two changes. First, we pass factory.connection_list to the callbacks; second, whenever a message is consumed, we write it to the connected client protocols.

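    from twisted.internet import defer, reactor, task

    @defer.inlineCallbacks
    def run(connection, proto_list):
        #...
        # Poll the queue every 10 ms, handing the client list to read().
        l = task.LoopingCall(read, queue_object, proto_list)
        l.start(0.01)

    @defer.inlineCallbacks
    def read(queue_object, proto_list):
        #...
        if body:
            print(body)
            # Iterate over a copy in case a client disconnects mid-loop.
            for client in list(proto_list):
                client.sendMessage(body)  # autobahn's send method
        yield ch.basic_ack(delivery_tag=method.delivery_tag)

    #...
    d.addCallback(run, factory.connection_list)
    reactor.run()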

In the read callback function, every time a message is consumed, the looping task iterates over the list of connected clients and sends the message to each of them.
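The fan-out step can be tried in isolation. The sketch below assumes each client exposes a sendMessage method, as autobahn's websocket protocol does; broadcast and FakeClient are hypothetical names made up for this example, and dropping a client on a failed send is one possible policy rather than something the answer prescribes.

```python
def broadcast(body, proto_list):
    """Send body to every client; return the clients whose send failed."""
    failed = []
    # Iterate over a copy so the list can shrink while we loop.
    for client in list(proto_list):
        try:
            client.sendMessage(body)
        except Exception:
            # Assumed policy: forget clients that can no longer be written to.
            failed.append(client)
            proto_list.remove(client)
    return failed


class FakeClient:
    """Stand-in client that records what it was sent (hypothetical)."""

    def __init__(self, broken=False):
        self.broken = broken
        self.received = []

    def sendMessage(self, payload):
        if self.broken:
            raise RuntimeError("connection closed")
        self.received.append(payload)


good, bad = FakeClient(), FakeClient(broken=True)
clients = [good, bad]
failed = broadcast(b"hello", clients)
```

Because everything runs in the reactor thread, no locking is needed around the client list; the copy only guards against the list changing during the loop.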
