Stopping processes in ThreadPool in Python

I am trying to write an interactive shell (for use in IPython) for a library that controls some hardware. Some calls are heavy on I/O, so it makes sense to run the tasks in parallel. Using ThreadPool (almost) works nicely:

    from multiprocessing.pool import ThreadPool

    class hardware():
        def __init__(self, IPaddress):
            connect_to_hardware(IPaddress)

        def some_long_task_to_hardware(self, wtime):
            wait(wtime)
            result = 'blah'
            return result

    pool = ThreadPool(processes=4)
    threads = []
    h = [hardware(IP1), hardware(IP2), hardware(IP3), hardware(IP4)]
    for tt in range(4):
        task = pool.apply_async(h[tt].some_long_task_to_hardware, (1000,))
        threads.append(task)

    alive = [True] * 4
    try:
        while any(alive):
            for tt in range(4):
                alive[tt] = not threads[tt].ready()
            do_other_stuff_for_a_bit()
    except:
        # some command I cannot find that will stop the threads...
        raise

    for tt in range(4):
        print(threads[tt].get())

The problem arises if the user wants to stop the process, or if there is an I/O error in do_other_stuff_for_a_bit(). Pressing Ctrl+C stops the main process, but the worker threads carry on running until their current task is complete.
Is there a way to stop these threads without having to rewrite the library or have the user exit python? pool.terminate() and pool.join(), which I have seen used in other examples, do not seem to do the job.

The actual routine (instead of the simplified version above) uses logging, and although all the worker threads are shut down at some point, I can see that the processes they started carry on until they are complete (and being hardware, I can see their effect by looking across the room).

This is in python 2.7.

UPDATE:

It seems that the solution is to switch to using multiprocessing.Process instead of a thread pool. The test code I tried runs foo_pulse:

    class foo(object):
        def foo_pulse(self, nPulse, name):  # just one method of *many*
            print('starting pulse for ' + name)
            result = []
            for ii in range(nPulse):
                print('on for ' + name)
                time.sleep(2)
                print('off for ' + name)
                time.sleep(2)
                result.append(ii)
            return result, name

If you try to run this using ThreadPool, then Ctrl-C fails to stop foo_pulse from running (even though it kills the threads right away, the print statements keep on coming):

    from multiprocessing.pool import ThreadPool
    import time

    def test(nPulse):
        a = foo()
        pool = ThreadPool(processes=4)
        threads = []
        for rn in range(4):
            r = pool.apply_async(a.foo_pulse, (nPulse, 'loop ' + str(rn)))
            threads.append(r)
        alive = [True] * 4
        try:
            while any(alive):  # wait until all threads complete
                for rn in range(4):
                    alive[rn] = not threads[rn].ready()
                time.sleep(1)
        except:  # stop threads if user presses ctrl-c
            print('trying to stop threads')
            pool.terminate()
            print('stopped threads')  # this line prints but output from foo_pulse carries on
            raise
        else:
            for t in threads:
                print(t.get())

However, the version using multiprocessing.Process works as expected:

    import multiprocessing as mp
    import time

    def test_pro(nPulse):
        pros = []
        ans = []
        a = foo()
        for rn in range(4):
            q = mp.Queue()
            ans.append(q)
            r = mp.Process(target=wrapper, args=(a, "foo_pulse", q),
                           kwargs={'args': (nPulse, 'loop ' + str(rn))})
            r.start()
            pros.append(r)
        try:
            for p in pros:
                p.join()
            print('all done')
        except:  # stop threads if user stops findRes
            print('trying to stop threads')
            for p in pros:
                p.terminate()
            print('stopped threads')
        else:
            print('output here')
            for q in ans:
                print(q.get())
        print('exit time')

where I have defined a wrapper for the library foo (so that it does not need to be rewritten). If a return value is not needed, then neither is this wrapper:

    def wrapper(a, target, q, args=(), kwargs={}):
        '''Used when return value is wanted'''
        q.put(getattr(a, target)(*args, **kwargs))
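
That is, when no result is needed the method can be made the process target directly, along these lines (just a sketch, reusing a and nPulse from the test above):

    # no queue and no wrapper needed when the return value is ignored
    r = mp.Process(target=a.foo_pulse, args=(nPulse, 'loop 0'))
    r.start()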

From the documentation I see no reason why the pool would not work (other than a bug).

2 answers

This is a very interesting use of parallelism.

However, if you use multiprocessing, the goal is to have many processes running in parallel, as opposed to one process running many threads.

Consider these few changes to implement it using multiprocessing:

You have these functions that will be executed in parallel:

    import time
    import multiprocessing as mp

    def some_long_task_from_library(wtime):
        time.sleep(wtime)

    class MyException(Exception):
        pass

    def do_other_stuff_for_a_bit():
        time.sleep(5)
        raise MyException("Something Happened...")

Create and run processes, say 4:

    procs = []

    # this is not a Pool, it is just a way to handle the
    # processes instead of calling them p1, p2, p3, p4...
    for _ in range(4):
        p = mp.Process(target=some_long_task_from_library, args=(1000,))
        p.start()
        procs.append(p)

    mp.active_children()  # this joins all the started processes, and runs them

The processes run in parallel, presumably each on a separate processor core, but that depends on the OS. You can check it in your system monitor.
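
One quick way to find them in the monitor, a small sketch assuming the procs list from above, is to print the process IDs:

    import os

    print('main process: ' + str(os.getpid()))
    for p in procs:
        # each worker has its own PID, visible in the system monitor
        print(p.name + ' pid: ' + str(p.pid))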

Meanwhile, the main process runs the function that will break, and you want to stop the running processes without leaving them orphaned:

    try:
        do_other_stuff_for_a_bit()
    except MyException as exc:
        print(exc)
        print("Now stopping all processes...")
        for p in procs:
            p.terminate()
        print("The rest of the process will continue")

If it makes no sense to continue the main process once one or all of the subprocesses have terminated, you should handle the exit of the main program.
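
For instance, one possible way (a sketch, assuming the procs list from above) is to reap the workers and bail out if any of them ended abnormally:

    import sys

    for p in procs:
        p.join()  # terminate() is asynchronous, join() reaps the process
        if p.exitcode != 0:  # a negative exitcode means killed by a signal
            sys.exit('a worker ended abnormally, stopping the main program')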

Hope this helps, and that you can adapt bits of this for your library.


In response to the question of why pool did not work: this is because (as quoted in the documentation) the main module needs to be importable by the child processes, and due to the nature of this project interactive python is being used.
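
In a standalone script (rather than an interactive session), that requirement is met with the usual guard; a minimal sketch:

    import multiprocessing as mp

    def work():
        print('in child')

    if __name__ == '__main__':
        # the guard stops child processes from re-running the module's
        # top-level code when they import it
        p = mp.Process(target=work)
        p.start()
        p.join()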

At the same time it was not clear why ThreadPool would work - although the clue is right there in the name. ThreadPool creates its pool of workers using multiprocessing.dummy, which, as noted here, is just a wrapper around the threading module. Pool uses multiprocessing.Process. This can be seen in this test:

    p = ThreadPool(processes=3)
    p._pool[0]
    <DummyProcess(Thread-23, started daemon 12345)>   # no terminate() method

    p = Pool(processes=3)
    p._pool[0]
    <Process(PoolWorker-1, started daemon)>   # has handy terminate() method if needed

Because threads do not have a terminate method, the worker threads keep running until they have completed their current task. Killing threads is messy (which is why I tried to use the multiprocessing module), but solutions are here.
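
For reference, the usual clean alternative to killing a thread is cooperative cancellation, where the task polls a flag between steps. That requires modifying the task itself, which is exactly what the question wants to avoid, but a minimal sketch (a hypothetical rework of foo_pulse) looks like:

    import threading
    import time

    stop_flag = threading.Event()

    def pulse_cooperative(nPulse, name):
        # check the flag at each step and bail out early if it is set
        for ii in range(nPulse):
            if stop_flag.is_set():
                print('stopping ' + name)
                return
            print('on for ' + name)
            time.sleep(2)

    t = threading.Thread(target=pulse_cooperative, args=(1000, 'loop 0'))
    t.start()
    time.sleep(5)
    stop_flag.set()  # ask the thread to stop; it exits at the next check
    t.join()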

The one warning for the solution using the wrapper above:

    def wrapper(a, target, q, args=(), kwargs={}):
        '''Used when return value is wanted'''
        q.put(getattr(a, target)(*args, **kwargs))

is that changes to attributes within the object instance are not passed back to the main program. As an example, the foo class above could also have methods such as: def addIP(self, newIP): self.hardwareIP = newIP. A call to r = mp.Process(target=a.addIP, args=('127.0.0.1',)) does not update a.
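
A small self-contained sketch of that pitfall, using a minimal stand-in for the foo class with the hypothetical addIP method from above:

    import multiprocessing as mp

    class foo(object):
        def __init__(self):
            self.hardwareIP = None

        def addIP(self, newIP):
            self.hardwareIP = newIP

    if __name__ == '__main__':
        a = foo()
        p = mp.Process(target=a.addIP, args=('127.0.0.1',))
        p.start()
        p.join()
        print(a.hardwareIP)  # prints None: the child changed its own copy of a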

The only way around this for a complex object seems to be shared memory using a custom manager, which can give access to both the methods and the attributes of object a. For a very large, complex object based on a library, this is likely best done using dir(foo) to populate the manager. If I can figure out how, I will update this answer with an example (for my future self as much as for others).
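
As a starting point rather than a full solution, here is a sketch of the custom manager idea using multiprocessing.managers.BaseManager, assuming the foo class with the hypothetical addIP method from above:

    import multiprocessing as mp
    from multiprocessing.managers import BaseManager

    class FooManager(BaseManager):
        pass

    # host the real instance in the manager's server process; other
    # processes get a proxy whose method calls are forwarded to it
    FooManager.register('foo', foo)

    if __name__ == '__main__':
        manager = FooManager()
        manager.start()
        a = manager.foo()  # a proxy, not a local copy
        p = mp.Process(target=a.addIP, args=('127.0.0.1',))
        p.start()
        p.join()
        # note: a default proxy exposes public methods but not attributes,
        # so reading hardwareIP back would need a getter method on foo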

