Worker pools and multi-user queues with RabbitMQ

I am working on a web application that is a multi-tenant cloud application (many clients, each with its own separate "environment", but all on a shared set of hardware), and we give users the ability to submit work for background processing. The exact types of batch work don't really matter; there is simply enough of it that doing it without a work queue isn't practical. We have chosen RabbitMQ as the underlying queueing infrastructure.

Since we are a multi-tenant application, we don't necessarily want one client to be able to cause lengthy queue processing delays for another client, so the one idea we have come up with is to create a queue per client and point a common pool of workers at ALL of the client queues. The problem is that, as far as I can tell, workers are tied directly to a specific queue, not to an exchange. In our ideal world, every client queue still gets processed, without one client starving another, by a common pool of workers that we can grow or shrink as needed by launching more workers or shutting down idle ones. Workers being tied to a specific queue gets in the way of this in practice, since in testing we often had many workers simply sitting idle on an empty queue.

Is there a relatively straightforward way to do this? I am new to RabbitMQ and so far haven't been able to achieve what we need. We also don't want to write a very complex multi-threaded consumer application, since that is dev and test time we probably can't afford. Our stack is Windows / .NET / C#, if that's germane, but I don't think it should matter for the question at hand.

+13
rabbitmq amqp worker
5 answers

You could look at priority queues (which had not been implemented when this question was originally asked): https://www.rabbitmq.com/priority.html
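As a rough illustration, here is a minimal sketch of declaring and publishing to a priority queue with the .NET client (RabbitMQ.Client 6.x API; the queue name, host, and priority values are placeholder assumptions):

    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;

    var factory = new ConnectionFactory { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    // Declaring the queue with x-max-priority turns it into a priority queue;
    // here messages may carry priorities 0 through 10.
    var args = new Dictionary<string, object> { { "x-max-priority", 10 } };
    channel.QueueDeclare(queue: "jobs", durable: true, exclusive: false,
                         autoDelete: false, arguments: args);

    // Higher-priority messages are delivered ahead of lower-priority ones.
    var props = channel.CreateBasicProperties();
    props.Priority = 9;
    channel.BasicPublish(exchange: "", routingKey: "jobs",
                         basicProperties: props,
                         body: Encoding.UTF8.GetBytes("urgent job"));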

If that does not work for you, there are other hacks you can try to achieve what you want (and these should work with older versions of RabbitMQ):

You could have 100 queues bound to a topic exchange, and set the routing key to a hash of the user ID % 100, i.e. each job gets a key between 0 and 99, and jobs for the same user always get the same key. Each queue is bound to a unique key between 0 and 99. Now you have a fleet of workers that each start at a random queue number and increment that number after each job, again % 100, wrapping back around to queue 0 after queue 99.
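A hedged sketch of that idea in C# with RabbitMQ.Client 6.x; the exchange and queue names, the bucket count of 100, and the hash function are all illustrative assumptions:

    using System;
    using System.Text;
    using RabbitMQ.Client;

    var factory = new ConnectionFactory { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.ExchangeDeclare("work", ExchangeType.Topic, durable: true);
    for (int i = 0; i < 100; i++)
    {
        channel.QueueDeclare($"work.{i}", durable: true, exclusive: false,
                             autoDelete: false, arguments: null);
        channel.QueueBind($"work.{i}", "work", routingKey: i.ToString());
    }

    // Publisher side: all jobs for one user hash to the same bucket, hence the
    // same queue. Use a stable hash in production; string.GetHashCode is not
    // stable across processes in .NET Core.
    string userId = "user-42";
    int bucket = Math.Abs(userId.GetHashCode()) % 100;
    channel.BasicPublish("work", bucket.ToString(), null,
                         Encoding.UTF8.GetBytes("job payload"));

    // Worker side: start at a random bucket and rotate through the queues,
    // pulling at most one job at a time.
    int next = new Random().Next(100);
    while (true)
    {
        var delivery = channel.BasicGet($"work.{next}", autoAck: false);
        if (delivery != null)
        {
            // ... process delivery.Body here ...
            channel.BasicAck(delivery.DeliveryTag, multiple: false);
        }
        next = (next + 1) % 100;
    }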

Now your worker fleet can process up to 100 unique users in parallel, or all workers can focus on a single user if there is no other work. If workers need to cycle through all 100 queues between each job (in the scenario where only one user has many jobs in a single queue), there will naturally be some overhead between jobs. Using fewer queues is one way to deal with this. Alternatively, each worker can connect to all of the queues and consume up to one unacknowledged message from each; the worker can then iterate through the pending messages in memory much faster, provided the unacknowledged-message timeout is set high enough.
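A sketch of that second variant, reusing the bucket-queue names from the sketch above; the key point is the per-consumer prefetch limit of one:

    using System;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    var factory = new ConnectionFactory { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    // global: false applies the prefetch limit to each consumer separately,
    // i.e. at most one in-flight (unacknowledged) message per bucket queue.
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (sender, ea) =>
    {
        // ... process ea.Body here ...
        channel.BasicAck(ea.DeliveryTag, multiple: false);
    };
    for (int i = 0; i < 100; i++)
        channel.BasicConsume(queue: $"work.{i}", autoAck: false, consumer: consumer);

    Console.ReadLine(); // keep the worker process alive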

Alternatively, you could create two exchanges, each with an associated queue. All work goes to the first exchange and queue, which the worker pool consumes. If a unit of work takes too long, the worker can cancel it and push it to the second queue. Workers process the second queue only when there is nothing in the first one. You may also want a few workers with the opposite queue priority, to make sure long-running tasks are still processed when an endless stream of short tasks arrives, so that any user's batch always gets processed eventually. This won't truly spread your worker fleet across all tasks, but it will stop long-running tasks from one user from keeping your workers away from short-running tasks for that same user. It also assumes you can cancel a job and re-run it later without problems, and it means that resources are wasted on tasks that hit the timeout and must be re-run at low priority. But if you cannot classify fast and slow tasks ahead of time, this lets you separate them at runtime.
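A rough sketch of that two-stage idea, assuming a hypothetical ProcessJob that observes cancellation and is safe to re-run; the queue names and the 30-second budget are made up for illustration:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client;

    var factory = new ConnectionFactory { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();
    channel.QueueDeclare("jobs.fast", durable: true, exclusive: false,
                         autoDelete: false, arguments: null);
    channel.QueueDeclare("jobs.slow", durable: true, exclusive: false,
                         autoDelete: false, arguments: null);

    var budget = TimeSpan.FromSeconds(30); // illustrative time budget
    while (true)
    {
        var delivery = channel.BasicGet("jobs.fast", autoAck: false);
        if (delivery == null) { Thread.Sleep(100); continue; }

        var body = delivery.Body.ToArray();
        using var cts = new CancellationTokenSource(budget);
        var work = Task.Run(() => ProcessJob(body, cts.Token));

        if (!work.Wait(budget))
        {
            cts.Cancel();                                      // abandon this attempt
            channel.BasicPublish("", "jobs.slow", null, body); // retry later as slow work
        }
        channel.BasicAck(delivery.DeliveryTag, multiple: false);
    }

    // Hypothetical job body; it must honor the token and be safe to restart.
    static void ProcessJob(byte[] payload, CancellationToken ct) { /* ... */ }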

The first suggestion, with the 100 queues, can also run into a problem if there are 100 slow tasks from a single user and then another user posts a batch of tasks. Those tasks won't be looked at until one of the slow tasks completes. If this turns out to be a genuine problem, you can combine the two solutions.

+3

You can simply have your whole worker pool consume the same single queue. Work will then be distributed among them, and you can grow or shrink the pool to increase or decrease your processing capacity.
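This is the classic competing-consumers pattern. A minimal worker sketch with RabbitMQ.Client (queue name and host are assumptions); every worker process runs the same code and the broker round-robins deliveries among them:

    using System;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    var factory = new ConnectionFactory { HostName = "localhost" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "jobs", durable: true, exclusive: false,
                         autoDelete: false, arguments: null);

    // Fair dispatch: don't give a worker a new job until it acks the last one,
    // so one slow job doesn't pile work up behind a single worker.
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (sender, ea) =>
    {
        Console.WriteLine($"processing: {Encoding.UTF8.GetString(ea.Body.ToArray())}");
        channel.BasicAck(ea.DeliveryTag, multiple: false);
    };
    channel.BasicConsume(queue: "jobs", autoAck: false, consumer: consumer);
    Console.ReadLine(); // keep the worker process alive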

+1

I don't understand why you aren't using RabbitMQ vhosts, having the application connect to RabbitMQ and authenticate over a separate connection for each user.

This doesn't mean you can't have a worker manager that assigns workers to one user or another. But it does mean that every user's messages are handled by completely separate exchanges and queues.
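For illustration, a hedged sketch of a worker connecting to one client's vhost with RabbitMQ.Client; vhost creation and the per-client credentials are assumed to be provisioned out of band (e.g. with rabbitmqctl add_vhost):

    using RabbitMQ.Client;

    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        VirtualHost = "client-123",  // this client's isolated vhost (assumed name)
        UserName = "client-123-app", // per-client credentials, provisioned elsewhere
        Password = "secret"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    // Everything declared on this connection is scoped to the client's vhost,
    // so clients cannot see or interfere with each other's queues.
    channel.QueueDeclare(queue: "jobs", durable: true, exclusive: false,
                         autoDelete: false, arguments: null);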

+1

Workers are assigned 0+ queues, not exchanges.

The logic that decides which tasks are consumed from which queues, for each worker, is implemented in the class specified via CELERYD_CONSUMER, which defaults to celery.worker.consumer.Consumer.

You can create your own consumer class that implements whatever logic you like. The tricky part will be working out the details of the "fairness" algorithm you want to use; but once you have decided that, you can implement it by writing a custom consumer class and assigning it to the appropriate workers.

0

I have put together a simple worker pool example using RabbitMQ. It should give some illustration of how to achieve this: https://github.com/kennykarnama/rabbitmq-worker-pool

0
