I am trying to write an interactive shell (for use in ipython) for a library that controls some hardware. Some calls are heavy on I/O, so it makes sense to carry the tasks out in parallel. Using ThreadPool (almost) works nicely:
```python
from multiprocessing.pool import ThreadPool

class hardware():
    def __init__(self, IPaddress):
        connect_to_hardware(IPaddress)

    def some_long_task_to_hardware(self, wtime):
        wait(wtime)
        result = 'blah'
        return result

pool = ThreadPool(processes=4)
threads = []
h = [hardware(IP1), hardware(IP2), hardware(IP3), hardware(IP4)]
for tt in range(4):
    task = pool.apply_async(h[tt].some_long_task_to_hardware, (1000,))
    threads.append(task)
alive = [True] * 4
try:
    while any(alive):
        for tt in range(4):
            alive[tt] = not threads[tt].ready()
        do_other_stuff_for_a_bit()
except:
    pass  # some way to stop the workers...
```
The problem occurs if the user wants to interrupt the process, or if there is an I/O error in do_other_stuff_for_a_bit(). Pressing Ctrl + C stops the main process, but the worker threads carry on running until their current task is complete.
Is there some way to stop these threads without having to rewrite the library or have the user exit python? pool.terminate() and pool.join(), which I have seen used in other examples, do not seem to do the job.
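For what it is worth, the behaviour is reproducible in isolation. In this minimal sketch (the task and timings are made up for illustration), pool.terminate() returns, but the worker thread still runs its current task to completion, because CPython has no way to kill a thread from outside:

```python
import time
from multiprocessing.pool import ThreadPool

events = []  # records what the worker managed to do

def long_task():
    events.append('started')
    time.sleep(1)  # stands in for a slow hardware call
    events.append('finished')

pool = ThreadPool(processes=1)
pool.apply_async(long_task)
time.sleep(0.2)    # let the worker pick the task up
pool.terminate()   # returns immediately...
pool.join()        # ...but blocks until the worker finishes its current task
print(events)      # ['started', 'finished'] -- the task was never interrupted
```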
The actual routine (instead of the simplified version above) uses logging, and although all the worker threads are shut down at some point, I can see that the processes they started carry on until complete (and, this being hardware, I can see their effect by looking across the room).
This is in python 2.7.
UPDATE:
It seems the solution is to switch to using multiprocessing.Process instead of a thread pool. The test code I tried runs foo_pulse:
```python
class foo(object):
    def foo_pulse(self, nPulse, name):
        pass  # (body omitted)
```
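The body of foo_pulse is not shown above; a hypothetical stand-in such as the following (the loop-and-print body is a guess, not the original) makes it easy to see whether the task is still running:

```python
import time

class foo(object):
    def foo_pulse(self, nPulse, name):
        # Hypothetical body (the original is omitted): print
        # periodically so it is obvious the task is still running.
        print('starting pulse ' + name)
        for i in range(nPulse):
            time.sleep(0.01)
            print(name + ' pulse ' + str(i))
        return name + ' done'
```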
If you try to run this with ThreadPool, then ctrl-C fails to stop foo_pulse from running to completion (even though it does kill the threads immediately, the print statements keep on coming):
```python
from multiprocessing.pool import ThreadPool
import time

def test(nPulse):
    a = foo()
    pool = ThreadPool(processes=4)
    threads = []
    for rn in range(4):
        r = pool.apply_async(a.foo_pulse, (nPulse, 'loop ' + str(rn)))
        threads.append(r)
    alive = [True] * 4
    try:
        while any(alive):  # wait until all threads complete
            for rn in range(4):
                alive[rn] = not threads[rn].ready()
            time.sleep(1)
    except:  # stop threads if user presses ctrl-c
        print('trying to stop threads')
        pool.terminate()
        print('stopped threads')  # this line prints but output from foo_pulse carries on
        raise
    else:
        for t in threads:
            print(t.get())
```
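For context: in CPython a thread can only be stopped cooperatively. If the library could be modified (which is exactly what I want to avoid here), the standard pattern is a threading.Event that the worker checks inside its loop; shown here only for contrast, with made-up task names:

```python
import threading
import time

def pulse(n, name, stop):
    for i in range(n):
        if stop.is_set():  # cooperative cancellation point
            return name + ' stopped at ' + str(i)
        time.sleep(0.01)
    return name + ' done'

stop = threading.Event()
t_result = []
t = threading.Thread(target=lambda: t_result.append(pulse(1000, 'loop', stop)))
t.start()
time.sleep(0.05)
stop.set()   # ask the worker to stop; it obliges at the next check
t.join()
print(t_result[0])  # e.g. 'loop stopped at 4'
```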
However, the version using multiprocessing.Process works as expected:
```python
import multiprocessing as mp
import time

def test_pro(nPulse):
    pros = []
    ans = []
    a = foo()
    for rn in range(4):
        q = mp.Queue()
        ans.append(q)
        r = mp.Process(target=wrapper, args=(a, "foo_pulse", q),
                       kwargs={'args': (nPulse, 'loop ' + str(rn))})
        r.start()
        pros.append(r)
    try:
        for p in pros:
            p.join()
        print('all done')
    except:
        pass  # (exception handling truncated)
```
Here I defined a wrapper for the library foo (so that the library did not need to be rewritten). If a return value is not needed, then neither is the wrapper:
```python
def wrapper(a, target, q, args=(), kwargs={}):
    '''Used when a return value is wanted'''
    q.put(getattr(a, target)(*args, **kwargs))
```
From the documentation I see no reason why the pool version should not have worked (other than a bug).