Well, you cannot use the retry mechanism if you want to redirect the task to another queue. From the docs:
retry() can be used to re-execute the task, for example in the event of recoverable errors. When you call retry, it will send a new message using the same task-id, and it will take care to make sure the message is delivered to the same queue as the originating task.
So you will have to relaunch the task yourself and route it manually to the queue you want whenever an exception is raised. That looks like a good job for error callbacks.
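In its simplest form the idea is: link an error callback when you send the task, and have the callback re-send it to the other queue. This is only a minimal sketch; the task and the 'fallback_queue' name are placeholders, and the callback re-sends one hard-coded task.

import celery


@celery.shared_task
def resend_elsewhere(task_id):
    # Celery calls a linked error callback with the id of the failed task.
    # Here we simply re-send one known task to a different (placeholder) queue.
    unreliable_task.apply_async(queue='fallback_queue')


@celery.shared_task
def unreliable_task():
    raise ValueError('transient failure')


# Attach the error callback when sending the task.
unreliable_task.apply_async(link_error=resend_elsewhere.s())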
The error callback needs to know the name of the failed task so it can relaunch it, and you probably do not want to attach the callback by hand every time you send a task, so a decorator that adds it automatically is convenient:
from functools import partial, wraps

import celery


@celery.shared_task
def error_callback(task_id, task_name, retry_queue, retry_routing_key):
    # Look the failed task up by name and re-send it to the retry queue.
    task = celery.current_app.tasks[task_name]
    task.apply_async(queue=retry_queue, routing_key=retry_routing_key)


def retrying_task(retry_queue, retry_routing_key):
    """Decorates a function to automatically add the error callback."""
    def retrying_decorator(func):
        @celery.shared_task
        @wraps(func)  # just to keep the original task name
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        # Every apply_async() now carries the error callback by default.
        wrapper.apply_async = partial(
            wrapper.apply_async,
            link_error=error_callback.s(wrapper.name, retry_queue, retry_routing_key),
        )
        return wrapper
    return retrying_decorator


@retrying_task(retry_queue='another_queue', retry_routing_key='another_routing_key')
def failing_task():
    print('Hi, I will fail!')
    raise Exception("I'm failing!")


failing_task.apply_async()
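A few things worth noting about this approach: the decorator wraps apply_async with functools.partial, so every call of the task automatically carries the link_error signature; the callback re-launches the task by name without its original arguments, so if your task takes arguments you would need to forward them as well; and some worker has to actually consume the retry queue ('another_queue' here), otherwise the re-sent message will just sit on the broker.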