Connect a new celery periodic task in django

This is not a question, but help for those who find the periodic task declaration described in the Celery 4.0.1 documentation hard to integrate into Django: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries

Copy the Celery configuration from the documentation into main_app/celery.py:

    from celery import Celery
    from celery.schedules import crontab

    app = Celery()


    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

        # Calls test('world') every 30 seconds.
        sender.add_periodic_task(30.0, test.s('world'), expires=10)

        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )


    @app.task
    def test(arg):
        print(arg)

Question

But what if we use Django and our tasks live in another application? Celery 4.0.1 no longer has the @periodic_task decorator, so let's see what we can do.

First case

If you prefer to keep tasks and their schedule close to each other:

main_app/some_app/tasks.py

    from main_app.celery import app as celery_app


    @celery_app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'))


    @celery_app.task
    def test(arg):
        print(arg)

We can run beat in debug mode:

    celery -A main_app beat -l debug

and we will see that there is no such periodic task.

Second case

We can try to describe all periodic tasks in the configuration file as follows:

main_app/celery.py

    ...
    app = Celery()


    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        from main_app.some_app.tasks import test
        sender.add_periodic_task(10.0, test.s('hello'))
    ...

The result is the same: the task is not scheduled. The two cases fail for different reasons, though, which you can see by stepping through with pdb. In the first case, the setup_periodic_tasks callback is never run at all. In the second case, the callback runs but raises django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet. (the exception is not printed).
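To illustrate how an exception raised inside a signal receiver can go unnoticed, here is a toy sketch. It assumes a dispatcher that catches receiver exceptions and records them instead of re-raising, which is consistent with the behavior observed above but is not Celery's actual code. The names send_quietly and the local AppRegistryNotReady class are hypothetical stand-ins.

```python
class AppRegistryNotReady(Exception):
    """Local stand-in for django.core.exceptions.AppRegistryNotReady."""


def send_quietly(receivers, sender):
    """Call each receiver; record exceptions instead of raising them.

    Hypothetical helper: a toy model of a dispatcher that swallows errors.
    """
    results = []
    for receiver in receivers:
        try:
            results.append((receiver.__name__, receiver(sender)))
        except Exception as exc:
            # Nothing is printed here, so the failure is invisible
            # unless the caller inspects the returned results.
            results.append((receiver.__name__, exc))
    return results


def setup_periodic_tasks(sender):
    # Mimics importing a Django-dependent module before the apps are loaded.
    raise AppRegistryNotReady("Apps aren't loaded yet.")


schedule = []  # stands in for the sender that would collect periodic tasks
results = send_quietly([setup_periodic_tasks], schedule)
name, outcome = results[0]
print(name, repr(outcome))
print(schedule)
```

The schedule stays empty and no traceback appears, which is why stepping through with a debugger is the practical way to find the cause.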

2 answers

For Django, we need to use a different signal: @celery_app.on_after_finalize.connect. It works for both cases:

  • declaring the schedule next to the task in app/tasks.py, because this signal fires after every tasks.py has been imported and all receivers are already connected (the first case);
  • declaring the schedule centrally, because the Django applications are already initialized and ready to be imported by then (the second case).
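The timing argument can be sketched with a toy model. This is not Celery's implementation: ToySignal, simulate, and import_tasks_module are hypothetical names, and the only assumption is the ordering described above, namely that a receiver must be connected before the signal is sent in order to run.

```python
class ToySignal:
    """Toy stand-in for a one-shot signal; not Celery's implementation."""

    def __init__(self):
        self.receivers = []

    def connect(self, func):
        # Usable as a decorator, like @app.on_after_finalize.connect.
        self.receivers.append(func)
        return func

    def send(self, sender):
        # Only receivers connected before this call are invoked.
        for receiver in list(self.receivers):
            receiver(sender)


def simulate(fire_before_import):
    """Return the schedule built when the signal fires before or after
    the (simulated) tasks module is imported."""
    signal = ToySignal()
    schedule = []

    def import_tasks_module():
        # Stands in for importing some_app/tasks.py, which connects a receiver.
        @signal.connect
        def setup_periodic_tasks(sender):
            sender.append("test('hello') every 10s")

    if fire_before_import:
        signal.send(schedule)    # signal fires first, as in the failing first case
        import_tasks_module()    # the receiver connects too late to be called
    else:
        import_tasks_module()    # the receiver is connected first
        signal.send(schedule)    # signal fires afterwards, as with on_after_finalize
    return schedule


early = simulate(fire_before_import=True)
late = simulate(fire_before_import=False)
print(early)
print(late)
```

In the toy model the schedule is empty when the signal fires before the import and contains the entry when it fires after, which mirrors the difference between the two signals.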

For completeness, here are the final declarations:

First case

The schedule is declared next to the task:

main_app/some_app/tasks.py

    from main_app.celery import app as celery_app


    @celery_app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'))


    @celery_app.task
    def test(arg):
        print(arg)

Second case

The schedule is declared centrally in the main_app/celery.py configuration file:

    ...
    app = Celery()


    @app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        from main_app.some_app.tasks import test
        sender.add_periodic_task(10.0, test.s('hello'))
    ...

If the goal is to keep the task logic separate in tasks.py, calling from main_app.some_app.tasks import test inside setup_periodic_tasks did not work for me. What did work was a wrapper task in celery.py that imports and calls the real task:

celery.py

    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')


    @app.task
    def test(arg):
        print(arg)
        # Import at call time, then run the real task directly.
        from some_app.tasks import test
        test(arg)

tasks.py

    from celery import shared_task


    @shared_task
    def test(arg):
        print('world')

This led to the following result:

    [2017-10-26 22:52:42,262: INFO/MainProcess] celery@ubuntu-xenial ready.
    [2017-10-26 22:52:42,263: INFO/MainProcess] Received task: main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4]
    [2017-10-26 22:52:42,367: WARNING/ForkPoolWorker-2] hello
    [2017-10-26 22:52:42,368: WARNING/ForkPoolWorker-2] world
    [2017-10-26 22:52:42,369: INFO/ForkPoolWorker-2] Task main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4] succeeded in 0.002823335991706699s: None
    [2017-10-26 22:52:51,205: INFO/Beat] Scheduler: Sending due task add every 10 (main_app.celery.test)
    [2017-10-26 22:52:51,207: INFO/MainProcess] Received task: main_app.celery.test[ce0f3cfc-54d5-4d74-94eb-7ced2e5a6c4b]
    [2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] hello
    [2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] world