Python Multiprocessing concurrency using dispatcher, pool and shared list not working

I am learning Python multiprocessing and I am trying to use it to populate a list with all the files present on the filesystem. However, the code I wrote executes only sequentially.

    #!/usr/bin/python
    import os
    import multiprocessing

    # Get the top-level directory names inside "/"
    tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
    manager = multiprocessing.Manager()
    files = manager.list()

    def get_files(x):
        for root, dir, file in os.walk(x):
            for name in file:
                files.append(os.path.join(root, name))

    mp = [multiprocessing.Process(target=get_files, args=(tld[x],)) for x in range(len(tld))]
    for i in mp:
        i.start()
        i.join()
    print len(files)

When I checked the process tree, I saw only one child process spawned. (man pstree says that names in curly braces {} denote threads spawned by the parent process.)

    ---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                                   `-python(12750)

I was expecting a process to be created for each tld directory, each one filling the shared files list, which would be about 10-15 processes depending on the number of directories. What am I doing wrong?
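
For reference, the sequential behaviour comes from calling i.join() inside the same loop as i.start(): each child process is waited on before the next one is started. A minimal sketch, with the same setup as above, that starts every process before joining any of them:

    import os
    import multiprocessing

    manager = multiprocessing.Manager()
    files = manager.list()

    def get_files(top):
        for root, dirs, names in os.walk(top):
            for name in names:
                files.append(os.path.join(root, name))

    tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
    procs = [multiprocessing.Process(target=get_files, args=(d,)) for d in tld]
    for p in procs:
        p.start()      # start everything first...
    for p in procs:
        p.join()       # ...then wait, so the directory walks actually overlap
    print len(files)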

EDIT:

I used multiprocessing.Pool to create worker processes, and this time the processes are spawned, but when I try to use multiprocessing.Pool.map() I get errors. I was going by the following example from the Python docs:

    from multiprocessing import Pool

    def f(x):
        return x*x

    if __name__ == '__main__':
        p = Pool(5)
        print(p.map(f, [1, 2, 3]))

Following this example, I rewrote the code as

    import os
    import multiprocessing

    tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
    manager = multiprocessing.Manager()
    pool = multiprocessing.Pool(processes=len(tld))
    print pool
    files = manager.list()

    def get_files(x):
        for root, dir, file in os.walk(x):
            for name in file:
                files.append(os.path.join(root, name))

    pool.map(get_files, [x for x in tld])
    pool.close()
    pool.join()
    print len(files)

and it does spawn several processes:

    ---bash(10949)---python(12890)-+-python(12967)
                                   |-python(12968)
                                   |-python(12970)
                                   |-python(12971)
                                   |-python(12972)
                                   ---snip---

But the code fails; every pool worker prints the same traceback (shown de-interleaved below):

    Process PoolWorker-2:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
        task = get()
      File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
        return recv()
    AttributeError: 'module' object has no attribute 'get_files'

What am I doing wrong here, and why do the workers fail to find the get_files() function?

1 answer

This is simply because you instantiate your pool before defining the function get_files:

    import os
    import multiprocessing

    tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
    manager = multiprocessing.Manager()
    files = manager.list()

    def get_files(x):
        for root, dir, file in os.walk(x):
            for name in file:
                files.append(os.path.join(root, name))

    pool = multiprocessing.Pool(processes=len(tld))  # Instantiate the pool here, after get_files is defined
    pool.map(get_files, [x for x in tld])
    pool.close()
    pool.join()
    print len(files)
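
One detail worth keeping from the docs example is the if __name__ == '__main__': guard around pool creation. A sketch of the fixed version with the guard, which also passes the manager list to the workers through the pool's initializer (init_worker is a name made up here) instead of relying on globals inherited across the fork:

    import os
    import multiprocessing

    def init_worker(shared_files):
        # Runs once in each worker process; stash the manager proxy as a global.
        global files
        files = shared_files

    def get_files(top):
        for root, dirs, names in os.walk(top):
            for name in names:
                files.append(os.path.join(root, name))

    if __name__ == '__main__':
        tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
        manager = multiprocessing.Manager()
        files = manager.list()
        pool = multiprocessing.Pool(processes=len(tld),
                                    initializer=init_worker, initargs=(files,))
        pool.map(get_files, tld)
        pool.close()
        pool.join()
        print len(files)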

The general idea with processes is that the moment you start one, you fork the memory of the main process. So any definition made in the main process after the fork will not exist in the subprocess.
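
A minimal sketch of that fork behaviour, assuming a fork-based platform such as Linux (as in the tracebacks above):

    import multiprocessing

    def before(x):       # defined before the pool exists: every worker has it
        return x * x

    pool = multiprocessing.Pool(2)   # the worker processes are forked here

    def after(x):        # defined after the fork: exists only in the parent
        return x + 1

    print pool.map(before, [1, 2, 3])   # works: [1, 4, 9]
    print pool.map(after, [1, 2, 3])    # workers fail with the AttributeError above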

If you want to use shared memory, you can use the threading library instead, but you will run into other problems with it (cf. the Global Interpreter Lock).
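
That said, since os.walk() is mostly I/O-bound, the GIL hurts less here than it would for CPU-bound work. A minimal threaded sketch of the same task, where a plain list can be shared directly:

    import os
    import threading

    files = []                  # ordinary list: threads share the parent's memory
    lock = threading.Lock()     # guard appends to the shared list explicitly

    def get_files(top):
        for root, dirs, names in os.walk(top):
            for name in names:
                with lock:
                    files.append(os.path.join(root, name))

    tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
    threads = [threading.Thread(target=get_files, args=(d,)) for d in tld]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print len(files)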

