Python: How to make concurrent.futures cancelable?

Python's concurrent.futures and its ProcessPoolExecutor provide a convenient interface for scheduling and monitoring tasks. Futures even provide a .cancel() method:

cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled, then the method will return False; otherwise the call will be cancelled and the method will return True.
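For what it's worth, cancelling futures that are still queued does work as documented. A minimal sketch (the single-worker setup and the numbers are only there to keep some tasks pending):

import time
from concurrent.futures import ProcessPoolExecutor

def slow(x):
    time.sleep(1)
    return x

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as pool:
        futures = [pool.submit(slow, i) for i in range(5)]
        time.sleep(0.1)              # give the first task time to start
        print(futures[0].cancel())   # False: already executing
        print(futures[-1].cancel())  # True: still queued, so it is cancelled

So the problem is specifically with futures that are already running.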

Unfortunately, in a similar question (concerning asyncio), the answer claims that running tasks cannot be cancelled, citing this snippet of the documentation — but the documentation does not say that; it only says so if they are running *and* cannot be cancelled.

Passing multiprocessing.Event objects to the processes is also not trivially possible (doing so via parameters, as with multiprocessing.Process, raises a RuntimeError).
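A minimal sketch of that failure (the search function and numbers are made up for illustration; the exact behavior and error message may vary between Python versions):

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def search(partition, stop_event):
    for elem in partition:
        if stop_event.is_set():
            return None  # some other worker already found the solution
        if elem == 42:
            stop_event.set()
            return elem
    return None

if __name__ == '__main__':
    event = multiprocessing.Event()
    with ProcessPoolExecutor(max_workers=2) as pool:
        future = pool.submit(search, range(100), event)
        # the Event cannot be pickled into the worker; retrieving the
        # result raises RuntimeError ("... should only be shared between
        # processes through inheritance")
        print(future.result())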

What am I trying to do? I would like to partition a search space and run a task for every partition. But it is enough to have *one* solution, and the process is CPU-intensive. So, is there actually a comfortable way to accomplish this that does not offset the gains of using a ProcessPool to begin with?

Example:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from a partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000

with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i * steps, (i + 1) * steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())
    print("---")
    for d in not_done:
        # cancel() returns False for the running futures;
        # result() blocks and eventually returns False for all of them
        print("Cancel: " + str(d.cancel()))
        print("Result: " + str(d.result()))
3 answers

I found your question interesting, so I looked into it.

I found that the behavior of the .cancel() method is as stated in the Python documentation. As for your concurrent futures, unfortunately they could not be cancelled even after they were told to do so. If my finding is correct, then I believe Python really does need a more effective .cancel() method.

Run the code below to verify my findings for yourself.

from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time

# function that profits from a partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 3351355150:
            return elem  # return already terminates the loop once found
    return False

start = time()
futures = []
# used to create the partitions
steps = 1000000000

with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i * steps, (i + 1) * steps)
        futures.append(pool.submit(m_run, partition))

    ### New Code: Start ###
    for f in as_completed(futures):
        print(f.result())
        if f.result():
            print('break')
            break

    for f in futures:
        print(f, 'running?', f.running())
        if f.running():
            f.cancel()
            print('Cancelled? ', f.cancelled())
    ### New Code: End ###

    print('New Instruction Ended at = ', time() - start)

print('Total Compute Time = ', time() - start)

Update: It is possible to forcefully terminate the concurrent processes via bash, but as a consequence the main Python program will terminate too. If that is not an issue for you, try the code below.

You need to add the code below between the last two print statements to see this for yourself. Note: this code only works if you are not running any other python3 program.

import subprocess, os, signal

# list the PIDs of all running python3 processes
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print('result =', result)

for i in result:
    print('PID = ', i)
    if i != result[0]:  # skip the first listed python3 PID
        os.kill(int(i), signal.SIGKILL)
        try:
            os.kill(int(i), 0)  # signal 0 only checks that the process exists
            raise Exception("""wasn't able to kill the process
                  HINT: use signal.SIGKILL or signal.SIGABRT""")
        except OSError:
            continue

I don’t know why concurrent.futures.Future does not have a .kill() method, but you can accomplish what you want by shutting down the process pool with pool.shutdown(wait=False) and killing the remaining child processes by hand.

Create a function to kill child processes:

import signal
import psutil  # third-party: pip install psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    # send sig to every (recursive) child of the given process
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

Run your code until you get the first result, then kill all remaining child processes:

import os
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from a partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000

pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i * steps, (i + 1) * steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down the pool without waiting for the pending futures
pool.shutdown(wait=False)

# Kill the remaining child processes
kill_child_processes(os.getpid())
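If you would rather avoid the third-party psutil dependency, a stdlib-only variant should also work — an untested sketch, relying on the assumption that the pool's workers are direct multiprocessing children of the current process:

import multiprocessing

def kill_child_processes_stdlib():
    # ProcessPoolExecutor workers are multiprocessing child processes,
    # so active_children() lists them while they are alive
    for child in multiprocessing.active_children():
        child.terminate()  # sends SIGTERM on POSIX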

Unfortunately, running Futures cannot be cancelled. I believe the core reason is to ensure the same API across different implementations (it is not possible to interrupt running threads or coroutines).

The Pebble library was designed to overcome this and other limitations.

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if the task is running, the worker process will be terminated
    # and a new worker will be started to consume the next task
    future.cancel()
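A related feature worth noting (based on Pebble's documented API, as I understand it): schedule() also accepts a timeout, and when it expires the worker process is terminated and future.result() raises a TimeoutError:

from concurrent.futures import TimeoutError
from pebble import ProcessPool

def busy():
    while True:
        pass  # CPU-bound task that never finishes on its own

if __name__ == '__main__':
    with ProcessPool() as pool:
        future = pool.schedule(busy, timeout=5)
        try:
            future.result()
        except TimeoutError:
            # the worker running busy() was killed after 5 seconds
            print('task timed out and its process was terminated')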
