Writing data to an HDF file using multiprocessing

This seems like a simple problem, but I can't figure it out.

I have a simulation that runs in a double for loop and writes the results to an HDF file. The following is a simple version of this program:

import tables as pt

a = range(10)
b = range(5)

def Simulation():
    hdf = pt.openFile('simulation.h5', mode='w')
    for ii in a:
        print(ii)
        hdf.createGroup('/', 'A%s' % ii)
        for i in b:
            hdf.createArray('/A%s' % ii, 'B%s' % i, [ii, i])
    hdf.close()
    return

Simulation()

This code does exactly what I want, but since the process can take quite a while, I tried to use the multiprocessing module with the following code:

import multiprocessing
import tables as pt

a = range(10)
b = range(5)

def Simulation(ii):
    hdf = pt.openFile('simulation.h5', mode='w')
    print(ii)
    hdf.createGroup('/', 'A%s' % ii)
    for i in b:
        hdf.createArray('/A%s' % ii, 'B%s' % i, [ii, i])
    hdf.close()
    return

if __name__ == '__main__':
    jobs = []
    for ii in a:
        p = multiprocessing.Process(target=Simulation, args=(ii,))
        jobs.append(p)
        p.start()

However, it only writes the latest simulation to the HDF file, overwriting all the other groups.

1 answer

Each time you open a file in write mode ('w'), a new file is created, so the contents of the file are lost if it already exists. Only the last file handle can successfully write to the file. Even if you switched to append mode, you should not try to write to a single file from several processes: the output will be corrupted if two processes try to write at the same time.
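
A minimal sketch of the difference between the two modes, using the same camelCase PyTables API as the code above and a throwaway file name, demo.h5:

import tables as pt

hdf = pt.openFile('demo.h5', mode='w')   # 'w' creates/truncates the file
hdf.createGroup('/', 'A0')
hdf.close()

hdf = pt.openFile('demo.h5', mode='w')   # 'w' again: the file is truncated
print('/A0' in hdf)                      # False - the group is gone
hdf.close()

hdf = pt.openFile('demo.h5', mode='a')   # 'a' keeps existing nodes
hdf.createGroup('/', 'A1')
print('/A1' in hdf)                      # True
hdf.close()

Append mode avoids the truncation, but it still does not make concurrent writes from several processes safe.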

Instead, have all the worker processes put their output on a queue, and let a dedicated process (either a subprocess or the main process) take the output off the queue and write it to the file:


import multiprocessing as mp
import tables as pt

num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None

def Simulation(inqueue, output):
    for ii in iter(inqueue.get, sentinel):
        output.put(('createGroup', ('/', 'A%s' % ii)))
        for i in range(num_arrays):
            output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))

def handle_output(output):
    hdf = pt.openFile('simulation.h5', mode='w')
    while True:
        args = output.get()
        if args:
            method, args = args
            getattr(hdf, method)(*args)
        else:
            break
    hdf.close()

if __name__ == '__main__':
    output = mp.Queue()
    inqueue = mp.Queue()
    jobs = []
    proc = mp.Process(target=handle_output, args=(output, ))
    proc.start()
    for i in range(num_processes):
        p = mp.Process(target=Simulation, args=(inqueue, output))
        jobs.append(p)
        p.start()
    for i in range(num_simulations):
        inqueue.put(i)
    for i in range(num_processes):
        # Send the sentinel to tell Simulation to end
        inqueue.put(sentinel)
    for p in jobs:
        p.join()
    output.put(None)
    proc.join()

For comparison, here is a version that uses mp.Pool:

import multiprocessing as mp
import tables as pt

num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000

def Simulation(ii):
    result = []
    result.append(('createGroup', ('/', 'A%s' % ii)))
    for i in range(num_arrays):
        result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
    return result

def handle_output(result):
    hdf = pt.openFile('simulation.h5', mode='a')
    for args in result:
        method, args = args
        getattr(hdf, method)(*args)
    hdf.close()

if __name__ == '__main__':
    # clear the file
    hdf = pt.openFile('simulation.h5', mode='w')
    hdf.close()
    pool = mp.Pool(num_processes)
    for i in range(num_simulations):
        pool.apply_async(Simulation, (i, ), callback=handle_output)
    pool.close()
    pool.join()

It looks simpler, right? However, there is one significant difference. The original code used output.put to send args to handle_output, which ran in its own subprocess; handle_output took args off the output queue and handled them immediately. With the Pool code above, Simulation accumulates a whole bunch of args in result, and result is not sent to handle_output until Simulation returns.

If Simulation takes a long time, there will be a long waiting period during which nothing is written to simulation.h5.
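
If that delay matters, one possible compromise (not part of the answer above, just a sketch) is to keep mp.Pool for the workers but still stream each instruction through a queue as soon as it is produced. A plain mp.Queue cannot be passed to Pool workers, but an mp.Manager().Queue() proxy can:

import multiprocessing as mp
import tables as pt

num_arrays = 100
num_simulations = 1000
sentinel = None

def Simulation(ii, output):
    # Stream each instruction immediately instead of accumulating a result list.
    output.put(('createGroup', ('/', 'A%s' % ii)))
    for i in range(num_arrays):
        output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))

def handle_output(output):
    # Single dedicated writer, as in the first version.
    hdf = pt.openFile('simulation.h5', mode='w')
    for args in iter(output.get, sentinel):
        method, args = args
        getattr(hdf, method)(*args)
    hdf.close()

if __name__ == '__main__':
    manager = mp.Manager()
    output = manager.Queue()      # proxy queue that Pool workers can receive as an argument
    writer = mp.Process(target=handle_output, args=(output,))
    writer.start()
    pool = mp.Pool(mp.cpu_count())
    for i in range(num_simulations):
        pool.apply_async(Simulation, (i, output))
    pool.close()
    pool.join()
    output.put(sentinel)          # tell the writer to finish
    writer.join()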
