Celery: the right way to run a lengthy initialization function (per process)

TL;DR

To run an initialization function for each process spawned by Celery, you can use the worker_process_init signal. As the docs note, handlers for this signal should not block for more than 4 seconds. But what are my options if the init function I need to run takes longer than 4 seconds to complete?

Problem

I use a C extension module to perform certain operations inside Celery tasks. This module requires an initialization step that can take several seconds (maybe 4-10). Since I would rather not run this init function for every task, but once per spawned process, I used the worker_process_init signal:

    #lib.py
    import isclient  #c extension module

    client = None

    def init():
        global client
        client = isclient.Client()  #this might take a while

    def create_ne_list(text):
        return client.ne_receiventities4datachunk(text)

    #celery.py
    from celery import Celery
    from celery.signals import worker_process_init
    from lib import init

    celery = Celery(include=['isc.ne.tasks'])
    celery.config_from_object('celeryconfig')

    @worker_process_init.connect
    def process_init(sender=None, conf=None, **kwargs):
        init()

    if __name__ == '__main__':
        celery.start()

    #tasks.py
    from celery import celery
    from lib import create_ne_list as cnl

    @celery.task(time_limit=1200)
    def create_ne_list(text):
        return cnl(text)

What happens when I run this code is what I described in my previous question (Celery: stuck in endlessly repeating timeouts (UP message timeout)). In short: since my init function takes more than 4 seconds, it sometimes happens that a worker process is killed and restarted, and during the restart it is killed again, because that happens automatically after 4 seconds of unresponsiveness. This ultimately leads to an endlessly repeating kill-and-restart loop.

Another option would be to run my init function only once per worker, using the worker_init signal (a sketch of this variant follows below). If I do that, I get a different problem: now the processes hang one after another. When I start the worker with a concurrency of 3 and then send a couple of tasks, the first three are finished and the remaining ones are never touched. (I suppose it might have something to do with the client object needing to be shared between several processes, and the C extension not supporting that for some reason. But to be honest, I'm relatively new to multiprocessing, so I can only guess.)
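For reference, a minimal sketch of what that worker_init variant would look like, reusing the hypothetical module names from the listing above:

    #celery.py -- worker_init variant: init() runs once per worker
    from celery import Celery
    from celery.signals import worker_init
    from lib import init

    celery = Celery(include=['isc.ne.tasks'])
    celery.config_from_object('celeryconfig')

    @worker_init.connect
    def on_worker_init(sender=None, **kwargs):
        init()  #runs in the parent worker, before the pool processes are forked

Because init() runs before the pool forks, the resulting client object ends up shared by all child processes, which matches the guess above about why the tasks hang.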

Question

So the question remains: how can I run an init function that takes more than 4 seconds for each process? Is there a right way to do this, and if so, what is it?

+8
python initialization multiprocessing python-c-extension celery
1 answer

Celery restricts the process initialization timeout to 4.0 seconds. Check the source code.

To work around this limit, you can raise it before the Celery application is created:

    from celery.concurrency import asynpool
    asynpool.PROC_ALIVE_TIMEOUT = 10.0  #set this long enough

Note that there is no configuration option or setting for changing this value.
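Putting this together with the question's setup, a sketch could look like the following (assuming the same hypothetical lib module as above; the timeout value of 20.0 is an arbitrary choice). The override has to run before the worker starts its pool:

    #celery.py
    from celery import Celery
    from celery.concurrency import asynpool
    from celery.signals import worker_process_init
    from lib import init

    #Raise the pool startup timeout before the app is created, so a
    #4-10 second init() no longer triggers the kill-and-restart loop.
    asynpool.PROC_ALIVE_TIMEOUT = 20.0  #seconds; pick a comfortable margin

    celery = Celery(include=['isc.ne.tasks'])
    celery.config_from_object('celeryconfig')

    @worker_process_init.connect
    def process_init(sender=None, **kwargs):
        init()  #may now take up to PROC_ALIVE_TIMEOUT to finish

    if __name__ == '__main__':
        celery.start()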

+6
