You can look at RabbitMQ's priority queue implementation (a feature that had not been implemented when this question was originally asked): https://www.rabbitmq.com/priority.html
If that does not work for you, there are other hacks to achieve what you want, which should work with older versions of RabbitMQ:
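To illustrate what the linked feature gives you, here is a minimal broker-free model of priority delivery: higher-priority messages are delivered before older lower-priority ones, with FIFO order preserved among equal priorities. (In RabbitMQ itself you enable this by declaring the queue with the `x-max-priority` argument and publishing messages with a `priority` property; the class below is only an illustration, not RabbitMQ code.)

```python
import heapq
import itertools

class PriorityQueueModel:
    """Toy model of a priority queue's delivery order."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker: FIFO among equal priorities

    def publish(self, body, priority=0):
        # Negate the priority: heapq is a min-heap, but the highest
        # priority should be delivered first.
        heapq.heappush(self._heap, (-priority, next(self._seq), body))

    def get(self):
        return heapq.heappop(self._heap)[2]
```

For example, publishing a low-priority message first and a high-priority one second still delivers the high-priority message first.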
You can have 100 queues bound to a topic exchange and set the routing key to hash(user_id) % 100, so that each task gets a key between 1 and 100 and tasks for the same user get the same key. Each queue is bound to a unique pattern between 1 and 100. You then have a fleet of workers that each start on a random queue number and increment that number after every job (again modulo 100, so they wrap back to queue 1 after queue 100).
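The hashing scheme above can be sketched as follows; the key format `user.N` and the use of MD5 are my assumptions for illustration (any stable hash works, but Python's built-in `hash()` is salted per process, so avoid it):

```python
import hashlib

NUM_QUEUES = 100

def routing_key_for(user_id: str) -> str:
    """Map a user ID to one of NUM_QUEUES routing keys, 1..100.

    All tasks for the same user get the same key, so they land in
    the same queue and are processed in order for that user.
    """
    digest = hashlib.md5(user_id.encode()).hexdigest()
    bucket = int(digest, 16) % NUM_QUEUES + 1
    return f"user.{bucket}"

def next_queue(current: int) -> int:
    """Worker rotation: advance to the next queue, wrapping 100 -> 1."""
    return current % NUM_QUEUES + 1
```

Each queue would be bound to the topic exchange with its own pattern (`user.1`, `user.2`, ...), and a worker simply calls `next_queue` after each job to keep rotating.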
Now your worker fleet can process up to 100 unique users in parallel, or all workers can focus on a single user if there is no other work. If the workers need to cycle through all 100 queues between jobs (in the scenario where only one user has many tasks, all sitting in one queue), you will naturally incur some overhead between jobs. Using fewer queues is one way to deal with this. Alternatively, each worker can hold a connection to every queue and consume up to one unacknowledged message from each; the worker can then iterate through the pending messages in memory much faster, provided the unacknowledged-message timeout is set high enough.
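The "one unacknowledged message per queue" idea can be modelled without a broker as below. In real RabbitMQ this corresponds to opening a consumer per queue with `basic_qos(prefetch_count=1)`, so the broker pushes at most one unacked message per queue; the class names here are mine, not from any library:

```python
from collections import deque

class BufferedWorker:
    """Holds at most a few buffered messages per queue and scans
    them in memory instead of polling 100 live queues per job."""

    def __init__(self, num_queues: int):
        self.buffers = {q: deque() for q in range(1, num_queues + 1)}

    def on_deliver(self, queue: int, message: str) -> None:
        # In RabbitMQ terms: the broker delivered an unacked message
        # on this queue's consumer channel (prefetch_count=1).
        self.buffers[queue].append(message)

    def next_job(self):
        # Fast in-memory scan across all queues. After the worker
        # acks a message, the broker pushes the next one for that queue.
        for q, buf in self.buffers.items():
            if buf:
                return q, buf.popleft()
        return None  # nothing pending anywhere
```

The trade-off the answer mentions applies here: each buffered message stays unacked while it waits, so the unacked-message timeout must be longer than the worst-case wait.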
Alternatively, you can create two exchanges, each with a bound queue. All work goes to the first exchange and queue, which the worker pool consumes. If a unit of work takes too long, the worker can cancel it and push it to the second queue. Workers process the second queue only when the first queue is empty. You may also want a few workers that consume the queues in the opposite priority order, to ensure long-running tasks are still processed when there is a never-ending stream of short tasks, so that every user's batch does eventually get handled. This will not truly distribute your worker fleet across all tasks, but it will stop long-running tasks from one user from keeping your workers away from short-running tasks for that same user. It also assumes that you can cancel a job and re-run it later without problems, and it means that some resources are wasted on tasks that hit the timeout and have to be re-run at low priority. That waste is the price you pay if you cannot classify tasks as fast or slow in advance.
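The two-stage scheme can be sketched with a broker-free simulation; the queue names, the time budget, and the `cost` field on jobs are all illustrative assumptions (a real worker would measure elapsed time and cancel the job cooperatively, then republish it to the second exchange):

```python
from queue import Queue, Empty

TIME_BUDGET = 0.05  # seconds; illustrative cutoff for the fast stage

fast = Queue()  # first exchange/queue: all work enters here
slow = Queue()  # second exchange/queue: demoted long-running work

def worker_step() -> str:
    """One scheduling decision by a worker. Jobs are dicts with a
    simulated `cost`; we compare cost to the budget instead of
    actually running and timing the job."""
    try:
        job = fast.get_nowait()
    except Empty:
        try:
            job = slow.get_nowait()  # slow stage only when fast is empty
        except Empty:
            return "idle"
        return "ran-slow"            # run to completion, no budget
    if job["cost"] > TIME_BUDGET:
        slow.put(job)                # over budget: cancel and demote
        return "demoted"
    return "ran-fast"
```

A handful of workers running the loop in the opposite order (slow first) would give you the "priority workers" variant that keeps long jobs from starving.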
The first approach, with the 100 queues, can also run into trouble: if there are 100 slow tasks for one user and another user then submits a batch of tasks, that batch will not be looked at until one of the slow tasks completes. If this turns out to be a real problem for you, you can combine the two solutions.
Laserjesus