Python Multiprocessing - AssertionError: Can only join a child process

I'm taking my first foray into the Python multiprocessing module and I'm running into some problems. I'm very familiar with the threading module, but I need to make sure that the processes I execute actually run in parallel.

Here is an outline of what I'm trying to do. Please ignore things like undeclared variables and functions, because I can't paste my code in full.

    import multiprocessing
    import time

    def wrap_func_to_run(host, args, output):
        output.append(do_something(host, args))
        return

    def func_to_run(host, args):
        return do_something(host, args)

    def do_work(server, client, server_args, client_args):
        server_output = func_to_run(server, server_args)
        client_output = func_to_run(client, client_args)
        #handle this output and return a result
        return result

    def run_server_client(server, client, server_args, client_args, server_output, client_output):
        server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output))
        server_process.start()
        client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output))
        client_process.start()
        server_process.join()
        client_process.join()
        #handle the output and return some result

    def run_in_parallel(server, client):
        #set up commands for first process
        server_output = client_output = []
        server_cmd = "cmd"
        client_cmd = "cmd"
        process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output))
        process_one.start()
        #set up second process to run - but this one can run here
        result = do_work(server, client, "some server args", "some client args")
        process_one.join()
        #use outputs above and the result to determine result
        return final_result

    def main():
        #grab client
        client = client()
        #grab server
        server = server()
        return run_in_parallel(server, client)

    if __name__ == "__main__":
        main()

Here is the error I get:

    Error in sys.exitfunc:
    Traceback (most recent call last):
      File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
        func(*targs, **kargs)
      File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
        p.join()
      File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
        assert self._parent_pid == os.getpid(), 'can only join a child process'
    AssertionError: can only join a child process

I tried many different things to fix this, but I feel that something is wrong with the way I use this module.

EDIT:

So, I created a self-contained file that reproduces the problem, simulating the client/server and the work they do. I also left out an important point: I'm running this on Unix. Another important piece of information is that do_work in my actual case involves using os.fork(). I could not reproduce the error without using os.fork(), so I assume that is where the problem lies. In my real case that part of the code was not mine, so I was treating it as a black box (probably a mistake on my part). Anyway, here is the reproducible code -

    #!/usr/bin/python

    import multiprocessing
    import time
    import os
    import signal
    import sys

    class Host():
        def __init__(self):
            self.name = "host"

        def work(self):
            #override - use to simulate work
            pass

    class Server(Host):
        def __init__(self):
            self.name = "server"

        def work(self):
            x = 0
            for i in range(10000):
                x += 1
            print x
            time.sleep(1)

    class Client(Host):
        def __init__(self):
            self.name = "client"

        def work(self):
            x = 0
            for i in range(5000):
                x += 1
            print x
            time.sleep(1)

    def func_to_run(host, args):
        print host.name + " is working"
        host.work()
        print host.name + ": " + args
        return "done"

    def do_work(server, client, server_args, client_args):
        print "in do_work"
        server_output = client_output = ""
        child_pid = os.fork()
        if child_pid == 0:
            server_output = func_to_run(server, server_args)
            sys.exit(server_output)
        time.sleep(1)
        client_output = func_to_run(client, client_args)
        # kill and wait for server to finish
        os.kill(child_pid, signal.SIGTERM)
        (pid, status) = os.waitpid(child_pid, 0)
        return (server_output == "done" and client_output == "done")

    def run_server_client(server, client, server_args, client_args):
        server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args))
        print "Starting server process"
        server_process.start()
        client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args))
        print "Starting client process"
        client_process.start()
        print "joining processes"
        server_process.join()
        client_process.join()
        print "processes joined and done"

    def run_in_parallel(server, client):
        #set up commands for first process
        server_cmd = "server command for run_server_client"
        client_cmd = "client command for run_server_client"
        process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd))
        print "Starting process one"
        process_one.start()
        #set up second process to run - but this one can run here
        print "About to do work"
        result = do_work(server, client, "server args from do work", "client args from do work")
        print "Joining process one"
        process_one.join()
        #use outputs above and the result to determine result
        print "Process one has joined"
        return result

    def main():
        #grab client
        client = Client()
        #grab server
        server = Server()
        return run_in_parallel(server, client)

    if __name__ == "__main__":
        main()

If I remove the use of os.fork() in do_work I do not get the error, and the code behaves as I expected before (except for the passing of the outputs, which I accept as my mistake/misunderstanding). I can change the old code so that it does not use os.fork(), but I would also like to know why it caused this problem and whether there is a workable solution.

EDIT 2:

I had started working on a solution that omits os.fork() before the accepted answer was posted. Here is what I have, with some tweaking of the amount of simulated work that can be done -

    #!/usr/bin/python

    import multiprocessing
    import time
    import os
    import signal
    import sys
    from Queue import Empty

    class Host():
        def __init__(self):
            self.name = "host"

        def work(self, w):
            #override - use to simulate work
            pass

    class Server(Host):
        def __init__(self):
            self.name = "server"

        def work(self, w):
            x = 0
            for i in range(w):
                x += 1
            print x
            time.sleep(1)

    class Client(Host):
        def __init__(self):
            self.name = "client"

        def work(self, w):
            x = 0
            for i in range(w):
                x += 1
            print x
            time.sleep(1)

    def func_to_run(host, args, w, q):
        print host.name + " is working"
        host.work(w)
        print host.name + ": " + args
        q.put("ZERO")
        return "done"

    def handle_queue(queue):
        done = False
        results = []
        return_val = 0
        while not done:
            #try to grab item from Queue
            tr = None
            try:
                tr = queue.get_nowait()
                print "found element in queue"
                print tr
            except Empty:
                done = True
            if tr is not None:
                results.append(tr)
        for el in results:
            if el != "ZERO":
                return_val = 1
        return return_val

    def do_work(server, client, server_args, client_args):
        print "in do_work"
        server_output = client_output = ""
        child_pid = os.fork()
        if child_pid == 0:
            server_output = func_to_run(server, server_args)
            sys.exit(server_output)
        time.sleep(1)
        client_output = func_to_run(client, client_args)
        # kill and wait for server to finish
        os.kill(child_pid, signal.SIGTERM)
        (pid, status) = os.waitpid(child_pid, 0)
        return (server_output == "done" and client_output == "done")

    def run_server_client(server, client, server_args, client_args, w, mq):
        local_queue = multiprocessing.Queue()
        server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue))
        print "Starting server process"
        server_process.start()
        client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue))
        print "Starting client process"
        client_process.start()
        print "joining processes"
        server_process.join()
        client_process.join()
        print "processes joined and done"
        if handle_queue(local_queue) == 0:
            mq.put("ZERO")

    def run_in_parallel(server, client):
        #set up commands for first process
        master_queue = multiprocessing.Queue()
        server_cmd = "server command for run_server_client"
        client_cmd = "client command for run_server_client"
        process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue))
        print "Starting process one"
        process_one.start()
        #set up second process to run - but this one can run here
        print "About to do work"
        #result = do_work(server, client, "server args from do work", "client args from do work")
        run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue)
        print "Joining process one"
        process_one.join()
        #use outputs above and the result to determine result
        print "Process one has joined"
        return_val = handle_queue(master_queue)
        print return_val
        return return_val

    def main():
        #grab client
        client = Client()
        #grab server
        server = Server()
        val = run_in_parallel(server, client)
        if val:
            print "failed"
        else:
            print "passed"
        return val

    if __name__ == "__main__":
        main()

This code has some modified printouts so I can see exactly what is happening. I used a multiprocessing.Queue to store and share the outputs across all the processes and back to my main thread for handling. I think this solves the Python part of my problem, but there are still some issues in the code I'm working on. The only other thing I can say is that the equivalent of func_to_run involves sending a command over ssh and grabbing any stderr along with the output. For some reason this works fine for a command that has a short execution time, but not well for a command that has a much longer execution time/output. I tried simulating this with drastically different work values in my code here, but was not able to reproduce similar results.

EDIT 3:

The library code I use (again, not mine) uses Popen.wait() for the ssh commands, and I just read this:

Popen.wait() Wait for child process to terminate. Set and return returncode attribute.

Warning: This will deadlock when using stdout=PIPE and/or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use communicate() to avoid that.
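If I had kept capturing the output with PIPE, the documented way to avoid that deadlock would be communicate() instead of wait(). A minimal sketch of that, assuming a placeholder host and command rather than my real ones:

    # Sketch only: "somehost" and the command are placeholders, not the real library code.
    import subprocess

    proc = subprocess.Popen(["ssh", "somehost", "some_long_running_command"],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()   # drains both pipes, so the child never blocks on a full pipe buffer
    print out
    print err
    print proc.returncode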

I adjusted the code so that it does not buffer the output and just prints it as it is received, and everything works.
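Roughly, the adjustment looks like the sketch below - reading and printing each line as it arrives so nothing piles up in the OS pipe buffer (again, the host and command are placeholders):

    # Sketch of the "don't buffer, just print as it is received" change (placeholder command).
    import subprocess

    proc = subprocess.Popen(["ssh", "somehost", "some_long_running_command"],
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    for line in iter(proc.stdout.readline, ""):
        print line.rstrip()          # consume output as it is produced
    proc.stdout.close()
    print proc.wait()                # safe now: the pipe has already been drained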

2 answers

I can change the old code so that it does not use os.fork(), but I would also like to know why it caused this problem and whether there is a workable solution.

The key to understanding the problem is knowing what fork() does. The CPython docs say "Fork a child process.", but this assumes you understand the C library call fork().

Here is what the glibc manpage says about it:

fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent, except for the following points: ...

Basically, it is as if you took your program and made a copy of its state (heap, stack, instruction pointer, etc.), with a few small differences, and let it execute independently of the original. When this child process exits naturally, it will use exit(), and that will run the atexit() handlers registered by the multiprocessing module.
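Here is a minimal sketch (hypothetical code, not the question's, assuming Unix and Python 2.7) of that mechanism: the forked child inherits the parent's record of a multiprocessing.Process, and the atexit handler then tries to join it from a process that is not its parent:

    # Minimal illustration: `p` is started by the parent, the fork()ed child inherits
    # multiprocessing's bookkeeping for it, and the child's normal exit runs the
    # atexit _exit_function, which calls p.join() -> "can only join a child process".
    import multiprocessing
    import os
    import sys
    import time

    def sleep_a_bit():
        time.sleep(2)

    if __name__ == "__main__":
        p = multiprocessing.Process(target=sleep_a_bit)
        p.start()              # parent now tracks p
        pid = os.fork()        # child gets a copy of that tracking state
        if pid == 0:
            sys.exit(0)        # normal exit -> atexit handlers -> p.join() from the wrong process
        os.waitpid(pid, 0)
        p.join()               # joining from the real parent is fine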

What can you do to avoid this?

  • omit os.fork(): use multiprocessing instead, as you are exploring now
  • probably effective: import multiprocessing after executing fork(), and only in the child or the parent as necessary.
  • use _exit() in the child (the CPython docs state, "Note: The standard way to exit is sys.exit(n). _exit() should normally only be used in the child process after a fork()."); see the sketch after the link below.

https://docs.python.org/2/library/os.html#os._exit
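A rough sketch of that third option (hypothetical code, not the question's): exiting the forked child with os._exit() skips the atexit handlers, so the multiprocessing cleanup never runs in the child:

    # Sketch of the os._exit() workaround: the forked child bypasses atexit handlers
    # (and does not flush stdio buffers), so multiprocessing's exit hook never runs in it.
    import os

    def do_work_in_fork(work_fn):
        child_pid = os.fork()
        if child_pid == 0:
            status = 0
            try:
                work_fn()
            except Exception:
                status = 1
            finally:
                os._exit(status)   # hard exit: no atexit, no "can only join a child process"
        pid, status = os.waitpid(child_pid, 0)
        return status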


It looks to me like you are spawning one process too many. I would not spawn a separate process from run_in_parallel, but simply call run_server_client with the appropriate arguments, because it already spawns the server and client processes inside.
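A rough sketch of what I mean, reusing the names from the EDIT 2 code (an illustration only, not tested against the original setup):

    # Sketch: call run_server_client directly; it already starts and joins the
    # server and client processes itself. The two pairs then run one after the
    # other, but within each pair the server and client still run in parallel.
    def run_in_parallel(server, client):
        master_queue = multiprocessing.Queue()
        run_server_client(server, client, "server command for run_server_client",
                          "client command for run_server_client", 400000000, master_queue)
        run_server_client(server, client, "server args from do work",
                          "client args from do work", 5000, master_queue)
        return handle_queue(master_queue)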

