I have a Flask application with blueprints and Celery; the code is here:
config.py
import os

from celery.schedules import crontab

basedir = os.path.abspath(os.path.dirname(__file__))


class Config:
    """Base settings inherited by every environment-specific config."""

    # Falls back to an empty string when SECRET_KEY is unset or empty.
    SECRET_KEY = os.environ.get('SECRET_KEY') or ''
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    RECORDS_PER_PAGE = 40
    SQLALCHEMY_DATABASE_URI = ''

    # Celery defaults; the broker/backend values are overridden per
    # environment below.
    CELERY_BROKER_URL = ''
    CELERY_RESULT_BACKEND = ''
    CELERY_RESULT_DBURI = ''
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {}

    @staticmethod
    def init_app(app):
        """Hook for config-specific app initialisation; a no-op here."""
        pass


class DevelopmentConfig(Config):
    """Settings for local development (debug on, CSRF on)."""

    DEBUG = True
    WTF_CSRF_ENABLED = True
    APP_HOME = ''
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://...'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    # crontab(hour=6, minute=15): run the e-mail task once a day at 06:15.
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }


class TestConfig(Config):
    """Settings for the test suite (CSRF disabled, TESTING enabled)."""

    DEBUG = True
    WTF_CSRF_ENABLED = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'


class ProdConfig(Config):
    """Settings for production (debug off, CSRF on)."""

    DEBUG = False
    WTF_CSRF_ENABLED = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...celery'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://.../celery'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    # crontab(hour=6, minute=15): run the e-mail task once a day at 06:15.
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }


# Lookup from configuration name to config class; 'default' resolves to
# the production settings.
config = {
    'development': DevelopmentConfig,
    'default': ProdConfig,
    'production': ProdConfig,
    'testing': TestConfig,
}


class AppConf:
    """Class to store current config even out of context."""

    def __init__(self):
        # Both stay empty until init_app() is called with a real app.
        self.app = None
        self.config = {}

    def init_app(self, app):
        """Remember ``app`` and keep a copy of its config.

        Args:
            app: Any object exposing a ``config`` attribute whose value
                supports ``.copy()``.

        Raises:
            TypeError: If ``app`` has no ``config`` attribute.
        """
        if not hasattr(app, 'config'):
            raise TypeError(
                "app must expose a 'config' attribute, got %s"
                % type(app).__name__
            )
        self.app = app
        # Copy so the stored config stays readable independently of the
        # app's own config object.
        self.config = app.config.copy()
__init__.py:
import os
from flask import Flask
from celery import Celery

from config import config, AppConf


def create_app(config_name):
    """Create the Flask app and load the settings named ``config_name``.

    ``config_name`` must be a key of the ``config`` mapping imported from
    the ``config`` module; an unknown name raises ``KeyError``.
    """
    app = Flask(__name__)

    selected = config[config_name]
    app.config.from_object(selected)
    selected.init_app(app)

    # NOTE(review): ``app_conf`` is not defined in this excerpt; presumably
    # a module-level ``AppConf()`` instance -- confirm against the full file.
    app_conf.init_app(app)
    # NOTE(review): the excerpt stops here; no blueprint registration or
    # ``return app`` is visible -- confirm against the full file.
tasks.py: from application import make_celery, app_conf
# Built once, at import time, from whatever app ``app_conf`` currently holds.
# NOTE(review): ``make_celery`` is not shown; presumably it returns a Celery
# app, since its ``.task`` decorator is used below -- confirm.
# NOTE(review): if this module is imported before ``app_conf.init_app()`` has
# run, ``app_conf.app`` is still None at this point.
cel = make_celery(app_conf.app)


@cel.task
def send_realm_to_fabricdb(realm, form):
    # Task body was elided in the original post; the placeholder is kept as-is.
    some actions...
And here's the problem: the project Blueprint uses the send_realm_to_fabricdb task, so it does: from tasks import send_realm_to_fabricdb. Then, when I just start the application, everything works fine. BUT when I try to start Celery with: celery -A app.tasks worker -l info --beat, it reaches cel = make_celery(app_conf.app) in tasks.py, gets app = None, and tries to create the application again: registering the project Blueprint ... so I have an import loop here. Could you tell me how to break this cycle? Thanks in advance.
import flask celery
user3319628
source share