Using a list with Python multiprocessing

Can someone help me share a list between multiple Python processes? The problem is getting self.ID_List and self.mps_in_process to work in the following code.

```python
import time, random
from multiprocessing import Process  #, Manager, Array, Queue

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*10)  # simulate data processing
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = IDs
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()

    def doMP(self):
        for tmp in range(3):  # nr of concurrent processes
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)
```

In short, the code processes some data (here, a random sleep time in MP_Stuff) according to the data identifiers in self.ID_List. self.mps_in_process is used to track which identifiers are currently being processed (here the number of concurrent processes is hard-coded, but in reality it is dynamic).

The problem is sharing mps_in_process and ID_List across several processes. The current code ends up in a seemingly endless loop. What is going wrong is actually well described in the multiprocessing documentation:

"if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start() was called."

However, I cannot figure out how to make mps_in_process and ID_List work. I cannot use Queue, since the elements removed from mps_in_process come out in random order. I cannot use Array, because .pop(0) does not work on it. I cannot use Manager().list(), because then .remove() and len(ID_List) do not work. Using threading instead of multiprocessing is not a solution, because I need to use freeze_support().

Therefore, any help on how to share the list between the processes is very welcome!

python list multiprocessing
2 answers

The Manager works fine (including len()). The problem with your code is that the main process does not wait for the processing to finish, so the main process exits and the manager is no longer available. Also, I am not sure about the atomicity of ListProxy's pop(), so a lock may be advisable.

The solution is p.join().

However, I am confused about why the p.join() at the end of doFirstMP behaves the way it does. I would be glad if someone could explain why joining the first p returns only after all the computation is complete, rather than after the first doMP returns.

My code is:

```python
import time, random
from multiprocessing import Process, Manager

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*5)  # simulate data processing
        print(id, "done")
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = Manager().list(IDs)
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()
        p.join()
        print("joined")

    def doMP(self):
        for tmp in range(3):  # nr of concurrent processes
            print(self.ID_List)
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        print("kill", kill_id)
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)
```

Unfortunately, you have already indicated your options.

Both Array() and Manager().list() should be able to do this, although you may need a little extra work.

  • You can emulate len(ID_List) by keeping the current length in a Value() and incrementing/decrementing it.
  • remove() can be emulated with a loop that finds the element and shifts the remaining elements left (albeit slower).
