System error when starting subprocesses using multiprocessing

I get a system error (shown below) while doing some simple numpy-based matrix algebra calculations using the multiprocessing package (Python 2.7.3 with numpy 1.7.0 on Ubuntu 12.04 on Amazon EC2). My code works fine for smaller matrix sizes, but crashes for large ones (even though there is plenty of memory available).

The size of the matrices is what matters: my code works fine for dense matrices of size 1000000x10, but crashes for 1000000x500 - these are the matrices I pass to / from the subprocesses. 10 vs. 500 is a runtime parameter; everything else stays the same (input data, other runtime parameters, etc.).

I also tried running the same (ported) code with Python 3 - for the larger matrices the subprocesses go idle (instead of crashing, as under Python 2.7), and the program / subprocesses just hang there doing nothing. For the smaller matrices the code works fine with Python 3.

Any suggestions would be much appreciated (I'm running out of ideas here)

Error message:

    Exception in thread Thread-5:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.7/threading.py", line 504, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
        put(task)
    SystemError: NULL result without error in PyObject_Call

Multiprocessing code used:

    def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
        if len(listOfInputs) == 0:
            return
        # Add result queue to the list of argument tuples.
        resultQueue = mp.Manager().Queue()
        listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
        # Create and initialize the pool of workers.
        pool = mp.Pool(processes=nParallelProcesses)
        pool.map(proc, listOfInputsNew)
        # Run the processes.
        pool.close()
        pool.join()
        # Return the results.
        return [resultQueue.get() for i in range(len(listOfInputs))]

Below is the "proc" that runs for each subprocess. Basically, it solves many systems of linear equations using numpy (it builds the necessary matrices inside the subprocess) and returns the results as another matrix. Once again, it works fine for smaller values โ€‹โ€‹of one runtime parameter, but crashes (or hangs in python3) for larger ones.

    def solveForLFV(param):
        startTime = time.time()
        (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm,
         indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
        LFoutChunkSize = XY.shape[0]
        nLFdim = LFVin.shape[1]
        sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
        LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
        for LFVoutIndex in xrange(LFoutChunkSize):
            LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
            sumLFVinOuterProductLFVpurch[:, :] = 0.
            LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
            for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
                LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI]:LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
                sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
            LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm,
                                                          XY[LFVoutIndex, :])
        queue.put((chunkI, LFVoutChunk))
        print 'solveForLFV: ', time.time() - startTime, 'sec'
        sys.stdout.flush()
python numpy multiprocessing
1 answer

500,000,000 elements is quite large: with float64 that is 4 billion bytes, or about 4 GB. (A 10,000,000-element array, by contrast, is only 80 million bytes, or about 80 MB.) I expect the problem is multiprocessing trying to pickle the arrays in order to send them to the subprocesses over a pipe.
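To make the size arithmetic explicit, here is a quick sketch (my own, not part of the original answer) using only the matrix dimensions mentioned in the question:

    import numpy as np

    bytes_per_element = np.dtype(np.float64).itemsize   # 8 bytes
    print(1000000 * 10 * bytes_per_element)    # 80,000,000 bytes, ~80 MB: the size that works
    print(1000000 * 500 * bytes_per_element)   # 4,000,000,000 bytes, ~4 GB: the size that crashes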

Since you are on a Unix platform, you can avoid all this by relying on the memory-inheritance behaviour of fork(), which multiprocessing uses to create its pool workers. I have had great success with this hack (pulled out of this project), described in the comments.

    import itertools
    import os
    import random
    import string

    ### A helper for letting the forked processes use data without pickling.
    _data_name_cands = (
        '_data_' + ''.join(random.sample(string.ascii_lowercase, 10))
        for _ in itertools.count())

    class ForkedData(object):
        '''
        Class used to pass data to child processes in multiprocessing without
        really pickling/unpickling it. Only works on POSIX.

        Intended use:
            - The master process makes the data somehow, and does e.g.
              data = ForkedData(the_value)
            - The master makes sure to keep a reference to the ForkedData object
              until the children are all done with it, since the global reference
              is deleted to avoid memory leaks when the ForkedData object dies.
            - Master process constructs a multiprocessing.Pool *after*
              the ForkedData construction, so that the forked processes
              inherit the new global.
            - Master calls e.g. pool.map with data as an argument.
            - Child gets the real value through data.value, and uses it read-only.
        '''
        # TODO: does data really need to be used read-only? don't think so...
        # TODO: more flexible garbage collection options
        def __init__(self, val):
            g = globals()
            self.name = next(n for n in _data_name_cands if n not in g)
            g[self.name] = val
            self.master_pid = os.getpid()

        @property
        def value(self):
            return globals()[self.name]

        def __del__(self):
            if os.getpid() == self.master_pid:
                del globals()[self.name]
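For context, here is a minimal usage sketch of the class above (my own illustration, not part of the original answer; the worker function, array shape, and chunking are made up, and it assumes ForkedData is defined in the same module). The essential points are that the ForkedData object is created before the Pool, so the forked workers inherit the global it creates, and that the children read the array through .value instead of receiving a pickled copy:

    import multiprocessing as mp
    import numpy as np

    def work(args):
        chunkI, shared = args
        bigArray = shared.value                 # inherited via fork(), never pickled
        return chunkI, bigArray[chunkI::4].sum()

    if __name__ == '__main__':
        bigArray = np.random.rand(1000000, 50)  # built once in the master process
        shared = ForkedData(bigArray)           # stashes the array in a module global
        pool = mp.Pool(processes=4)             # fork happens here, *after* ForkedData exists
        results = pool.map(work, [(i, shared) for i in range(4)])
        pool.close()
        pool.join()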