The producer module of my application is run by users who want to submit work to a small cluster. It sends the requests as JSON through the RabbitMQ message broker.
I have tried several strategies, and the best so far is the following, which does not yet fully work:
Each cluster machine runs a consumer module, which subscribes to the AMQP queue and sets prefetch_count to tell the broker how many tasks it can run at once.
I managed to get this working using SelectConnection from the Pika AMQP library. Both the consumer and the producer open two channels, one connected to each queue. The producer sends requests on channel [A] and waits for responses on channel [B]; the consumer waits for requests on channel [A] and sends responses on channel [B]. It seems, however, that when the consumer runs the callback that calculates the response, it blocks, so each consumer only ever executes one task at a time.
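For reference, this is roughly how I set the prefetch limit. It is a minimal sketch, assuming the keyword names of pika 1.x (`basic_qos(prefetch_count=...)` and `basic_consume(queue=..., on_message_callback=...)`); the helper name `start_consuming_requests` and its parameters are made up for illustration.

```python
def start_consuming_requests(channel, queue_name, max_tasks, on_request):
    """Cap the number of unacknowledged deliveries, then subscribe.

    `channel` is assumed to be an open pika channel (pika 1.x keyword names).
    """
    # The broker stops delivering once `max_tasks` messages are unacknowledged.
    channel.basic_qos(prefetch_count=max_tasks)
    # Manual acknowledgements (auto_ack=False) are needed for the prefetch
    # limit to have any effect.
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=on_request,
        auto_ack=False,
    )
```

The limit only frees up again when a message is acknowledged, so the consumer has to ack each task when it finishes, not when it is received.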
What I need in the end:
- user [A] submits their tasks (about 5k each time) to the cluster
- the broker dispatches N messages/requests to each consumer, where N is the number of concurrent tasks that consumer can handle
- when a single task is finished, the consumer replies to the broker/producer with the result
- the producer receives the replies, updates the status of the computation and, at the end, prints some reports
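The producer side of the steps above could be sketched like this. It is only an outline under assumptions: the class `JobTracker`, the task fields `id` and `result`, and the queue name passed in are hypothetical, and `channel` is assumed to be an open pika channel (pika 1.x callback signature).

```python
import json


class JobTracker:
    """Submits tasks and tracks which replies are still outstanding."""

    def __init__(self):
        self.pending = set()   # ids of tasks sent but not yet answered
        self.results = {}      # id -> result, filled as replies arrive

    def submit(self, channel, request_queue, tasks):
        # Publish every task to the request queue through the default exchange.
        for task in tasks:
            self.pending.add(task["id"])
            channel.basic_publish(
                exchange="", routing_key=request_queue, body=json.dumps(task)
            )

    def on_reply(self, channel, method, properties, body):
        # Callback for the response queue; replies may arrive in any order.
        reply = json.loads(body)
        self.pending.discard(reply["id"])
        self.results[reply["id"]] = reply["result"]
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def report(self):
        done = len(self.results)
        return f"{done} done, {len(self.pending)} pending"
```

`on_reply` would be registered with `basic_consume` on the response queue, and the final report printed once `pending` is empty.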
Restrictions:
- If another user submits work, all of their tasks will be queued after the previous user's (I think the queue system guarantees this automatically, but I have not thought through the implications for a threaded environment)
- Every task must be submitted, but the order in which the replies come back does not matter.
UPDATE
I looked into this a little further, and my actual problem seems to be that I am passing a plain function as the callback to pika's SelectConnection.channel.basic_consume() function. My latest (not yet implemented) idea is to pass a threaded function instead of a regular one, so that the callback does not block and the consumer can keep listening.
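A rough sketch of that idea, which I have not run against a real broker: the callback only hands the message to a thread pool, and the worker asks the I/O loop thread to publish the reply and send the ack, since pika channels are not safe to use from other threads. The names `compute`, `make_on_request`, and `schedule` are hypothetical; with a SelectConnection on pika 1.x I would expect `schedule` to be `connection.ioloop.add_callback_threadsafe`.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def compute(task):
    # Hypothetical stand-in for the real, slow calculation.
    return {"id": task["id"], "result": task["value"] ** 2}


def make_on_request(pool, schedule, reply_queue):
    """Build a basic_consume callback that returns immediately.

    `schedule` must run a zero-argument callable on the connection's
    I/O loop thread (assumed: connection.ioloop.add_callback_threadsafe).
    """

    def finish(channel, delivery_tag, body):
        # Runs on the I/O loop thread: publish the reply, then ack the request.
        channel.basic_publish(exchange="", routing_key=reply_queue, body=body)
        channel.basic_ack(delivery_tag=delivery_tag)

    def work(channel, delivery_tag, raw):
        # Runs on a worker thread: compute only, no channel calls here.
        result = compute(json.loads(raw))
        schedule(partial(finish, channel, delivery_tag, json.dumps(result)))

    def on_request(channel, method, properties, body):
        # Called by pika on the I/O loop thread; hand off and return at once.
        pool.submit(work, channel, method.delivery_tag, body)

    return on_request
```

With a pool of N workers and prefetch_count set to N, each consumer should then have up to N tasks in flight while the I/O loop stays free to receive more messages.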
python design-patterns rabbitmq amqp pika
guhcampos