I assign a job identifier to each chain, and I track the job by storing its data in a database.
Launching the queue:
if __name__ == "__main__":
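    # A minimal sketch of one way to launch the chain, assuming Celery;
    # first_task and second_task are hypothetical task names, and uuid is
    # just one way to generate a per-chain job identifier.
    import uuid

    from celery import chain

    job_id = str(uuid.uuid4())
    # The first task gets the job_id and its level explicitly; every later
    # task receives the (job_id, parent_level) address returned by its
    # predecessor in the chain.
    chain(first_task.s(job_id, 0), second_task.s()).apply_async()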
Now the tasks:
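A minimal sketch of such tasks, assuming Celery; the app and broker setup, the task names, and the computations are hypothetical placeholders. The point is the pattern: store the (big) partial result in the database, return only the small (job_id, level) address, and resolve that address back into data in the next task.

import inject
from celery import Celery

app = Celery('tasks', broker='pyamqp://localhost')  # assumed broker URL

@app.task
def first_task(job_id, level):
    value = 42  # stand-in for the real (big) computation
    em = inject.instance(EntityManager)
    em.add(Result(job_id=job_id, level=level, result=value))
    em.commit()
    return job_id, level  # only this small address travels through the queue

@app.task
def second_task(address):
    job_id, parent_level = address
    em = inject.instance(EntityManager)
    # resolve the address into the partial result stored by the parent task
    parent = em.query(Result).filter_by(job_id=job_id, level=parent_level).one()
    value = parent.result + 1  # stand-in computation using the parent's result
    em.add(Result(job_id=job_id, level=parent_level + 1, result=value))
    em.commit()
    return job_id, parent_level + 1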
I use an EntityManager to handle database operations. My entity manager uses SQLAlchemy and MySQL, and I use a result table to store the partial results. Adapt this part to whatever storage system suits you best (or keep it if MySQL is right for you).
import os

import inject
import yaml
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Configuration:
    def __init__(self, params=None):
        # load the database settings from a YAML file on the PYTHONPATH
        with open(os.environ.get('PYTHONPATH') + '/conf/config.yml') as f:
            self.configMap = yaml.safe_load(f)

    def __getitem__(self, key: str):
        return self.configMap[key]


class EntityManager:
    session = None

    @inject.params(config=Configuration)
    def __init__(self, config):
        conf = config['persistence']
        # build the connection URI (empty password in this setup)
        uri = conf['driver'] + "://" + conf['username'] + ":@" + conf['host'] + "/" + conf['database']
        engine = create_engine(uri, echo=conf['debug'])
        Session = sessionmaker(bind=engine)
        self.session = Session()

    def query(self, entity_type):
        return self.session.query(entity_type)

    def add(self, entity):
        return self.session.add(entity)

    def flush(self):
        return self.session.flush()

    def commit(self):
        return self.session.commit()


class Result(Base):
    __tablename__ = 'result'

    id = Column(Integer, primary_key=True)
    job_id = Column(String(255))
    level = Column(Integer)
    result = Column(Integer)

    def __repr__(self):
        return "<Result (job='%s', level='%s', result='%s')>" % (
            self.job_id, str(self.level), str(self.result))
I use the inject package for dependency injection. The inject package reuses the object, so you get access to the database every time without worrying about the connection.
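As a sketch of the wiring, assuming the standard python-inject API: a single inject.configure() call at startup is enough, since inject can create unbound classes as singletons on first use; the explicit bindings below just make that visible.

import inject

def bind_dependencies(binder):
    # bind lazily so the EntityManager (and its DB connection) is created
    # only on first use and then reused everywhere it is injected
    binder.bind_to_constructor(Configuration, Configuration)
    binder.bind_to_constructor(EntityManager, EntityManager)

inject.configure(bind_dependencies)  # call once at application startup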
The Configuration class loads the database access settings from a configuration file. For testing, you can replace it with static data (hard-coded dicts), as sketched below.
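For instance, a hypothetical drop-in replacement with hard-coded values; the key names mirror what EntityManager reads, while the values here are placeholders:

class StaticConfiguration:
    # hard-coded stand-in for Configuration, handy in tests
    def __init__(self, params=None):
        self.configMap = {
            'persistence': {
                'driver': 'mysql+pymysql',  # assumed SQLAlchemy driver string
                'username': 'root',
                'host': 'localhost',
                'database': 'jobs',
                'debug': False,
            }
        }

    def __getitem__(self, key):
        return self.configMap[key]

In a test you could bind it in place of the real class with binder.bind(Configuration, StaticConfiguration()).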
Swap the dependency injection for anything else that suits you; it is just my choice, added here for quick testing.
The key here is to save partial results somewhere outside the queue system, and to return from each task the data needed to access those results (job_id and parent level). Instead of the real data (something big), you send only a small extra piece of data, an address (job_id + parent level), that points to it.
This is the solution that I use in my software.