How to create global lock / semaphore with multiprocessing.pool in Python?

I want to restrict access to resources in child processes, for example to limit concurrent HTTP downloads, disk I/O, etc. How can I achieve this by extending this base code?

Please share some basic code examples.

    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    while job_queue.is_jobs_for_processing():
        for job in job_queue.pull_jobs_for_processing():
            pool.apply_async(do_job, callback=callback)
    pool.close()
    pool.join()
Tags: python, python-multiprocessing
2 answers

Use the initializer and initargs arguments when creating the pool to define a global in all the child processes.

For example:

    from multiprocessing import Pool, Lock
    from time import sleep

    def do_job(i):
        "The greater i is, the shorter the function waits before returning."
        with lock:
            sleep(1 - (i / 10.))
        return i

    def init_child(lock_):
        # Runs once in each worker process; stores the inherited lock
        # in a global so do_job can reach it.
        global lock
        lock = lock_

    def main():
        lock = Lock()
        poolsize = 4
        with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
            results = pool.imap_unordered(do_job, range(poolsize))
            print(list(results))

    if __name__ == "__main__":
        main()

This code will print the numbers 0-3 in ascending order (the order in which the jobs were submitted), because the lock serializes them. Remove the with lock: block (and dedent its body, as in the sketch below) to see the numbers print in descending order instead.
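For illustration, here is a minimal sketch of do_job with the lock removed (this variant is not in the original answer):

    def do_job(i):
        "The greater i is, the shorter the function waits before returning."
        # No lock: all four jobs sleep concurrently, so the job with the
        # largest i (shortest sleep) returns first -> output is 3, 2, 1, 0.
        sleep(1 - (i / 10.))
        return i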

This solution works on both Windows and Unix. On Unix, however, where processes can be created by forking, it is enough to declare the global at module scope: the child process receives a copy of the parent's memory, which includes a still-functional lock object. The initializer is then not strictly necessary, although it helps document how the code is meant to work. When multiprocessing creates processes by forking, the following also works.

    from multiprocessing import Pool, Lock
    from time import sleep

    lock = Lock()  # created at module scope; inherited by forked children

    def do_job(i):
        "The greater i is, the shorter the function waits before returning."
        with lock:
            sleep(1 - (i / 10.))
        return i

    def main():
        poolsize = 4
        with Pool(poolsize) as pool:
            results = pool.imap_unordered(do_job, range(poolsize))
            print(list(results))

    if __name__ == "__main__":
        main()
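If you want to depend on forking explicitly rather than on the platform default, you can request the fork start method through a context. A sketch, reusing do_job from the block above (fork is unavailable on Windows):

    import multiprocessing

    def main():
        # Explicitly request the "fork" start method (POSIX only), so the
        # module-level lock is guaranteed to be inherited by the workers.
        ctx = multiprocessing.get_context("fork")
        with ctx.Pool(4) as pool:
            results = pool.imap_unordered(do_job, range(4))
            print(list(results))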

Use a global semaphore and acquire it whenever you access the resource. For example:

    import multiprocessing
    from time import sleep

    # Allow at most two jobs to hold the semaphore at a time.
    semaphore = multiprocessing.Semaphore(2)

    def do_job(job_id):
        with semaphore:
            sleep(1)
            print("Finished job")

    def main():
        pool = multiprocessing.Pool(6)
        for job_id in range(6):
            print("Starting job")
            pool.apply_async(do_job, [job_id])
        pool.close()
        pool.join()

    if __name__ == "__main__":
        main()

This program only processes two jobs per second, because the other worker processes block while waiting on the semaphore. Note that this relies on the children inheriting the module-level semaphore via fork; under the spawn start method (the default on Windows), each child re-imports the module and creates its own, unshared semaphore, so prefer passing the semaphore through the pool initializer as in the first answer.
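To tie this back to the original question, here is a sketch (not from the original answers) combining the two approaches: the semaphore is passed through the pool initializer, so it is shared correctly under both fork and spawn, and it caps how many jobs touch the resource at once. The limit of 3 and the sleep standing in for a real download or disk read are assumptions.

    import multiprocessing
    from time import sleep

    def init_child(semaphore_):
        # Runs once per worker; stores the shared semaphore in a global.
        global semaphore
        semaphore = semaphore_

    def do_job(job_id):
        # At most 3 jobs enter this block at once, e.g. to cap concurrent
        # HTTP downloads or disk I/O.
        with semaphore:
            sleep(1)  # stand-in for the real download / disk work
            print("Finished job", job_id)

    def main():
        semaphore = multiprocessing.Semaphore(3)
        pool = multiprocessing.Pool(6, initializer=init_child,
                                    initargs=(semaphore,))
        for job_id in range(12):
            pool.apply_async(do_job, [job_id])
        pool.close()
        pool.join()

    if __name__ == "__main__":
        main()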

