Celery: cancel or cancel all tasks in the chord

I use the following setup with a Redis broker and backend:

chord([A, A, A, ...])(B)

  • Task A performs some checks. It uses AbortableTaskas a base and regularly checks the flag task.is_aborted().
  • Task B notifies the user of the result of the calculation.

The user has the ability to interrupt the execution of tasks A. Unfortunately, when invoked AbortableAsyncResult(task_a_id).abort()for all instances of task A, only active ones are interrupted. The status for tasks that have not yet been received by the worker has been changed to ABORTED, but they are still being processed, and the flag is_aborted()returns False.

I could, of course, revoke()deferred tasks instead of using abort()them, but the problem is that in this case the body of the chord (task B) is no longer performed.

How do all stopped instances of pending and running tasks stop while maintaining execution of task B?

+4
source share
2 answers

Just get the id list of all instances Aand stop them.

Consider this simple chord

from celery import chord 

my_chord = chord(a.si() for i in range(300))(b.si())

Now you can get the list of subtasks (all lines of the task A) from the instance my_chordusing

for taks in my_chord.parent.subtasks:
    print(task.id)

Now you can do whatever you want with these tasks. For example, you can undo everything regardless of their current state.

from celery.task.control import revoke

for task in my_chord.parent.subtasks:
    revoke(task.id, terminate=True)

revokeby default, kills only unfinished tasks. But if you give it to him terminate=True, he will also destroy the tasks that are performed.

, . , , . , .

+2

, , , A. , , (A) , , . , ​​ B

0

All Articles