Python multiprocessing IOError: [Errno 232] The pipe is being closed

I am trying to implement this multiprocessing tutorial in Python, but when I tried to run my own task, I got the following error:

Traceback (most recent call last):
  File "C:\Python27\lib\multiprocessing\queues.py", line 262, in _feed
    send(obj)
IOError: [Errno 232] The pipe is being closed

Here is a reproducible example of what I'm trying to do that gives the same error message:

 from multiprocessing import Lock, Process, Queue, current_process import time class Testclass(object): def __init__(self, x): self.x = x def toyfunction(testclass): testclass.product = testclass.x * testclass.x return testclass def worker(work_queue, done_queue): try: for testclass in iter(work_queue.get, 'STOP'): print(testclass.counter) newtestclass = toyfunction(testclass) done_queue.put(newtestclass) except: print('error') return True def main(): counter = 1 database = [] while counter <= 1000: database.append(Testclass(3)) counter += 1 print(counter) workers = 8 work_queue = Queue() done_queue = Queue() processes = [] start = time.clock() counter = 1 for testclass in database: testclass.counter = counter work_queue.put(testclass) counter += 1 print(counter) print('items loaded') for w in range(workers): p = Process(target=worker, args=(work_queue, done_queue)) p.start() processes.append(p) work_queue.put('STOP') for p in processes: p.join() done_queue.put('STOP') print(time.clock()-start) print("Done") if __name__ == '__main__': main() 
+2
python multiprocessing
source share
2 answers

When I add code that processes the completed queue, I no longer get the error. Here is the working code:

 from multiprocessing import Lock, Process, Queue, current_process import time class Testclass(object): def __init__(self, x): self.x = x def toyfunction(testclass): testclass.product = testclass.x * testclass.x return testclass def worker(work_queue, done_queue): try: for testclass in iter(work_queue.get, 'STOP'): print(testclass.counter) newtestclass = toyfunction(testclass) done_queue.put(newtestclass) except: print('error') return True def main(): counter = 1 database = [] while counter <= 100: database.append(Testclass(10)) counter += 1 print(counter) workers = 8 work_queue = Queue() done_queue = Queue() processes = [] start = time.clock() counter = 1 for testclass in database: testclass.counter = counter work_queue.put(testclass) counter += 1 print(counter) print('items loaded') for w in range(workers): p = Process(target=worker, args=(work_queue, done_queue)) p.start() processes.append(p) work_queue.put('STOP') for p in processes: p.join() done_queue.put('STOP') newdatabase = [] for testclass in iter(done_queue.get, 'STOP'): newdatabase.append(testclass) print(time.clock()-start) print("Done") return(newdatabase) if __name__ == '__main__': database = main() 
0
source share

I dealt with this by emptying the queue after using an event to shut the process down cleanly:

# Shutdown sequence: signal the worker, drain its queue, then close it.
self.event.set()  # the process runs a timer that checks this flag and shuts itself down when set
# Drain before closing: a multiprocessing.Queue's feeder thread keeps
# pushing buffered items into the pipe, so closing with items still
# queued is what triggers "IOError: [Errno 232] The pipe is being closed".
while not self._q.empty():  # _q is a multiprocessing.Queue used for inter-process communication
    try:
        # empty() is only advisory across processes, so get() may still
        # block; the tiny timeout guarantees the loop makes progress.
        self._q.get(timeout=0.001)
    except:
        # NOTE(review): bare except also hides unrelated errors —
        # queue.Empty is the specific exception expected here.
        pass
self._q.close()
+2
source share

All Articles