Python multiprocessing silently crashes

I am using Python 2.7.3. I have parallelized some code using subclasses of multiprocessing.Process. The code runs fine as long as it has no bugs. But if there is an error anywhere in the code run by my Process subclass objects, they crash silently after running for a short while (no stacktrace at all is printed to the shell) and the CPU load drops to zero. The parent code never crashes, which gives the impression that execution is simply hanging. This makes it very hard to track down where the error in the code is, because no information about the error is printed anywhere.

I cannot find any other questions on Stack Overflow that deal with the same issue.

I suspect the subclassed Process objects appear to crash silently because they cannot print an error message to the parent's shell, but I would like to know what I can do about it, so that I can at least debug more efficiently (and so that other users of my code can tell me what went wrong when they run into problems, too).

EDIT: My real code is too complicated to post, but a trivial example of a Process subclass with a bug in it would be something like this:

    from multiprocessing import Process, Queue

    class Worker(Process):
        def __init__(self, inputQueue, outputQueue):
            super(Worker, self).__init__()
            self.inputQueue = inputQueue
            self.outputQueue = outputQueue

        def run(self):
            for i in iter(self.inputQueue.get, 'STOP'):
                # (code that does stuff)
                1 / 0  # Dumb error
                # (more code that does stuff)
                self.outputQueue.put(result)
+8
python parallel-processing multiprocessing
3 answers

What you really need is some way to pass exceptions to the parent process, right? Then you can handle them however you want.

If you use concurrent.futures.ProcessPoolExecutor, this is handled automatically. If you use multiprocessing.Pool, it is trivial. If you use explicit Process and Queue objects, you have to do a little work yourself, but it is not that much.
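For instance, with multiprocessing.Pool, an exception raised in a worker is re-raised in the parent as soon as you call get() on the AsyncResult. A minimal sketch (the job function and its error are made up for illustration):

    from multiprocessing import Pool

    def job(i):
        return 1 / 0  # Dumb error, raised in the child process

    if __name__ == '__main__':
        pool = Pool(processes=1)
        async_result = pool.apply_async(job, (1,))
        try:
            async_result.get()  # the child's exception is re-raised here
        except ZeroDivisionError as e:
            print('caught exception from child: %r' % (e,))
        pool.close()
        pool.join()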

For example:

    def run(self):
        try:
            for i in iter(self.inputQueue.get, 'STOP'):
                # (code that does stuff)
                1 / 0  # Dumb error
                # (more code that does stuff)
                self.outputQueue.put(result)
        except Exception as e:
            self.outputQueue.put(e)

Then your calling code can just read Exceptions off the queue like anything else. Instead of this:

    yield outq.get()

do the following:

    result = outq.get()
    if isinstance(result, Exception):
        raise result
    yield result

(I don’t know what your real code does in the parent process, because your minimal example just ignores the queue. But hopefully this explains the idea, even if your real code doesn’t actually work this way.)

This assumes that you want to abort on any unhandled exception that makes it up to run. If you instead want to swallow the exception and continue on to the next i in iter, just move the try inside the for loop instead of around it, as in the sketch below.
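A sketch of that swallow-and-continue variant of the run method above (reporting each item's failure on the queue before moving on):

    def run(self):
        for i in iter(self.inputQueue.get, 'STOP'):
            try:
                # (code that does stuff)
                1 / 0  # Dumb error
                # (more code that does stuff)
                self.outputQueue.put(result)
            except Exception as e:
                # Report this item's failure, then continue with the next i
                self.outputQueue.put(e)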

This also assumes that an Exception is never a valid result value. If that is an issue, the simplest solution is to just push (result, exception) tuples:

    def run(self):
        try:
            for i in iter(self.inputQueue.get, 'STOP'):
                # (code that does stuff)
                1 / 0  # Dumb error
                # (more code that does stuff)
                self.outputQueue.put((result, None))
        except Exception as e:
            self.outputQueue.put((None, e))

Then the code that pops results does this:

    result, exception = outq.get()
    if exception:
        raise exception
    yield result

You may notice that this is similar to the node.js callback style, where you pass (err, result) to every callback. Yes, it is annoying, and you can mess up code written in that style. But you only ever use it in the wrapper; all of your application-level code that gets values off the queue or gets called inside run just sees normal returns/yields and raised exceptions.

You might even want to consider building a Future to the spec of concurrent.futures (or using that class as-is), even though you are queuing and executing your jobs manually. It is not that hard, and it gives you a very nice API, especially for debugging.
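As a rough sketch of what using that class as-is could look like, assuming the (result, exception) tuple protocol above (the helper name future_from_queue is made up):

    import concurrent.futures

    def future_from_queue(outq):
        # Wrap one (result, exception) pair from the queue in a Future,
        # so callers get the standard result()/exception() API.
        result, exception = outq.get()
        future = concurrent.futures.Future()
        if exception is not None:
            future.set_exception(exception)
        else:
            future.set_result(result)
        return future

future_from_queue(outq).result() then returns the value, or raises the child's exception, just like a Future returned by an executor.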

Finally, it is worth noting that most code built around workers and queues can be made a lot simpler with an executor/pool design, even if you are absolutely sure that you only want one worker per queue. Just scrap all the boilerplate, and turn the loop in the Worker.run method into a function (which just returns or raises as normal, instead of appending to a queue). On the caller side, again scrap all the boilerplate, and just submit or map the job function with its parameters.

Your whole example can be reduced to:

    import concurrent.futures

    def job(i):
        # (code that does stuff)
        1 / 0  # Dumb error
        # (more code that does stuff)
        return result

    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        # iterating over results re-raises any exception that job raised
        results = executor.map(job, range(10))

And it will automatically handle exceptions correctly. (Note that concurrent.futures is in the standard library as of Python 3.2; for Python 2.7 it is available on PyPI as the futures backport.)


As you discovered in the comments, the traceback for an exception does not trace back into the child process; it only goes back to the manual raise result call (or, if you use a pool or executor, to the guts of the pool or executor).

The reason is that multiprocessing.Queue is built on top of pickle, and pickling an exception does not pickle its traceback. And the reason for that is that you cannot pickle tracebacks. And the reason for that is that tracebacks are full of references to the local execution context, so making them work in another process would be very hard.
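You can see this directly: trying to pickle a traceback object fails (a quick demonstration, not part of the original answer):

    import pickle
    import sys

    try:
        1 / 0
    except ZeroDivisionError:
        tb = sys.exc_info()[2]

    pickle.dumps(tb)  # TypeError: can't pickle traceback objects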

So... what can you do about this? Do not go looking for a fully general solution. Instead, think about what you actually need. 90% of the time, what you want is either "log the exception, with traceback, and continue" or "print the exception, with traceback, to stderr and exit(1), like the default unhandled-exception handler". For either of those, you do not need to pass an exception object at all; just format it on the child side and pass a string over. If you do need something fancier, work out exactly what you need, and pass just enough information to manually put that together. If you do not know how to format tracebacks and exceptions, see the traceback module. It is pretty simple. And this means you do not have to get into the pickle machinery at all. (It is not that it is very hard to register a custom pickler with copyreg or write a holder class with a __reduce__ method or anything, but if you do not need to, why learn all that?)
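For example, the (result, exception) version of run above could format the traceback in the child, where it still exists, and pass a string instead (a sketch):

    import traceback

    def run(self):
        try:
            for i in iter(self.inputQueue.get, 'STOP'):
                # (code that does stuff)
                1 / 0  # Dumb error
                # (more code that does stuff)
                self.outputQueue.put((result, None))
        except Exception:
            # Format the full traceback here in the child, where the
            # traceback object still exists, and send a plain string over.
            self.outputQueue.put((None, traceback.format_exc()))

The parent can then log or print that string; nothing crosses the queue except plain data.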

+14

I suggest this workaround for displaying a process's exceptions:

    import sys
    import traceback
    from multiprocessing import Process

    run_old = Process.run

    def run_new(*args, **kwargs):
        try:
            run_old(*args, **kwargs)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            traceback.print_exc(file=sys.stdout)

    Process.run = run_new
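A usage sketch (my addition, with a made-up failing target). Note that because this patches Process.run, it covers workers created with Process(target=...); a subclass that overrides run bypasses the patched method unless it wraps its own run the same way:

    def bad_target():
        1 / 0  # the patched run prints this traceback instead of losing it

    if __name__ == '__main__':
        p = Process(target=bad_target)
        p.start()
        p.join()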
+2

This is not an answer, just an extended comment. Run this program and tell us what output (if any) you get:

    from multiprocessing import Process, Queue

    class Worker(Process):
        def __init__(self, inputQueue, outputQueue):
            super(Worker, self).__init__()
            self.inputQueue = inputQueue
            self.outputQueue = outputQueue

        def run(self):
            for i in iter(self.inputQueue.get, 'STOP'):
                # (code that does stuff)
                1 / 0  # Dumb error
                # (more code that does stuff)
                self.outputQueue.put(result)

    if __name__ == '__main__':
        inq, outq = Queue(), Queue()
        inq.put(1)
        inq.put('STOP')
        w = Worker(inq, outq)
        w.start()

I get:

    % test.py
    Process Worker-1:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/home/unutbu/pybin/test.py", line 21, in run
        1 / 0  # Dumb error
    ZeroDivisionError: integer division or modulo by zero

I would be surprised if you get nothing.

+1
