Having error queues in Celery

Is there any way in Celery to automatically move a task to another queue if it fails?

For example, a task runs in queue x; on an exception it would be placed in another queue named error_x.

Edit:

I am currently using celery==3.0.13 along with Django 1.4 and RabbitMQ as the broker.

Tasks are sometimes interrupted mid-run. Is there a way in Celery to add such messages to an error queue and process them later?

The problem when a Celery task crashes is that I do not have access to the name of the message queue, so I cannot use self.retry to put the task into a separate error queue.

2 answers

Well, you cannot use the retry mechanism if you want to redirect the task to another queue. From the docs:

retry() can be used to re-execute the task, for example in the event of recoverable errors.

When you call retry it will send a new message, using the same task-id, and it will take care to make sure the message is delivered to the same queue as the originating task.

You will have to relaunch the task yourself and send it manually to the desired queue when an exception occurs. This looks like a good job for error callbacks.

Here is an example that sets up an error callback. The decorator wires the callback automatically, so the task code itself stays clean.

from functools import partial, wraps

import celery


@celery.shared_task
def error_callback(task_id, task_name, retry_queue, retry_routing_key):
    # We must retrieve the task object itself.
    # `tasks` is a dict of 'task_name': celery_task_object
    task = celery.current_app.tasks[task_name]
    # Re-launch the task in the specified queue.
    # Note: the original args/kwargs are not forwarded here; pass them
    # through the callback signature if your task takes arguments.
    task.apply_async(queue=retry_queue, routing_key=retry_routing_key)


def retrying_task(retry_queue, retry_routing_key):
    """Decorates function to automatically add error callbacks."""
    def retrying_decorator(func):
        @celery.shared_task
        @wraps(func)  # just to keep the original task name
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        # Monkey patch the apply_async method to add the callback.
        wrapper.apply_async = partial(
            wrapper.apply_async,
            link_error=error_callback.s(wrapper.name, retry_queue, retry_routing_key)
        )
        return wrapper
    return retrying_decorator


# Usage:
@retrying_task(retry_queue='another_queue', retry_routing_key='another_routing_key')
def failing_task():
    print 'Hi, I will fail!'
    raise Exception("I'm failing!")

failing_task.apply_async()
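The key trick above is pre-binding link_error onto apply_async with functools.partial. Stripped of Celery, the same pattern looks like this (a stand-alone sketch with illustrative stand-ins; FakeTask and on_error are not Celery APIs):

```python
from functools import partial


class FakeTask:
    """Minimal stand-in for a Celery task (illustrative only)."""

    def __init__(self, func):
        self.func = func
        self.name = func.__name__

    def apply_async(self, args=(), kwargs=None, link_error=None):
        # Mimic Celery: run the task, firing link_error on failure.
        try:
            return self.func(*args, **(kwargs or {}))
        except Exception:
            if link_error is not None:
                link_error(self.name)
            raise


calls = []


def on_error(task_name, retry_queue):
    # Stand-in for the error_callback task: record what would be re-queued.
    calls.append((task_name, retry_queue))


def retrying(retry_queue):
    def decorator(func):
        task = FakeTask(func)
        # Pre-bind the error callback, exactly like the decorator above.
        task.apply_async = partial(
            task.apply_async,
            link_error=partial(on_error, retry_queue=retry_queue),
        )
        return task
    return decorator


@retrying(retry_queue='error_x')
def boom():
    raise RuntimeError("I'm failing!")


try:
    boom.apply_async()
except RuntimeError:
    pass

print(calls)  # -> [('boom', 'error_x')]
```

Every call site keeps using apply_async() as usual; the pre-bound callback rides along without the caller having to remember it.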



An alternative, if your goal is simply to reprocess failed tasks later:

Use django-celery to store task results in the database.

Celery tracks the state of each task, so a task that is executing is in the "RUNNING" state and one that errored ends up "FAILED". You can then periodically query the stored results for tasks in the "FAILED" state and re-dispatch them, for example from a cron job or a periodic task.
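A minimal sketch of that reprocessing loop, using plain Python stand-ins for the result store and the re-dispatch step (task_store, requeue, and the record shape are all illustrative, not django-celery's API):

```python
# Illustrative task records, shaped loosely like stored task results.
task_store = [
    {'task_id': '1', 'name': 'send_email', 'state': 'SUCCESS'},
    {'task_id': '2', 'name': 'send_email', 'state': 'FAILED'},
    {'task_id': '3', 'name': 'resize_image', 'state': 'FAILED'},
]

requeued = []


def requeue(record):
    # In a real project this would be task.apply_async(...); here we
    # just record the re-dispatch and reset the stored state.
    requeued.append(record['task_id'])
    record['state'] = 'PENDING'


def reprocess_failed(store):
    """Find every FAILED task and send it back for another attempt."""
    for record in store:
        if record['state'] == 'FAILED':
            requeue(record)


reprocess_failed(task_store)
print(requeued)  # -> ['2', '3']
```

Run from a cron job or a periodic task, this picks up everything that failed since the last pass and leaves successful tasks untouched.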

