How to end a process using python multiprocessing

I have code that needs to be run against several other systems that may hang or have problems that are not under my control. I would like to use Python multiprocessing to spawn the child processes so they run independently of the main program, and then terminate them when they hang or run into problems, but I'm not sure of the best way to go about this.

When terminate is called, it kills the child process, but the child then becomes a defunct zombie that is not released until the process object is gone. The code example below, where the loop never ends, works to kill it and allow a respawn when called again, but it doesn't seem like a good way of going about this (i.e. it would be better to create the multiprocessing.Process() in __init__()).

Anyone have a suggestion?

import multiprocessing
import time

class Process(object):
    def __init__(self):
        self.thing = Thing()
        self.running_flag = multiprocessing.Value("i", 1)

    def run(self):
        self.process = multiprocessing.Process(target=self.thing.worker,
                                               args=(self.running_flag,))
        self.process.start()
        print self.process.pid

    def pause_resume(self):
        self.running_flag.value = not self.running_flag.value

    def terminate(self):
        self.process.terminate()

class Thing(object):
    def __init__(self):
        self.count = 1

    def worker(self, running_flag):
        while True:
            if running_flag.value:
                self.do_work()

    def do_work(self):
        print "working {0} ...".format(self.count)
        self.count += 1
        time.sleep(1)
3 answers

The way Python multiprocessing handles processes is a bit confusing.

From the multiprocessing programming guidelines:

Joining zombie processes

On Unix, when a process finishes but has not been joined, it becomes a zombie. There should never be very many of these, because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also, calling a finished process's Process.is_alive will join the process. Even so, it is probably good practice to explicitly join all the processes that you start.

To avoid the process turning into a zombie, you need to call the join() method once you have killed it.
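For example, a minimal sketch of the terminate-then-join pattern (the worker here is just a stand-in for your real task):

import multiprocessing
import time

def worker():
    while True:
        time.sleep(1)  # stand-in for work that may hang

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker)
    p.start()
    time.sleep(2)
    p.terminate()      # kills the child, but on Unix it lingers as a zombie...
    p.join()           # ...until join() reaps it
    # calling multiprocessing.active_children() would also reap finished children
    print(p.exitcode)  # -15, i.e. terminated by SIGTERM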

If you need a simpler way to handle hanging calls on your system, you can take a look at pebble.
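For instance, pebble's ProcessPool can cancel a call that runs past a timeout and terminate the worker for you. A rough sketch, assuming pebble is installed (pip install pebble) and with an arbitrary ten-second timeout:

import time
from concurrent.futures import TimeoutError
from pebble import ProcessPool

def flaky_call():
    time.sleep(1000)  # simulate an external system that hangs

if __name__ == "__main__":
    with ProcessPool() as pool:
        future = pool.schedule(flaky_call, timeout=10)
        try:
            future.result()
        except TimeoutError:
            print("call hung; pebble terminated the worker for us")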


You can run child processes as daemons in the background.

 process.daemon = True 

Any errors and hangs (or infinite loops) in a daemon process will not affect the main process, and the daemon will be terminated once the main process exits.
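A minimal sketch (the three-second sleep just stands in for the main program's real work):

import multiprocessing
import time

def worker():
    while True:          # an endless loop that would otherwise block exit
        time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker)
    p.daemon = True      # must be set before start()
    p.start()
    time.sleep(3)        # the main process does its own work here
    # no join() needed: the daemon child is killed when the main process exits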

This works for simple tasks, until you end up with many child daemon processes that keep consuming memory inherited from the parent process without any explicit control.

A better way is to set up a Queue through which all the child processes communicate with the parent process, so that we can join them and clean up nicely. Here is some simple code that checks whether the worker is hanging (simulated with time.sleep(1000)) and sends messages to the queue so the main process can take action on it:

import multiprocessing as mp
import time
import Queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print "working {0} ...".format(count)
            count += 1
            q.put(count)
        time.sleep(1)
        if count > 3:
            # Simulate hanging with sleep
            print "hanging..."
            time.sleep(1000)

def watchdog(q):
    """
    This checks the queue for updates and sends a signal to it
    when the child process hasn't sent anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except Queue.Empty as e:
            print "[WATCHDOG]: Maybe WORKER is slacking"
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print "[MAIN]: starting process P1"
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
        if msg == "KILL WATCHDOG":
            print "[MAIN]: Terminating slacking WORKER"
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print "[MAIN]: WORKER is a goner"
                workr.join(timeout=1.0)
                print "[MAIN]: Joined WORKER successfully!"
                q.close()
                break   # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()

Without terminating the worker first, an attempt to join() it from the main process would block forever, since the worker never ends.


(Not having enough reputation points to comment, I'm giving a full answer instead.)

@PieOhPah: Thank you for this very nice example.
Unfortunately there is one little flaw that keeps the watchdog from killing the worker:

 if msg == "KILL WATCHDOG": 

it should be:

 if msg == "KILL WORKER": 

With that, the code becomes (with print updated for Python 3):

import multiprocessing as mp
import time
import queue

running_flag = mp.Value("i", 1)

def worker(running_flag, q):
    count = 1
    while True:
        if running_flag.value:
            print("working {0} ...".format(count))
            count += 1
            q.put(count)
        time.sleep(1)
        if count > 3:
            # Simulate hanging with sleep
            print("hanging...")
            time.sleep(1000)

def watchdog(q):
    """
    This checks the queue for updates and sends a signal to it
    when the child process hasn't sent anything for too long
    """
    while True:
        try:
            msg = q.get(timeout=10.0)
        except queue.Empty as e:
            print("[WATCHDOG]: Maybe WORKER is slacking")
            q.put("KILL WORKER")

def main():
    """The main process"""
    q = mp.Queue()

    workr = mp.Process(target=worker, args=(running_flag, q))
    wdog = mp.Process(target=watchdog, args=(q,))

    # run the watchdog as daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print("[MAIN]: starting process P1")
    wdog.start()

    # Poll the queue
    while True:
        msg = q.get()
        # if msg == "KILL WATCHDOG":
        if msg == "KILL WORKER":
            print("[MAIN]: Terminating slacking WORKER")
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print("[MAIN]: WORKER is a goner")
                workr.join(timeout=1.0)
                print("[MAIN]: Joined WORKER successfully!")
                q.close()
                break   # watchdog process daemon gets terminated

if __name__ == '__main__':
    main()
