What you really need is some way to pass exceptions to the parent process, right? Then you can handle them however you want.
If you use concurrent.futures.ProcessPoolExecutor , this is automatic. If you use multiprocessing.Pool , it's trivial. If you use an explicit Process and Queue , you have to do a little work, but it's not that much.
For example:
def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # ... do the real work and compute result ...
            self.outputQueue.put(result)
    except Exception as e:
        # pass the exception itself to the parent instead of a result
        self.outputQueue.put(e)
Then your calling code can just read the Exception off the queue like anything else. Instead of this:
yield outq.pop()
do the following:
result = outq.pop()
if isinstance(result, Exception):
    raise result
yield result
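To make the round trip concrete, here is a minimal runnable sketch. The worker / results helpers, the 'DONE' sentinel, and the squaring stand-in for the real work are all invented for illustration, and it uses get() on a multiprocessing.Queue rather than your pop() :

```python
import multiprocessing

def worker(inq, outq):
    # Process items until the 'STOP' sentinel; on failure, put the
    # exception itself on the output queue in place of a result.
    try:
        for i in iter(inq.get, 'STOP'):
            outq.put(i * i)  # stand-in for the real work
    except Exception as e:
        outq.put(e)
    finally:
        outq.put('DONE')  # tell the parent we're finished either way

def results(outq):
    # Read results off the queue, re-raising anything the child sent us.
    for result in iter(outq.get, 'DONE'):
        if isinstance(result, Exception):
            raise result
        yield result

if __name__ == '__main__':
    inq, outq = multiprocessing.Queue(), multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(inq, outq))
    p.start()
    for i in [1, 2, 3]:
        inq.put(i)
    inq.put('STOP')
    print(list(results(outq)))  # [1, 4, 9]
    p.join()
```

An exception raised anywhere in the worker loop ends up re-raised in the parent, at the point where you iterate results .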
(I don’t know what your real code does in the parent process, because your minimal sample just ignores the queue. But hopefully this explains the idea, even if your real code doesn’t work exactly this way.)
This assumes you want to abort run on the first unhandled exception. If you’d rather raise the exception and continue on to the next i in iter , just move the try inside the for loop, instead of around it.
This also assumes that an Exception is never a valid result value. If that’s an issue, the simplest solution is to push (result, exception) tuples instead:
def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # ... do the real work and compute result ...
            self.outputQueue.put((result, None))
    except Exception as e:
        self.outputQueue.put((None, e))
Then your popping code does this:
result, exception = outq.pop()
if exception:
    raise exception
yield result
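Here is a runnable sketch of that unwrapping, again with an invented 'DONE' sentinel and a plain queue.Queue standing in for the real output queue (and get() standing in for pop() ):

```python
import queue

def unwrap(outq):
    # Unwrap (result, exception) pairs: re-raise on error, yield otherwise.
    for result, exception in iter(outq.get, 'DONE'):
        if exception is not None:
            raise exception
        yield result

q = queue.Queue()
q.put((10, None))
q.put((None, None))  # None is now a perfectly valid result
q.put('DONE')
print(list(unwrap(q)))  # [10, None]
```

Note the explicit is not None test: with the tuple protocol, falsy results like None , 0 , or '' pass through correctly instead of being mistaken for errors.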
You may notice that this is similar to the node.js callback style, where you pass (err, result) to every callback. Yes, it’s annoying, and it’s easy to mess up code written in that style. But you only use it inside the wrapper; all of your application-level code that gets values off the queue, or gets called inside run , just sees normal return / yield values and raised exceptions.
You might want to consider building a Future to the concurrent.futures spec (or using that class as-is), even if you’re queuing and executing your jobs manually. It’s not that hard, and it gives you a really nice API, especially for debugging.
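For illustration, here is one way to drive a concurrent.futures.Future by hand. The submit helper is hypothetical, and it runs the job in a thread rather than a process just to keep the sketch self-contained; with a process, you would set the result or exception from whatever reads the queue on the parent side.

```python
import concurrent.futures
import threading

def submit(fn, *args):
    # Hand-rolled submission: run fn and report through a Future.
    future = concurrent.futures.Future()
    def run():
        if not future.set_running_or_notify_cancel():
            return  # the future was cancelled before it started
        try:
            future.set_result(fn(*args))
        except Exception as e:
            future.set_exception(e)
    threading.Thread(target=run).start()
    return future

f = submit(lambda x: x + 1, 41)
print(f.result())  # 42; a failing job would re-raise here instead
```

The payoff is that callers get result() , exception() , add_done_callback() , and friends for free, instead of a bespoke queue protocol.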
Finally, it’s worth noting that most of the code people build around workers and queues becomes a lot simpler with the executor/pool design, even if you’re absolutely sure you only need one worker and one queue. Just scrap all the boilerplate, and turn the loop in the Worker.run method into a function (which just returns or raises as usual, instead of appending to a queue). On the caller side, again scrap all the boilerplate and just submit or map the job function with its arguments.
Your whole example can be reduced to:
def job(i):
    # ... do the real work and return the result ...
And it will automatically handle exceptions correctly.
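For example, under the assumption that the work looks something like this (the failing i == 3 case is invented to show the propagation):

```python
import concurrent.futures

def job(i):
    # A plain function: return normally, or raise; no queue plumbing.
    if i == 3:
        raise ValueError(f'bad input: {i}')
    return i * i

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        try:
            for result in executor.map(job, [1, 2, 3]):
                print(result)
        except ValueError as e:
            print('job failed:', e)
```

The exception raised in the child is re-raised in the parent when you retrieve that result from map , with no explicit isinstance check or sentinel protocol anywhere in your code.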
As you mentioned in the comments, the traceback for the exception doesn’t trace back into the child process; it only goes back to your manual raise result call (or, if you’re using a pool or executor, into the guts of the pool or executor).
The reason is that multiprocessing.Queue is built on top of pickle , and pickling an exception doesn’t pickle its traceback. And the reason for that is that you can’t pickle tracebacks. And the reason for that is that tracebacks are full of references to the local execution context, so making them work in another process would be very hard.
So ... what can you do about this? Don’t go looking for a fully general solution. Instead, think about what you actually need. 90% of the time, what you want is either “log the exception, with traceback, and continue” or “print the exception, with traceback, to stderr and exit(1) the way the default unhandled-exception handler does.” For either of those, you don’t need to pass an exception object at all; just format it on the child side and pass a string. If you do need something fancier, work out exactly what you need, and pass just enough information to manually put that together. If you don’t know how to format tracebacks and exceptions, see the traceback module. It’s pretty simple. And it means you don’t have to get into the pickle machinery at all. (It’s not that hard to copyreg a pickler, or to write a holder class with a __reduce__ method, or anything like that, but if you don’t need any of that, why learn it?)
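For instance, here is a sketch of the “format it on the child side, pass the string” approach, using a hypothetical safe_call wrapper; the child would put the returned pair on the queue instead of raising:

```python
import traceback

def safe_call(fn, *args):
    # Run fn, returning (result, formatted_traceback); exactly one is None.
    # The string survives pickling even though the traceback object wouldn't.
    try:
        return fn(*args), None
    except Exception:
        return None, traceback.format_exc()

result, tb = safe_call(int, 'not a number')
if tb is not None:
    print(tb)  # the full traceback text, ready for logging or stderr
```

The parent can log the string, write it to stderr, or embed it in a new exception’s message, all without ever unpickling a traceback.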