Celery: access to all previous results in the chain

So basically I have a rather complicated workflow that looks something like this:

    >>> res = (add.si(2, 2) | add.s(4) | add.s(8))()
    >>> res.get()
    16

From there, it is pretty trivial for me to walk back up the result chain and collect all the individual results:

    >>> res.parent.get()
    8
    >>> res.parent.parent.get()
    4
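Collecting them all programmatically is just a walk up the `parent` links (a small helper of my own, not part of the Celery API):

    def collect_chain_results(res):
        # Walk from the final AsyncResult back to the root via .parent,
        # collecting every intermediate result in execution order.
        results = []
        while res is not None:
            results.append(res.get())
            res = res.parent
        return list(reversed(results))

    collect_chain_results(res)  # -> [4, 8, 16]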

My problem is that my third task may depend on knowing the result of the first, but in this example it only receives the result of the second.

The chains are also quite long and the results are not exactly small, so simply passing the inputs through as part of each result would pollute the result store unnecessarily. The store is Redis, so the restrictions that apply when using RabbitMQ, ZeroMQ, ... are not a concern.
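To make that concrete, the naive workaround would look something like this (a sketch only; `add_acc` is an illustrative task assuming a configured Celery `app`, not something from my actual workflow):

    @app.task
    def add_acc(acc, y):
        # acc holds every result so far; each task appends its own result,
        # so the payload stored in the backend grows with every hop.
        return acc + [acc[-1] + y]

    # (add_acc.si([2], 2) | add_acc.s(4) | add_acc.s(8))()  ->  [2, 4, 8, 16]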

+5
3 answers

I assign a job identifier to each chain and track the job by storing its data in a database.

Launching the queue

    import uuid

    if __name__ == "__main__":
        # Generate a unique id for the job
        job_id = uuid.uuid4().hex

        # This is the root parent
        parent_level = 1

        # Pack the data. The last value is your value to add
        parameters = job_id, parent_level, 2

        # Build the chain. I added a clean task that removes the data
        # created during the process (if you want that)
        add_chain = add.s(parameters, 2) | add.s(4) | add.s(8) | clean.s()
        add_chain.apply_async()

Now the tasks:

    # Function to store a result. I used sqlalchemy (mysql) but you can
    # change it for whatever you want (a distributed file system, for example)
    @inject.params(entity_manager=EntityManager)
    def save_result(job_id, level, result, entity_manager):
        r = Result()
        r.job_id = job_id
        r.level = level
        r.result = result
        entity_manager.add(r)
        entity_manager.commit()

    # Restore the result of one of the parents
    @inject.params(entity_manager=EntityManager)
    def get_result(job_id, level, entity_manager):
        result = entity_manager.query(Result).filter_by(job_id=job_id, level=level).one()
        return result.result

    # Clear the data, or do something else with the final result
    @inject.params(entity_manager=EntityManager)
    def clear(job_id, entity_manager):
        entity_manager.query(Result).filter_by(job_id=job_id).delete()

    @app.task()
    def add(parameters, number):
        # Extract the data from the parameters tuple
        job_id, level, other_number = parameters

        # Load the result of your second parent (level - 2).
        # For the third parent use level - 3, and so on.
        # second_parent_result = get_result(job_id, level - 2)

        # Do your stuff; I guess you want to add numbers
        result = number + other_number
        save_result(job_id, level, result)

        # You have to return something, because the next "add" in the chain
        # expects three values. Return the same job_id and increment the level.
        return job_id, level + 1, result

    @app.task()
    def clean(parameters):
        job_id, level, result = parameters
        # Do something with the final result, or not

        # Clear the data
        clear(job_id)

I use an entity_manager to handle the database operations. My entity manager uses SQLAlchemy and MySQL, with a Result table to store the partial results. Swap this part out for whatever storage system suits you best (or keep it, if MySQL is right for you).

    import os
    import inject
    import yaml
    from sqlalchemy import create_engine, Column, Integer, String
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class Configuration:
        def __init__(self, params):
            f = open(os.environ.get('PYTHONPATH') + '/conf/config.yml')
            self.configMap = yaml.safe_load(f)
            f.close()

        def __getitem__(self, key: str):
            return self.configMap[key]

    class EntityManager():
        session = None

        @inject.params(config=Configuration)
        def __init__(self, config):
            conf = config['persistence']
            uri = conf['driver'] + "://" + conf['username'] + ":@" + conf['host'] + "/" + conf['database']
            engine = create_engine(uri, echo=conf['debug'])
            Session = sessionmaker(bind=engine)
            self.session = Session()

        def query(self, entity_type):
            return self.session.query(entity_type)

        def add(self, entity):
            return self.session.add(entity)

        def flush(self):
            return self.session.flush()

        def commit(self):
            return self.session.commit()

    class Result(Base):
        __tablename__ = 'result'

        id = Column(Integer, primary_key=True)
        job_id = Column(String(255))
        level = Column(Integer)
        result = Column(Integer)

        def __repr__(self):
            return "<Result (job='%s', level='%s', result='%s')>" % (
                self.job_id, str(self.level), str(self.result))
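Since the question says Redis is already available, here is a minimal sketch of the same storage layer backed by plain Redis instead of MySQL; the key scheme and the redis-py usage below are my own assumptions, not part of the original setup:

    import json
    import redis

    store = redis.StrictRedis(host='localhost', port=6379, db=1)

    def save_result(job_id, level, result):
        # The address is job_id + level; the value can be arbitrarily large.
        store.set('result:%s:%d' % (job_id, level), json.dumps(result))

    def get_result(job_id, level):
        return json.loads(store.get('result:%s:%d' % (job_id, level)))

    def clear(job_id):
        # Drop every partial result belonging to this job.
        for key in store.keys('result:%s:*' % job_id):
            store.delete(key)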

I use the `inject` package for dependency injection. The injector reuses the injected object, so you get database access everywhere without worrying about the connection.
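For reference, a minimal sketch of how the injector might be wired up, assuming the python-inject package (the binding choices here are illustrative, not the original configuration):

    import inject

    def configure_injector(binder):
        # Bind both classes as lazily-constructed singletons; inject builds
        # each instance on first use and then reuses it everywhere.
        binder.bind_to_constructor(Configuration, lambda: Configuration(None))
        binder.bind_to_constructor(EntityManager, EntityManager)

    inject.configure(configure_injector)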

The Configuration class loads the database access data from a configuration file. You can replace it with static data (a hard-coded dict) for testing.

Swap the dependency injection for anything else that suits you; it is only my choice here, added for quick testing.

The key here is to save the partial results somewhere outside the queue system, and to return from each task just the data needed to access those results (the job_id and the parent level). You send a small amount of extra data through the chain, an address (job_id + parent level) that points to the real data (the big stuff).

This is the solution that I use in my software.

+2

A simple approach is to store the task results in a list and use them inside your tasks.

    from celery import Celery, chain
    from celery.signals import task_success

    results = []

    app = Celery('tasks', backend='amqp', broker='amqp://')

    @task_success.connect()
    def store_result(**kwargs):
        # Runs in the worker after each successful task; record which
        # task produced which result.
        sender = kwargs.pop('sender')
        result = kwargs.pop('result')
        results.append((sender.name, result))

    @app.task
    def add(x, y):
        print("previous results", results)
        return x + y

Now, inside your chain, all the previous results can be accessed from any task, in any order. Note that the `results` list lives in the worker process's memory, so this works as long as the chain's tasks run in the same worker.
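A quick way to see it in action, assuming a worker is running with the module above (the exact log output is illustrative):

    >>> from tasks import add
    >>> res = (add.s(2, 2) | add.s(4) | add.s(8))()
    >>> res.get()
    16

    # Meanwhile, in the worker log, the last add prints something like:
    # previous results [('tasks.add', 4), ('tasks.add', 8)]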

+1

Your setup may be too complicated for this, but I like to use group in conjunction with a noop task to achieve something similar. I do it this way because it highlights the spots in my pipeline that are still synchronous (so they can hopefully be removed later).

Using something similar to your example, I start with a set of tasks that look like this:

tasks.py:

    from celery import Celery

    app = Celery('tasks', backend="redis", broker='redis://localhost')

    @app.task
    def add(x, y):
        return x + y

    @app.task
    def xsum(elements):
        return sum(elements)

    @app.task
    def noop(ignored):
        # Pass a value through a group unchanged.
        return ignored

With these tasks, I create a chain that uses a group to carry an earlier result forward alongside the rest of the pipeline (the `~sig` prefix below is Celery shorthand for `sig.delay().get()`):

    In [1]: from tasks import add, xsum, noop

    In [2]: from celery import group

    # First I run the task whose value I need later, then I send that
    # result to a group in which the first task does nothing and the
    # other tasks form my pipeline.
    In [3]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)))
    Out[3]: [4, 16]

    # At this point I have a list where the first element is the result
    # of my original task and the second element is the result of my
    # workflow.
    In [4]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)) | xsum.s())
    Out[4]: 20

    # From here, things can go back to a normal chain
    In [5]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)) | xsum.s() | add.s(1) | add.s(1))
    Out[5]: 22

Hope this is helpful!

+1
