I have a class that extends Celery's Task. It works fine with the old API, but I'm having trouble converting it to the new API.
# In app/tasks.py
from celery import Celery, Task

celery = Celery()

@celery.task
class CustomTask(Task):
    def run(self, x):
        try:
            # do something
            pass
        except Exception as e:
            # retry with the same argument, passing along the original exception
            self.retry(args=[x], exc=e)
And then I run the task like this:
CustomTask().apply_async(args=[x], queue='q1')
And I get this error:
TypeError: run() takes exactly 2 arguments (1 given)
This SO answer seems to do exactly the same thing, and it was accepted, so presumably it works. Can someone explain why my code does not work?
EDIT
This works if I give the task a name that differs from the class name:
name = 'app.tasks.CustomTask2'
But if I set the task name to the full dotted path of the class, it does not work:
name = 'app.tasks.CustomTask'
But the problem with using a different name is that Celery then ends up with an additional task registered under the class's own name.
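To make the symptom concrete, here is a toy model in plain Python of how a class could end up registered twice: once under its explicit name and once under its default dotted name. This is only a sketch of my guess, not Celery's actual code. REGISTRY, default_name, register, AutoRegistered and Base are all hypothetical names made up for the illustration.

```python
# Toy model only: none of this is Celery's real implementation.
REGISTRY = {}  # hypothetical task registry: name -> registered object


def default_name(obj):
    """Build a dotted name from the object's module and its own name."""
    return "%s.%s" % (obj.__module__, obj.__name__)


def register(obj):
    """Hypothetical decorator: always registers under the default dotted name."""
    REGISTRY[default_name(obj)] = obj
    return obj


class AutoRegistered(type):
    """Hypothetical metaclass: registers each subclass under `name` if set."""

    def __init__(cls, clsname, bases, attrs):
        super(AutoRegistered, cls).__init__(clsname, bases, attrs)
        # Only register subclasses of Base, not Base itself.
        if any(isinstance(base, AutoRegistered) for base in bases):
            REGISTRY[attrs.get("name") or default_name(cls)] = cls


# Create the base class through the metaclass (works on Python 2 and 3).
Base = AutoRegistered("Base", (object,), {})


@register
class CustomTask(Base):
    name = "app.tasks.CustomTask2"  # explicit name, different from the class name


# Two entries: the explicit name and the default dotted name.
print(sorted(REGISTRY))
```

In this toy model, setting the explicit name equal to the default dotted name would leave a single entry, because the second registration overwrites the first. Whether that matches what Celery really does is exactly what I am unsure about.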