I am new to Celery and I am trying to integrate this task queue into my project, but I still do not understand how Celery handles failed tasks, and I would like to keep all of them in an AMQP dead letter queue.
According to the docs here, it seems that raising Reject in a task with acks_late enabled produces the same effect as rejecting (nacking) the message, and then there are a few words about dead letter queues.
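To make sure I read that right, the pattern I understood from the docs looks roughly like this (a minimal sketch: process() is a hypothetical helper, requeue=False is my assumption, and celery_app is the same app object as below):

from celery.exceptions import Reject

@celery_app.task(bind=True, acks_late=True)
def example_task(self, data):
    try:
        # hypothetical processing step; anything that can fail
        return process(data)
    except Exception as exc:
        # requeue=False asks the broker not to re-deliver the message;
        # if the queue has a dead letter exchange configured, RabbitMQ
        # should route the rejected message there instead
        raise Reject(exc, requeue=False)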
So I added a custom default queue to my Celery configuration:
celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],
                       CELERY_TASK_SERIALIZER='json',
                       CELERY_QUEUES=[CELERY_QUEUE, CELERY_DLX_QUEUE],
                       CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME,
                       CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE)
and my kombu objects look like this:
CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME,
                         exchange=CELERY_DLX_EXCHANGE,
                         routing_key='celery-dlq')

DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
                              'x-dead-letter-routing-key': 'celery-dlq'}

CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME,
                           arguments=DEAD_LETTER_CELERY_OPTIONS,
                           type='direct')
CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
                     exchange=CELERY_EXCHANGE,
                     routing_key='celery-q')
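As a side note, I also wondered whether the x-dead-letter-* arguments belong on the queue rather than on the exchange, since in RabbitMQ they are queue arguments. The variant I considered (not verified, shown only for comparison with the setup above) would be:

CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME,
                         exchange=CELERY_DLX_EXCHANGE,
                         routing_key='celery-dlq')

CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME, type='direct')
# same dead letter settings, but attached to the queue via queue_arguments
CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
                     exchange=CELERY_EXCHANGE,
                     routing_key='celery-q',
                     queue_arguments={'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
                                      'x-dead-letter-routing-key': 'celery-dlq'})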
And the task that I run:
class HookTask(Task):
    acks_late = True

    def run(self, ctx, data):
        logger.info('{0} starting {1.name}[{1.request.id}]'.format(
            self.__class__.__name__.upper(), self))
        self.hook_process(ctx, data)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('task_id %s failed, message: %s', task_id, exc.message)

    def hook_process(self, t_ctx, body):
        ...
I ran a small test with this setup, but raising a Reject exception has no visible effect: nothing ends up in the dead letter queue.
Now I am wondering whether I should force the failed task to be routed to the dead letter queue by overriding Task.on_failure. I think that would work, but it also does not feel like a clean solution, because from what I read, Celery should handle this on its own.
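If I do go that route, I imagine the override would look roughly like this (only a sketch: BROKER_URL stands for whatever broker URL the app uses, and hand-publishing the payload is exactly the part that feels unclean):

from kombu import Connection

class HookTask(Task):
    acks_late = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('task_id %s failed, message: %s', task_id, exc)
        # manually push the failed task's payload to the dead letter exchange
        with Connection(BROKER_URL) as conn:
            producer = conn.Producer(serializer='json')
            producer.publish({'task_id': task_id, 'args': args, 'kwargs': kwargs},
                             exchange=CELERY_DLX_EXCHANGE,
                             routing_key='celery-dlq',
                             declare=[CELERY_DLX_QUEUE])  # make sure the DLQ exists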
Thanks for your help.