Python multiprocessing pool termination

I am working on a render farm, and I need my clients to be able to launch multiple instances of a renderer without blocking, so that the client can still receive new commands. That part works correctly, but I am having trouble terminating the processes that get created.

At the global level, I define my pool (so that I can access it from any function):

    from multiprocessing import Pool

    p = Pool(2)

Then I invoke my renderer with apply_async:

    for i in range(totalInstances):
        p.apply_async(render, (allRenderArgs[i], args[2]), callback=renderFinished)
    p.close()

That function returns, the processes run in the background, and the client waits for new commands. I made a simple command that kills the client and stops the renders:

    def close():
        'close this client instance'
        tn.write("say " + USER + " is leaving the farm\r\n")
        try:
            p.terminate()
        except Exception, e:
            print str(e)
            sys.exit()
        sys.exit()

It does not report an error (it would print one), Python exits, but the background processes are still running. Can anyone recommend a better way to manage these launched processes?

+10
python multiprocessing pool
4 answers

I found the answer to my own question. The underlying problem was that I was calling an external application rather than a function. When I call the subprocess [using either call() or Popen()], it creates a new instance of python whose only purpose is to launch the new application. However, when python exits, it kills this new python instance and leaves the application running.
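In practice, each pool worker ends up as a thin python wrapper around the external renderer, roughly like this (a minimal sketch; the renderer command name and argument handling are placeholders, not taken from the original post):

    from subprocess import call

    def render(allRenderArg, otherArg):
        # Terminating the pool kills this wrapper process,
        # but not the external renderer it has launched.
        call(["renderer", allRenderArg, otherArg])  # hypothetical renderer command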

The solution is to do it the hard way: find the pid of the python process that was created, get the children of that pid, and kill them. This code is specific to OS X; simpler code (that does not rely on grep) is possible for Linux (a sketch of one such approach follows the code below).

    import os
    import signal
    from subprocess import Popen, PIPE

    for process in pool:
        processId = process.pid
        print "attempting to terminate " + str(processId)
        command = " ps -o pid,ppid -ax | grep " + str(processId) + " | cut -f 1 -d \" \" | tail -1"
        ps_command = Popen(command, shell=True, stdout=PIPE)
        ps_output = ps_command.stdout.read()
        retcode = ps_command.wait()
        assert retcode == 0, "ps command returned %d" % retcode
        print "child process pid: " + str(ps_output)
        os.kill(int(ps_output), signal.SIGTERM)
        os.kill(int(processId), signal.SIGTERM)
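For reference, here is a minimal grep-free sketch of the same idea on Linux (my own assumption, not code from the answer above; it relies on pgrep -P, which lists the direct children of a given parent pid, and a hypothetical kill_children helper):

    import os
    import signal
    from subprocess import check_output, CalledProcessError

    def kill_children(parent_pid):
        'terminate the direct children of parent_pid, then parent_pid itself'
        try:
            output = check_output(["pgrep", "-P", str(parent_pid)]).decode()
        except CalledProcessError:
            output = ""  # pgrep exits non-zero when there are no children
        for child_pid in output.split():
            os.kill(int(child_pid), signal.SIGTERM)
        os.kill(parent_pid, signal.SIGTERM)

Calling kill_children(process.pid) for each worker in the loop above would replace the ps/grep pipeline.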
-5

I found a solution: stop the pool in a separate thread, for example:

    def close_pool():
        global pool
        pool.close()
        pool.terminate()
        pool.join()

    def term(*args, **kwargs):
        sys.stderr.write('\nStopping...')
        # httpd.shutdown()
        stophttp = threading.Thread(target=httpd.shutdown)
        stophttp.start()
        stoppool = threading.Thread(target=close_pool)
        stoppool.daemon = True
        stoppool.start()

    signal.signal(signal.SIGTERM, term)
    signal.signal(signal.SIGINT, term)
    signal.signal(signal.SIGQUIT, term)

This works fine and has held up every time I have tested it.
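Adapting this pattern to the close() command from the question might look roughly like the following (a sketch that assumes tn, USER, and p are the names defined in the question):

    import sys
    import threading

    def _shutdown_pool():
        p.close()
        p.terminate()
        p.join()

    def close():
        'close this client instance'
        tn.write("say " + USER + " is leaving the farm\r\n")
        stopper = threading.Thread(target=_shutdown_pool)
        stopper.daemon = True
        stopper.start()
        stopper.join(5)  # give the pool a few seconds to shut down
        sys.exit()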

+7

If you are still experiencing this problem, you could try simulating a Pool with daemonic processes (assuming you are starting the pool/processes from a non-daemonic process). I doubt this is the best solution, since it seems like your Pool processes should be exiting, but it is all I could come up with. I don't know what your callback does, so I'm not sure where to put it in my example below.

I also suggest trying to create your Pool in __main__, due to my experience (and the docs) with weirdness occurring when processes are spawned globally. This is especially true if you are on Windows: http://docs.python.org/2/library/multiprocessing.html#windows

    from multiprocessing import Process, JoinableQueue

    # the function for each process in our pool
    def pool_func(q):
        while True:
            allRenderArg, otherArg = q.get()  # blocks until the queue has an item
            try:
                render(allRenderArg, otherArg)
            finally:
                q.task_done()

    # best practice to go through main for multiprocessing
    if __name__ == '__main__':
        # create the pool
        pool_size = 2
        pool = []
        q = JoinableQueue()
        for x in range(pool_size):
            pool.append(Process(target=pool_func, args=(q,)))

        # start the pool, making it "daemonic" (the pool should exit when this proc exits)
        for p in pool:
            p.daemon = True
            p.start()

        # submit jobs to the queue
        for i in range(totalInstances):
            q.put((allRenderArgs[i], args[2]))

        # wait for all tasks to complete, then exit
        q.join()
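If the renderFinished callback from the question is still needed, one possible place for it is right after render() inside pool_func, for example (a sketch that assumes the callback takes the render result, which the original post does not specify):

    def pool_func(q):
        while True:
            allRenderArg, otherArg = q.get()
            try:
                result = render(allRenderArg, otherArg)
                renderFinished(result)  # assumed signature, mirroring callback= in the question
            finally:
                q.task_done()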
+5
    # -*- coding:utf-8 -*-
    import multiprocessing
    import time
    import sys
    import threading
    from functools import partial

    # work func
    def f(a, b, c, d, e):
        print('start')
        time.sleep(4)
        print(a, b, c, d, e)

    # subProcess func:
    #   1. start a thread for the work func
    #   2. wait for the thread with a timeout
    #   3. exit the subprocess
    def mulPro(f, *args, **kwargs):
        timeout = kwargs.get('timeout', None)

        # 1.
        t = threading.Thread(target=f, args=args)
        t.setDaemon(True)
        t.start()

        # 2.
        t.join(timeout)

        # 3.
        sys.exit()

    if __name__ == "__main__":
        p = multiprocessing.Pool(5)
        for i in range(5):
            # 1. wrap the work func in the "subProcess func"
            new_f = partial(mulPro, f, timeout=8)
            # 2. fire it off
            p.apply_async(new_f, args=(1, 2, 3, 4, 5))
            # p.apply_async(f, args=(1,2,3,4,5), timeout=2)
        for i in range(10):
            time.sleep(1)
            print(i + 1, "s")
        p.close()
        # p.join()
0
