What is the best pattern for designing an asynchronous RPC application using Python, Pika, and AMQP?

The producer module of my application is run by users who want to submit work to a small cluster. It sends the requests as JSON through the RabbitMQ message broker.

I have tried several strategies, and the best so far is the following, which does not yet fully work:

Each cluster machine runs a consumer module, which subscribes to the AMQP queue and sets prefetch_count to tell the broker how many tasks it can run at once.

I managed to get it working using SelectConnection from the Pika AMQP library. Both the consumer and the producer open two channels, one connected to each queue. The producer sends requests on channel [A] and waits for responses on channel [B], and the consumer waits for requests on channel [A] and sends responses on channel [B]. It seems, however, that when the consumer runs the callback that computes the response, it blocks, so each consumer executes only one task at any given time.

What I need in the end:

  • the producer submits its tasks (about 5k each time) to the cluster
  • the broker dispatches N messages / requests to each consumer, where N is the number of concurrent tasks that consumer can handle
  • when a single task is finished, the consumer replies to the broker / producer with the result
  • the producer receives the replies, updates the status of the computation, and finally prints some reports

Limitations:

  • If another user submits work, all of that user's tasks will be queued after the previous user's (I think the queue system guarantees this automatically, but I have not thought through the implications in a threaded environment)
  • Tasks must be submitted in order, but the order in which they are answered does not matter.

UPDATE

I studied a little further, and my actual problem seems to be that I am using a plain function as the callback for pika's SelectConnection.channel.basic_consume() function. My latest (not yet implemented) idea is to pass a threaded function instead of a regular one, so that the callback does not block and the consumer can keep listening.

+8
python design-patterns rabbitmq amqp pika
3 answers

As you noticed, your process blocks while it runs a callback. There are several ways to handle this, depending on what your callback does.

If your callback is I/O-bound (it does a lot of network or disk I/O), you can use either threads or a green-thread solution such as gevent, eventlet, or greenhouse. Keep in mind that Python is limited by the GIL (Global Interpreter Lock), which means that only one piece of Python code runs at a time in a single Python process. So if your callback does a lot of computation in Python code, these solutions will probably not be much faster than what you already have.
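A minimal sketch of the thread option, using only the standard library. The function names are hypothetical, and time.sleep stands in for real network or disk I/O; blocking calls like this release the GIL, which is why threads help here but not for pure-Python computation.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_bound_task(task_id, delay=0.2):
    """Hypothetical stand-in for a callback that waits on network or disk I/O."""
    time.sleep(delay)  # sleeping releases the GIL, as real blocking I/O does
    return task_id * 2


def run_in_threads(task_ids, workers=4):
    """Run the tasks on a thread pool; return (results, elapsed seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() keeps the results in the same order as the inputs.
        results = list(pool.map(io_bound_task, task_ids))
    return results, time.perf_counter() - start
```

With four workers, four tasks should finish in roughly the time of one, because the waits overlap. Replacing the sleep with a CPU-heavy loop would be expected to remove most of that gain.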

Another option is to implement your consumer as multiple processes using multiprocessing. I have found multiprocessing very useful for parallel work. You can implement this either with a Queue, having a parent process act as the consumer and farm out work to its children, or by simply starting several processes that each consume on their own. Unless your application is highly concurrent (thousands of workers), I would suggest simply starting several workers, each consuming from its own connection. That way you can use AMQP's acknowledgement feature: if a consumer dies while processing a task, the message is automatically returned to the queue and picked up by another worker, rather than the request being lost.
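The "several workers, each with its own connection" variant could look roughly like the sketch below. It assumes pika 1.x and a broker on localhost; the queue names, the compute function, and the JSON fields are hypothetical placeholders, not taken from the question.

```python
import json
import multiprocessing

REQUEST_QUEUE = "requests"    # hypothetical name for the queue behind channel [A]
RESPONSE_QUEUE = "responses"  # hypothetical name for the queue behind channel [B]


def compute(task):
    """Placeholder for the real calculation (hypothetical)."""
    return {"id": task["id"], "result": task["value"] ** 2}


def handle(body):
    """Decode a JSON request, compute, and encode the JSON response."""
    return json.dumps(compute(json.loads(body))).encode()


def worker(host="localhost"):
    """One process, one connection, one unacknowledged task at a time."""
    # Imported here so every child process sets up its own connection and
    # the helpers above stay usable without pika installed.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    channel.queue_declare(queue=REQUEST_QUEUE)
    channel.queue_declare(queue=RESPONSE_QUEUE)
    channel.basic_qos(prefetch_count=1)

    def on_request(ch, method, properties, body):
        ch.basic_publish(exchange="", routing_key=RESPONSE_QUEUE, body=handle(body))
        # Ack only after the work is done: if this process dies first,
        # the broker requeues the message for another worker.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue=REQUEST_QUEUE, on_message_callback=on_request)
    channel.start_consuming()


def start_workers(count):
    """Start `count` independent worker processes and return them."""
    procs = [multiprocessing.Process(target=worker) for _ in range(count)]
    for proc in procs:
        proc.start()
    return procs
```

Calling start_workers(N) on each cluster machine would give N tasks in flight per machine, since each process blocks in its own callback without affecting the others.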

A last option, if you control the producer and it is also written in Python, is to use a task library such as celery to abstract the task / queue handling for you. I have used celery on several large projects and found it very well written. With the appropriate configuration, it will also handle the multiple-consumer issues for you.

+2

Your setup sounds good to me. And you are right: you can simply have the callback start a thread, and chain that to a separate callback that runs when the thread finishes and queues the response back on channel B.

Basically, your consumers should have a queue of their own (of size N, the amount of parallelism they support). When a request arrives on channel A, the callback should store the request in a queue shared between the main thread running Pika and the worker threads in a thread pool. As soon as it is queued, Pika should respond with an ACK, and a worker thread will wake up and start processing.

Once the worker finishes its work, it puts the result on a separate result queue and issues a callback to the main thread to send it back to the producer.
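The queue hand-off described above can be sketched with the standard library alone. This is an illustration under assumptions, not a full consumer: compute is a hypothetical placeholder, and the Pika side is reduced to comments. In a real consumer the hand-back to the connection thread would have to go through a thread-safe hook; pika 1.x provides add_callback_threadsafe on BlockingConnection for that purpose.

```python
import queue
import threading


def compute(task):
    """Placeholder for the real calculation (hypothetical)."""
    return task * task


def worker(requests, results):
    """Take requests off the shared queue until a None sentinel arrives."""
    while True:
        task = requests.get()
        if task is None:
            break
        results.put((task, compute(task)))


def run_pool(tasks, parallelism=4):
    """Feed a list of tasks to worker threads and collect the results."""
    requests = queue.Queue(maxsize=parallelism)  # size N, as described above
    results = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(requests, results))
        for _ in range(parallelism)
    ]
    for thread in threads:
        thread.start()
    for task in tasks:      # in the consumer, this is the channel A callback
        requests.put(task)  # blocks while the queue already holds N tasks
    for _ in threads:
        requests.put(None)  # one sentinel per worker thread
    for thread in threads:
        thread.join()
    # In the consumer, the main thread would publish these on channel B.
    return dict(results.get() for _ in range(len(tasks)))
```

The results come back in completion order, which matches the requirement that the order of replies does not matter.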

You should take care that the worker threads do not interfere with each other if they use shared resources, but that is a separate topic.

0

Being inexperienced with threading, I would set this up to run several consumer processes (their number being basically your prefetch count). Each would connect to the two queues, and they would process tasks happily, unaware of each other's existence.

0
