Celery: daemonic processes are not allowed to have children

In Python 2.7, I am trying to create processes (using multiprocessing) inside a Celery task (Celery 3.1.17), but I get the error:

daemonic processes are not allowed to have children 

Digging around, I found that recent versions of billiard fix the "bug", but I have the latest version (3.3.0.20) and the error still occurs. I also tried to implement this workaround in my Celery task, but it raises the same error.

Does anyone know how to make this work? Any help is appreciated. Patrick

EDIT: code snippets

Task:

    from __future__ import absolute_import

    from celery import shared_task
    from embedder.models import Embedder

    @shared_task
    def embedder_update_task(embedder_id):
        embedder = Embedder.objects.get(pk=embedder_id)
        embedder.test()

Artificial test function (from here):

    import time
    import multiprocessing as mp
    from random import randint

    def sleepawhile(t):
        print("Sleeping %i seconds..." % t)
        time.sleep(t)
        return t

    def work(num_procs):
        print("Creating %i (daemon) workers and jobs in child." % num_procs)
        pool = mp.Pool(num_procs)
        result = pool.map(sleepawhile, [randint(1, 5) for x in range(num_procs)])
        # The following is not really needed, since the (daemon) workers of the
        # child pool are killed when the child is terminated, but it is good
        # practice to clean up after ourselves anyway.
        pool.close()
        pool.join()
        return result

    def test(self):
        print("Creating 5 (non-daemon) workers and jobs in main process.")
        pool = MyPool(5)  # MyPool: the non-daemonic pool from the workaround (sketch below)
        result = pool.map(work, [randint(1, 5) for x in range(5)])
        pool.close()
        pool.join()
        print(result)
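For reference, MyPool above comes from the linked workaround: a Pool subclass whose workers report daemon=False and so are allowed to spawn children. Roughly (Python 2.7 style; this is the well-known override from that answer, not code of my own):

    import multiprocessing
    import multiprocessing.pool

    class NoDaemonProcess(multiprocessing.Process):
        # make 'daemon' always report False so these workers may have children
        def _get_daemon(self):
            return False
        def _set_daemon(self, value):
            pass
        daemon = property(_get_daemon, _set_daemon)

    class MyPool(multiprocessing.pool.Pool):
        # a Pool whose workers are created as non-daemonic processes
        Process = NoDaemonProcess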

My actual function is:

    import multiprocessing as mp

    def test(self):
        self.init()
        for saveindex in range(self.start_index, self.start_index + self.nsaves):
            self.create_storage(saveindex)
            # process creation:
            procs = [mp.Process(name="Process-" + str(i),
                                target=getattr(self, self.training_method),
                                args=(saveindex,))
                     for i in range(self.nproc)]
            for p in procs:
                p.start()
            for p in procs:
                p.join()
        print "End of task"

The init function creates a multiprocessing array and an object that shares the same memory, so that all my processes can update the same array at the same time:

    import ctypes as c
    import multiprocessing as mp
    import numpy as np

    mp_arr = mp.Array(c.c_double, np.random.rand(1000000))  # example
    self.V = np.frombuffer(mp_arr.get_obj())  # all the processes can update V

Error when calling a task:

    [2015-06-04 09:47:46,659: INFO/MainProcess] Received task: embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]
    [2015-06-04 09:47:47,674: WARNING/Worker-5] Creating 5 (non-daemon) workers and jobs in main process.
    [2015-06-04 09:47:47,789: ERROR/MainProcess] Task embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda] raised unexpected: AssertionError('daemonic processes are not allowed to have children',)
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
        return self.run(*args, **kwargs)
      File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/tasks.py", line 21, in embedder_update_task
        embedder.test()
      File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/models.py", line 475, in test
        pool = MyPool(5)
      File "/usr/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
        self._repopulate_pool()
      File "/usr/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool
        w.start()
      File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start
        'daemonic processes are not allowed to have children'
    AssertionError: daemonic processes are not allowed to have children
Tags: python, daemon, python-multiprocessing, celery
2 answers

billiard and multiprocessing are different libraries: billiard is the Celery project's own fork of multiprocessing. You will need to import billiard and use it instead of multiprocessing.
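A minimal sketch of that swap, reusing the test functions from the question (billiard is designed as a drop-in replacement, so I am assuming its Pool API matches multiprocessing's):

    import time
    from random import randint

    import billiard as mp  # Celery's fork of multiprocessing

    def sleepawhile(t):
        print("Sleeping %i seconds..." % t)
        time.sleep(t)
        return t

    def work(num_procs):
        # billiard.Pool mirrors multiprocessing.Pool, but its workers
        # can be created from inside a Celery worker process
        pool = mp.Pool(num_procs)
        result = pool.map(sleepawhile, [randint(1, 5) for x in range(num_procs)])
        pool.close()
        pool.join()
        return result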

However, a better approach is probably to restructure your code so that you spawn more Celery tasks instead of mixing two different ways of distributing your work.

You can do this using Celery Canvas.

    import time
    from random import randint

    from celery import group

    @app.task  # `app` is your Celery application instance
    def sleepawhile(t):
        print("Sleeping %i seconds..." % t)
        time.sleep(t)
        return t

    def work(num_procs):
        return group(sleepawhile.s(randint(1, 5)) for x in range(num_procs))

    def test(self):
        my_group = group(work(randint(1, 5)) for x in range(5))
        result = my_group.apply_async()
        result.get()

I tried to create a working version of your code using canvas primitives instead of multiprocessing. However, since your example was rather artificial, it is not easy to come up with something that makes sense.

Update:

Here is a translation of your real code that uses Celery canvas:

tasks.py:

    from celery import shared_task

    @shared_task
    def run_training_method(saveindex, embedder_id):
        # imported here to avoid a circular import with models.py
        from embedder.models import Embedder
        embedder = Embedder.objects.get(pk=embedder_id)
        embedder.training_method(saveindex)

models.py:

    from celery import group

    from tasks import run_training_method

    class Embedder(Model):

        def embedder_update_task(self):
            my_group = []
            for saveindex in range(self.start_index, self.start_index + self.nsaves):
                self.create_storage(saveindex)
                # add the subtasks for this save index to the list
                my_group.extend([run_training_method.subtask((saveindex, self.id))
                                 for i in range(self.nproc)])
            result = group(my_group).apply_async()
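To keep the original entry point, the existing shared task could simply delegate to this method. This glue is my own assumption, not part of the answer:

    # tasks.py -- hypothetical glue, reusing the task name from the question
    @shared_task
    def embedder_update_task(embedder_id):
        embedder = Embedder.objects.get(pk=embedder_id)
        embedder.embedder_update_task()  # dispatches the group of subtasks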

I got this error when using multiprocessing with Celery 4.2.0 and Python 3.6. I solved it using billiard.

I changed the import from

    from multiprocessing import Process

to

    from billiard.context import Process

and that solved the error.

Note that the import source is billiard.context, not billiard.process.
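Applied to the process-creation loop from the question, the swap looks roughly like this (names taken from the question; an untested sketch):

    from billiard.context import Process

    def test(self):
        self.init()
        for saveindex in range(self.start_index, self.start_index + self.nsaves):
            self.create_storage(saveindex)
            procs = [Process(name="Process-" + str(i),
                             target=getattr(self, self.training_method),
                             args=(saveindex,))
                     for i in range(self.nproc)]
            for p in procs:
                p.start()
            for p in procs:
                p.join()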

