Celery: how can I redirect a failed task to a dead letter queue

I am new to Celery and I am trying to integrate this task queue into my project, but I still do not understand how Celery handles failed tasks, and I would like to route all of them to an AMQP dead-letter queue.

According to the doc here, it seems that raising Reject in a task with acks_late enabled produces the same effect as nacking the message, and then there are a few words about dead-letter queues.
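As I understand it, the documented pattern boils down to something like the following minimal sketch (my own paraphrase; the broker URL, task name and failure are just illustrative):

    from celery import Celery
    from celery.exceptions import Reject

    app = Celery('sketch', broker='amqp://guest@localhost:5672//')

    @app.task(acks_late=True)
    def process(payload):
        try:
            raise ValueError('boom')  # stand-in for real work that fails
        except Exception as exc:
            # With acks_late, Reject(requeue=False) rejects (nacks) the delivery
            # instead of acking it; RabbitMQ then dead-letters the message if
            # the queue was declared with x-dead-letter-* arguments.
            raise Reject(exc, requeue=False)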

So I added a custom default queue to my Celery configuration:

    celery_app.conf.update(
        CELERY_ACCEPT_CONTENT=['application/json'],
        CELERY_TASK_SERIALIZER='json',
        CELERY_QUEUES=[CELERY_QUEUE, CELERY_DLX_QUEUE],
        CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME,
        CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE
    )

and my kombu objects look like

    CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
    CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=DLX_EXCHANGE,
                             routing_key='celery-dlq')

    DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
                                  'x-dead-letter-routing-key': 'celery-dlq'}

    CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME,
                               arguments=DEAD_LETTER_CELERY_OPTIONS,
                               type='direct')
    CELERY_QUEUE = Queue(CELERY_QUEUE_NAME, exchange=CELERY_EXCHANGE,
                         routing_key='celery-q')

And here is the task I am running:

    class HookTask(Task):
        acks_late = True

        def run(self, ctx, data):
            logger.info('{0} starting {1.name}[{1.request.id}]'.format(
                self.__class__.__name__.upper(), self))
            self.hook_process(ctx, data)

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            logger.error('task_id %s failed, message: %s', task_id, exc.message)

        def hook_process(self, t_ctx, body):
            # Build context
            ctx = TaskContext(self.request, t_ctx)
            logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id)
            raise Reject('no_reason', requeue=False)

I did a little test with it, raising a Reject exception, but nothing showed up in the dead-letter queue.

Now I'm wondering if I can force routing of failed tasks to the dead-letter queue by overriding Task.on_failure. I think this would work, but it does not feel clean, because from what I have read, Celery should handle this on its own.

Thanks for your help.

+6
2 answers

I think you should not add arguments=DEAD_LETTER_CELERY_OPTIONS to CELERY_EXCHANGE; the dead-letter arguments belong on the queue. Add them to CELERY_QUEUE as queue_arguments=DEAD_LETTER_CELERY_OPTIONS instead.
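Applied to your declarations, that would look roughly like this (a sketch using your variable names, untested):

    # The x-dead-letter-* arguments move from the exchange to the queue:
    CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME, type='direct')
    CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
                         exchange=CELERY_EXCHANGE,
                         routing_key='celery-q',
                         queue_arguments=DEAD_LETTER_CELERY_OPTIONS)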

The following example is what I did, and it works great:

    from celery import Celery
    from kombu import Exchange, Queue
    from celery.exceptions import Reject

    app = Celery(
        'tasks',
        broker='amqp://guest@localhost:5672//',
        backend='redis://localhost:6379/0')

    dead_letter_queue_option = {
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dead_letter'
    }

    default_exchange = Exchange('default', type='direct')
    dlx_exchange = Exchange('dlx', type='direct')

    default_queue = Queue(
        'default',
        default_exchange,
        routing_key='default',
        queue_arguments=dead_letter_queue_option)

    dead_letter_queue = Queue(
        'dead_letter', dlx_exchange, routing_key='dead_letter')

    app.conf.task_queues = (default_queue, dead_letter_queue)
    app.conf.task_default_queue = 'default'
    app.conf.task_default_exchange = 'default'
    app.conf.task_default_routing_key = 'default'

    @app.task
    def add(x, y):
        return x + y

    @app.task(acks_late=True)
    def div(x, y):
        try:
            z = x / y
            return z
        except ZeroDivisionError as exc:
            raise Reject(exc, requeue=False)
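To check that rejected tasks actually end up there, you can peek at the dead-letter queue with kombu. This is a hypothetical snippet; it assumes the example above is saved as tasks.py and a worker is running (e.g. celery -A tasks worker):

    import time
    from tasks import app, dead_letter_queue, div

    div.delay(1, 0)   # raises ZeroDivisionError -> Reject(requeue=False)
    time.sleep(2)     # give the worker a moment to process and reject

    # Once the worker rejects the task, the broker routes the message via the
    # dlx exchange into the dead_letter queue; fetch one message to verify:
    with app.pool.acquire() as conn:
        msg = dead_letter_queue.bind(conn).get(no_ack=True)
        print(msg)    # a kombu Message if the task was dead-lettered, else None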

After the queues have been created, you will see the DLX (dead-letter-exchange) and DLK (dead-letter-routing-key) labels in the Features column of the RabbitMQ management UI.

[screenshot: RabbitMQ management UI showing the DLX and DLK flags in the queue's Features column]

NOTE: you must delete any previously created queues in RabbitMQ first. Celery will not delete an existing queue and re-create it with the new arguments.
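If you would rather do this from Python than from the management UI, kombu's bound queues expose a delete() method. A sketch, assuming the example above is saved as tasks.py:

    from kombu import Connection
    from tasks import default_queue

    # Drop the stale queue so Celery can re-declare it with the new arguments.
    with Connection('amqp://guest@localhost:5672//') as conn:
        default_queue.bind(conn).delete()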

+1

I have a similar case and I ran into the same problems. I also wanted a solution based on configuration rather than hard-coded values. The solution proposed by Hengfeng Li was very useful and helped me understand the mechanism and the concepts. But there was a problem with the declaration of the dead-letter queues. Specifically, if you put the DLQ in task_queues, Celery consumed from that queue, so it was always empty. Therefore a manual way of declaring the DL exchange and queue is needed.

I used Celery bootsteps, as they provide good control over the stage at which the code runs. My initial experiment was to declare them right after the application was created, but this resulted in a stale connection after the forking of the processes and an ugly exception. With a bootstep that runs exactly after the Pool step, you can be sure it runs at the start of each worker, after forking is done and the connection pool is ready.

Finally, I created a decorator that converts uncaught exceptions into task rejections by re-raising with Celery's Reject. Special care is taken for cases where the task has already decided how the failure should be handled, e.g. with a Retry.

Here is a complete working example. Try running the div.delay(1, 0) task and see how it works.

    from functools import wraps

    from celery import Celery, bootsteps
    from celery.exceptions import Reject, TaskPredicate
    from kombu import Exchange, Queue


    class Config(object):
        APP_NAME = 'test'

        task_default_queue = '%s_celery' % APP_NAME
        task_default_exchange = '%s_celery' % APP_NAME
        task_default_exchange_type = 'direct'
        task_default_routing_key = task_default_queue
        task_create_missing_queues = False
        task_acks_late = True

        # Configuration for DLQ support
        dead_letter_exchange = '%s_dlx' % APP_NAME
        dead_letter_exchange_type = 'direct'
        dead_letter_queue = '%s_dlq' % APP_NAME
        dead_letter_routing_key = dead_letter_queue


    class DeclareDLXnDLQ(bootsteps.StartStopStep):
        """
        Celery Bootstep to declare the DL exchange and queues before the
        worker starts processing tasks
        """
        requires = {'celery.worker.components:Pool'}

        def start(self, worker):
            app = worker.app

            # Declare DLX and DLQ
            dlx = Exchange(
                app.conf.dead_letter_exchange,
                type=app.conf.dead_letter_exchange_type)

            dead_letter_queue = Queue(
                app.conf.dead_letter_queue,
                dlx,
                routing_key=app.conf.dead_letter_routing_key)

            with worker.app.pool.acquire() as conn:
                dead_letter_queue.bind(conn).declare()


    app = Celery('tasks', broker='pyamqp://guest@localhost//')
    app.config_from_object(Config)

    # Declare the default queue manually. We bypass the default mechanism
    # that creates queues in order to declare special queue arguments for
    # DLX support.
    default_exchange = Exchange(
        app.conf.task_default_exchange,
        type=app.conf.task_default_exchange_type)

    default_queue = Queue(
        app.conf.task_default_queue,
        default_exchange,
        routing_key=app.conf.task_default_routing_key,
        queue_arguments={
            'x-dead-letter-exchange': app.conf.dead_letter_exchange,
            'x-dead-letter-routing-key': app.conf.dead_letter_routing_key
        })

    # Inject the default queue into the celery application
    app.conf.task_queues = (default_queue,)

    # Inject an extra bootstep that declares the DLX and DLQ
    app.steps['worker'].add(DeclareDLXnDLQ)


    def onfailure_reject(requeue=False):
        """
        When a task fails, re-raise as a Reject exception so that the message
        is either requeued or routed to the Dead Letter Exchange
        """
        def _decorator(f):
            @wraps(f)
            def _wrapper(*args, **kwargs):
                try:
                    return f(*args, **kwargs)
                except TaskPredicate:
                    raise  # Do not handle TaskPredicate like Retry or Reject
                except Exception as e:
                    print("Rejecting")
                    raise Reject(str(e), requeue=requeue)
            return _wrapper
        return _decorator


    @app.task()
    @onfailure_reject()
    def div(x, y):
        return x / y
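To see it end to end, you can inspect the dead-lettered message; RabbitMQ records the dead-lettering history (original queue, reason, count) in the x-death header. A hypothetical snippet, assuming the example above is saved as tasks.py and a worker is running:

    import time
    from kombu import Connection, Exchange, Queue
    from tasks import div

    div.delay(1, 0)   # ZeroDivisionError is re-raised as Reject(requeue=False)
    time.sleep(2)     # give the worker a moment to reject the task

    # Rebuild a handle to the DLQ declared by the bootstep (names from Config):
    dlq = Queue('test_dlq', Exchange('test_dlx', type='direct'),
                routing_key='test_dlq')

    with Connection('pyamqp://guest@localhost//') as conn:
        msg = dlq.bind(conn).get(no_ack=True)
        if msg is not None:
            print(msg.headers.get('x-death'))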

Edit: I updated the code to use the new celery configuration scheme (lower case) since I found some compatibility issues in Celery 4.1.0.

+1
