Callback for celery apply_async

I use Celery in my application to run periodic tasks. Here is a simple example:

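    from datetime import timedelta

    from celery import group
    from celery.task import periodic_task, task  # legacy decorators from older Celery releases

    from myqueue import Queue


    @periodic_task(run_every=timedelta(minutes=1))
    def process_queue():
        queue = Queue()
        uid, questions = queue.pop()
        if uid is None:
            return

        # Build one signature per question; calling do_stuff(q) here would run it inline.
        job = group(do_stuff.s(q) for q in questions)
        job.apply_async()


    @task
    def do_stuff(question):
        try:
            ...
        except:
            ...
            raise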

As the example shows, I use Celery to run the tasks asynchronously, but since this is a queue, I need to call queue.fail(uid) if do_stuff raises an exception, or queue.ack(uid) otherwise. In this situation, it would be clean and useful to have a callback from my task for both cases: on_failure and on_success.

I have seen some documentation, but I have never seen callbacks used with apply_async in practice. Is it possible to do this?

+7
2 answers

Subclass the Task class and override its on_success and on_failure methods:

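    from celery import Task


    class CallbackTask(Task):
        def on_success(self, retval, task_id, args, kwargs):
            # Runs in the worker after the task returns successfully.
            pass

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            # Runs in the worker when the task raises an exception.
            pass


    # `celery` is assumed to be the application's Celery instance.
    @celery.task(base=CallbackTask)  # this does the trick
    def add(x, y):
        return x + y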
+26

You can specify success and error callbacks with the link and link_error kwargs when you call apply_async. The Celery docs have a clear example: http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks

+5