Retrying a Python App Engine pipeline

I have a client pipeline that needs to send a request to an external computation server and then fetch the results. The computation takes some time, so the external server processes it in a task queue. The client pipeline does not know when the results will be ready; instead, it has to poll the server (get_status(id)) and then, if status == Completed, fetch the results (get_results(id)).

The problem is that the server crashes from time to time, in which case I need the client pipeline to try again. I have a pipeline template based on this post:

Google AppEngine Pipelines APIs

which is as follows:

    class ReducePipeline(pipeline.Pipeline):

        def run(self, *args):
            results=[result for result in list(args) if result]
            if results==[]:
                return None
            return results[0]

    class CallbackPipeline(pipeline.Pipeline):

        def run(self, id):
            status=get_status(id) # call external server status
            results, future_results = None, None
            if status==Error:
                pass
            elif status==Completed:
                results=get_results(id) # get results from external server
            elif status!=Completed:
                with pipeline.InOrder():
                    yield Delay(seconds=CallbackWait) # NB
                    future_results=yield CallbackPipeline(id)
            yield ReducePipeline(results, future_results)

    class StartPipeline(pipeline.Pipeline):

        def run(self, request):
            id=start(request) # post request to external server; get job id in return
            yield CallbackPipeline(id)

        """
        is this really the best way to retry the pipeline ? dev_appserver.py results don't look promising :-(
        """

        def finalized(self):
            if not self.outputs.default:
                raise pipeline.Retry()

but this does not work on dev_appserver.py; it just causes repeated errors in the finalization stage of StartPipeline. I was hoping the whole of StartPipeline would be retried.

Can someone suggest a sensible pattern for retrying StartPipeline when it returns None as its result?

Thanks.

1 answer

You will most likely need a 4th pipeline that interprets and validates the CallbackPipeline results and confirms they are what you need, and you will need to raise Retry from the run() method of this new pipeline.
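
A minimal sketch of what such a validating stage could look like. To keep it runnable outside App Engine, it uses a local Retry exception as a stand-in for pipeline.Retry and a plain class instead of a pipeline.Pipeline subclass; the name ValidatePipeline is hypothetical. How the library reschedules the stage after the exception is not shown here.

```python
class Retry(Exception):
    """Local stand-in for pipeline.Retry (assumption, so the sketch runs off App Engine)."""


class ValidatePipeline(object):
    """Hypothetical 4th stage; on App Engine it would subclass pipeline.Pipeline."""

    def run(self, results):
        # Treat None (or an empty result) as "the external server failed".
        if not results:
            raise Retry('no results from external server')
        # Otherwise pass the validated results through unchanged.
        return results
```

In the question's code, StartPipeline.run would yield this stage with the output of CallbackPipeline(id), so the check happens in run() rather than in finalized().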

You can also use recursion in this 4th pipeline instead of raising Retry: you just keep calling CallbackPipeline until you get the results you are looking for. This needs to be done asynchronously, for example using Anentropic.
