I am taking my first foray into the python multiprocessing module and I am running into some problems. I am fairly familiar with the threading module, but I need to make sure the processes I am executing are running in parallel.
Here is an outline of what I am trying to do. Please ignore things like undeclared variables and functions, because I cannot paste my code in full.
import multiprocessing
import time

def wrap_func_to_run(host, args, output):
    output.append(do_something(host, args))
    return

def func_to_run(host, args):
    return do_something(host, args)

def do_work(server, client, server_args, client_args):
    server_output = func_to_run(server, server_args)
    client_output = func_to_run(client, client_args)
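To make the intent clearer, the launching side looks roughly like this. This is only a simplified sketch, not my exact code; the function name, argument names, and the output lists are stand-ins:

def run_in_parallel(server, client, server_args, client_args):
    # Sketch only: launch both pieces of work as separate processes
    # and then wait for both of them.
    server_output = []
    client_output = []
    server_process = multiprocessing.Process(
        target=wrap_func_to_run, args=(server, server_args, server_output))
    client_process = multiprocessing.Process(
        target=wrap_func_to_run, args=(client, client_args, client_output))
    server_process.start()
    client_process.start()
    server_process.join()
    client_process.join()
    # Note: each child process gets its own copy of the lists, so appends
    # made inside the children are not visible back here.
    return server_output, client_output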
Here is the error I get:
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
    p.join()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
I have tried a number of different things to fix this, but I suspect something is wrong with the way I am using the module.
EDIT:
So I created a file that reproduces the issue, simulating the client/server and the work they do. I also missed an important point, which is that I am running this on unix. Another important piece of information is that do_work in my actual case involves using os.fork(). I could not reproduce the error without using os.fork(), so I am assuming that is where the problem lies. In my real case that part of the code was not mine, so I was treating it as a black box (probably a mistake on my part). Anyway, here is the code to reproduce it -
#!/usr/bin/python

import multiprocessing
import time
import os
import signal
import sys

class Host():
    def __init__(self):
        self.name = "host"

    def work(self):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self):
        x = 0
        for i in range(10000):
            x += 1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self):
        x = 0
        for i in range(5000):
            x += 1
        print x
        time.sleep(1)

def func_to_run(host, args):
    print host.name + " is working"
    host.work()
    print host.name + ": " + args
    return "done"

def do_work(server, client, server_args, client_args):
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)
    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)
    return (server_output == "done" and client_output == "done")

def run_server_client(server, client, server_args, client_args):
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"

def run_in_parallel(server, client):
    #set up commands for first process
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    result = do_work(server, client, "server args from do work", "client args from do work")
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return result

def main():
    #grab client
    client = Client()
    #grab server
    server = Server()
    return run_in_parallel(server, client)

if __name__ == "__main__":
    main()
If I remove the use of os.fork() in do_work I do not get the error, and the code behaves as I expected it to before (except for the passing of outputs, which I have accepted as my own mistake/misunderstanding). I can change the old code to not use os.fork(), but I would also like to know why it caused this problem and whether there is a workable solution.
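The smallest isolated sketch I could come up with that trips the same assertion is below. My understanding (and this is only an assumption on my part) is that a plain os.fork() child inherits the parent's multiprocessing bookkeeping, so when the fork child exits, multiprocessing's atexit handler tries to join() a Process that actually belongs to the parent:

#!/usr/bin/python
# Minimal sketch (an assumption, not the real library code): reproduce the
# "can only join a child process" assertion on Python 2.7.
import multiprocessing
import os
import sys
import time

def sleeper():
    time.sleep(2)

if __name__ == "__main__":
    p = multiprocessing.Process(target=sleeper)
    p.start()               # p is a child of this pid
    if os.fork() == 0:      # raw fork: the child inherits p in its process list
        sys.exit(0)         # atexit handler tries p.join() -> AssertionError
    os.wait()               # reap the fork child
    p.join()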
EDIT 2:
I had started working on a solution that omits os.fork() before the accepted answer. Here is what I have, with some tweaking of the amount of simulated work that can be done -
#!/usr/bin/python

import multiprocessing
import time
import os
import signal
import sys
from Queue import Empty

class Host():
    def __init__(self):
        self.name = "host"

    def work(self, w):
        #override - use to simulate work
        pass

class Server(Host):
    def __init__(self):
        self.name = "server"

    def work(self, w):
        x = 0
        for i in range(w):
            x += 1
        print x
        time.sleep(1)

class Client(Host):
    def __init__(self):
        self.name = "client"

    def work(self, w):
        x = 0
        for i in range(w):
            x += 1
        print x
        time.sleep(1)

def func_to_run(host, args, w, q):
    print host.name + " is working"
    host.work(w)
    print host.name + ": " + args
    q.put("ZERO")
    return "done"

def handle_queue(queue):
    done = False
    results = []
    return_val = 0
    while not done:
        #try to grab item from Queue
        tr = None
        try:
            tr = queue.get_nowait()
            print "found element in queue"
            print tr
        except Empty:
            done = True
        if tr is not None:
            results.append(tr)
    for el in results:
        if el != "ZERO":
            return_val = 1
    return return_val

def do_work(server, client, server_args, client_args):
    print "in do_work"
    server_output = client_output = ""
    child_pid = os.fork()
    if child_pid == 0:
        server_output = func_to_run(server, server_args)
        sys.exit(server_output)
    time.sleep(1)
    client_output = func_to_run(client, client_args)
    # kill and wait for server to finish
    os.kill(child_pid, signal.SIGTERM)
    (pid, status) = os.waitpid(child_pid, 0)
    return (server_output == "done" and client_output == "done")

def run_server_client(server, client, server_args, client_args, w, mq):
    local_queue = multiprocessing.Queue()
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue))
    print "Starting server process"
    server_process.start()
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue))
    print "Starting client process"
    client_process.start()
    print "joining processes"
    server_process.join()
    client_process.join()
    print "processes joined and done"
    if handle_queue(local_queue) == 0:
        mq.put("ZERO")

def run_in_parallel(server, client):
    #set up commands for first process
    master_queue = multiprocessing.Queue()
    server_cmd = "server command for run_server_client"
    client_cmd = "client command for run_server_client"
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue))
    print "Starting process one"
    process_one.start()
    #set up second process to run - but this one can run here
    print "About to do work"
    #result = do_work(server, client, "server args from do work", "client args from do work")
    run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue)
    print "Joining process one"
    process_one.join()
    #use outputs above and the result to determine result
    print "Process one has joined"
    return_val = handle_queue(master_queue)
    print return_val
    return return_val

def main():
    #grab client
    client = Client()
    #grab server
    server = Server()
    val = run_in_parallel(server, client)
    if val:
        print "failed"
    else:
        print "passed"
    return val

if __name__ == "__main__":
    main()
This code has some tweaked printouts just to see exactly what is happening. I used a multiprocessing.Queue to store and share the outputs across all the processes and back to my main thread to be handled. I think this solves the python portion of my problem, but there are still some issues in the code I am working on. The only other thing I can say is that the equivalent of func_to_run involves sending a command over ssh and grabbing any err along with the output. For some reason this works perfectly fine for a command with a low run time, but not well for a command with a much larger run time/output. I tried simulating this with vastly different work values in my code here, but was unable to reproduce similar results.
EDIT 3: The library code I am using (again, not mine) uses Popen.wait() for the ssh commands, and I just read this:
Popen.wait() Wait for the child process to complete. Set and return the returncode attribute.
Warning: This will deadlock when using stdout=PIPE and/or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use communicate() to avoid that.
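For reference, the difference in pattern, as a generic sketch (this is not the library's actual code; the ssh command is a placeholder):

import subprocess

cmd = ["ssh", "somehost", "some_long_running_command"]  # placeholder command

# The pattern the warning describes: with PIPE, a child that produces a lot of
# output fills the OS pipe buffer and blocks, while the parent blocks in wait().
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# p.wait()  # can hang once the pipe buffer fills up

# What the docs recommend instead: communicate() drains both streams while
# waiting, so the pipe never fills.
out, err = p.communicate()
print out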
I changed the code to not buffer, and to just print the output as it is received, and everything works.
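For the record, the change amounts to something like this (a rough sketch; my real command and handling differ, and the ssh target is a placeholder):

import subprocess

# Read the output as it arrives instead of letting it pile up in the pipe.
p = subprocess.Popen(["ssh", "somehost", "some_long_running_command"],
                     stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in iter(p.stdout.readline, ""):
    print line,             # print each line as it is received
p.stdout.close()
status = p.wait()           # safe now that the pipe has been drained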