Celery Tasks Priority

I want to manage tasks using Celery. I want to have one task queue (with concurrency 1) and be able to run tasks in a queue with different priorities so that tasks with a higher priority crowd out others.

I add three tasks to the queue as follows:

add_tasks.py

from tasks import example_task example_task.apply_async((1), priority=1) example_task.apply_async((2), priority=3) example_task.apply_async((3), priority=2) 

I have the following configuration:

tasks.py

 from __future__ import absolute_import, unicode_literals from celery import Celery from kombu import Queue, Exchange import time app = Celery('tasks', backend='rpc://', broker='pyamqp://') app.conf.task_queues = [Queue('celery', Exchange('celery'), routing_key='celery', queue_arguments={'x-max-priority': 10})] @app.task def example_task(task_num): time.sleep(3) print('Started {}'.format(task_num) return True 

I expect the second task, which I added to run before the third, because it has a higher priority, but it is not. They run in the order they are added.

I follow the docs and think I configured the application correctly.

Am I doing something wrong or am I misunderstanding a priority function?

+6
python celery
source share
1 answer

There is a possibility that the queue has no chance to prioritize messages (because they are loaded before sorting starts). Try with these two settings (adapt your project if necessary):

 CELERY_ACKS_LATE = True CELERYD_PREFETCH_MULTIPLIER = 1 

The default prefetch ratio is 4.

I developed a sample application for realizing celery priority tasks on a very small scale. Look at him here . Developing it, I ran into a very similar problem, and this change in settings actually resolved it.

Please note that you also require RabbitMQ version 3.5.0 or higher.

+4
source share

All Articles