How to Parallelize Scipy Sparse Matrix Multiplication

I have a large sparse matrix X in scipy.sparse.csr_matrix format, and I would like to multiply it by a numpy array W using parallelism. After some research I found that I need to use Array in multiprocessing in order to avoid copying X and W between processes (e.g. here: How to combine Pool.map with Array (shared memory) in Python multiprocessing? and Are shared readonly data copied to different processes for Python multiprocessing?). Here is my latest attempt:

    import multiprocessing
    import numpy
    import scipy.sparse
    import time

    def initProcess(data, indices, indptr, shape, Warr, Wshp):
        global XData
        global XIndices
        global XIntptr
        global Xshape

        XData = data
        XIndices = indices
        XIntptr = indptr
        Xshape = shape

        global WArray
        global WShape

        WArray = Warr
        WShape = Wshp

    def dot2(args):
        rowInds, i = args

        global XData
        global XIndices
        global XIntptr
        global Xshape

        # Rebuild the csr_matrix around the shared buffers (no data is copied here,
        # as far as I can tell)
        data = numpy.frombuffer(XData, dtype=numpy.float64)
        indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
        indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
        Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)

        global WArray
        global WShape
        W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

        return Xr[rowInds[i]:rowInds[i+1], :].dot(W)

    def getMatmat(X):
        numJobs = multiprocessing.cpu_count()
        rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int64)

        # Store the data in X as RawArray objects so we can share it among processes
        XData = multiprocessing.RawArray("d", X.data)
        XIndices = multiprocessing.RawArray("i", X.indices)
        XIndptr = multiprocessing.RawArray("i", X.indptr)

        def matmat(W):
            WArray = multiprocessing.RawArray("d", W.flatten())
            pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
                                        initializer=initProcess,
                                        initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))

            params = []
            for i in range(numJobs):
                params.append((rowInds, i))

            iterator = pool.map(dot2, params)

            P = numpy.zeros((X.shape[0], W.shape[1]))
            for i in range(numJobs):
                P[rowInds[i]:rowInds[i+1], :] = iterator[i]

            return P

        return matmat

    if __name__ == '__main__':
        # Create a random sparse matrix X and a random dense one W
        X = scipy.sparse.rand(10000, 8000, 0.1)
        X = X.tocsr()
        W = numpy.random.rand(8000, 20)

        startTime = time.time()
        A = getMatmat(X)(W)
        parallelTime = time.time() - startTime

        startTime = time.time()
        B = X.dot(W)
        nonParallelTime = time.time() - startTime

        print(parallelTime, nonParallelTime)

However, the output is something like (4.431, 0.165), indicating that the parallel version is roughly 27 times slower than the non-parallel multiplication.

I believe a slowdown like this can be caused by copying large data to the processes, but that should not be the case here, since I use Array to store the shared variables (unless the copy happens inside numpy.frombuffer or when the csr_matrix is created, but then I could not find a way to share a csr_matrix directly). Another possible cause of the slow speed is returning the large result of each matrix multiplication from each process, but I am not sure of a way around that.
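One quick way to bound that second effect is to time how long it takes to serialize a single worker's result chunk, since pool.map pickles every returned block back to the parent. A minimal sketch (my own, assuming 4 jobs, with shapes matching the code above):

    import pickle
    import time

    import numpy

    # One worker's output chunk: 10000 rows split over 4 jobs, 20 columns of float64
    chunk = numpy.random.rand(10000 // 4, 20)

    startTime = time.time()
    payload = pickle.dumps(chunk, protocol=pickle.HIGHEST_PROTOCOL)
    print(len(payload), "bytes pickled in", time.time() - startTime, "seconds")

If this time is small compared to the 4.4-second gap above, returning the results is not the main cost.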

Can anyone see where I am going wrong? Thanks for any help!

Update: I cannot be sure, but I think that sharing large amounts of data between processes is just not that efficient, and ideally I should be using multithreading (although the Global Interpreter Lock (GIL) makes that very difficult). One way around this is to release the GIL using Cython, for example (see http://docs.cython.org/src/userguide/parallelism.html ), although many of the numpy functions need to go through the GIL.
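For what it's worth, a threaded version of the same row-sliced multiply is easy to write and needs no sharing tricks at all; if the sparse kernels never release the GIL (as the update suggests), it runs in roughly serial time, which makes it a handy way to confirm the diagnosis. A sketch:

    from concurrent.futures import ThreadPoolExecutor
    import numpy
    import scipy.sparse
    import time

    X = scipy.sparse.rand(10000, 8000, 0.1).tocsr()
    W = numpy.random.rand(8000, 20)
    numJobs = 4
    rowInds = numpy.linspace(0, X.shape[0], numJobs + 1).astype(int)

    def dotChunk(i):
        # Threads share X and W directly; nothing is copied or pickled
        return X[rowInds[i]:rowInds[i + 1], :].dot(W)

    startTime = time.time()
    with ThreadPoolExecutor(numJobs) as pool:
        P = numpy.vstack(list(pool.map(dotChunk, range(numJobs))))
    print(time.time() - startTime)  # close to the serial time if the GIL is held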

2 answers

It is best to drop down to C with Cython. That way you can beat the GIL and use OpenMP. I am not surprised that multiprocessing is slower; there is a lot of overhead there.
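As a rough illustration of that overhead (my sketch, not part of the original answer): even a worker function that does nothing pays for pickling its argument out to the child and the result back to the parent:

    import multiprocessing
    import time

    import numpy

    def identity(a):
        return a  # no work at all; only the IPC cost remains

    if __name__ == '__main__':
        A = numpy.random.rand(10000, 8000)  # ~610 MB of float64
        chunks = numpy.array_split(A, 4)
        with multiprocessing.Pool(4) as pool:
            startTime = time.time()
            pool.map(identity, chunks)
            print("round trip:", time.time() - startTime, "seconds")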

Here is a naive OpenMP wrapper, callable from Python, around CSparse's sparse matrix times dense vector product code.

On my laptop, it is a little faster than scipy, but I do not have that many cores. The code, including the setup.py file and the C source and header files, is in this gist: https://gist.github.com/rmcgibbo/6019670

I suspect that if you really want the parallel code to be fast (on my laptop it is only about 20% faster than single-threaded scipy, even when using 4 threads), you need to think more carefully than I did about where the parallelism happens, paying attention to cache locality.

    # psparse.pyx

    #-----------------------------------------------------------------------------
    # Imports
    #-----------------------------------------------------------------------------
    cimport cython
    cimport numpy as np
    import numpy as np
    import scipy.sparse
    from libc.stddef cimport ptrdiff_t
    from cython.parallel import parallel, prange

    #-----------------------------------------------------------------------------
    # Headers
    #-----------------------------------------------------------------------------

    ctypedef int csi

    ctypedef struct cs:
        # matrix in compressed-column or triplet form
        csi nzmax       # maximum number of entries
        csi m           # number of rows
        csi n           # number of columns
        csi *p          # column pointers (size n+1) or col indices (size nzmax)
        csi *i          # row indices, size nzmax
        double *x       # numerical values, size nzmax
        csi nz          # # of entries in triplet matrix, -1 for compressed-col

    cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil
    cdef extern csi cs_print (cs *A, csi brief) nogil

    assert sizeof(csi) == 4

    #-----------------------------------------------------------------------------
    # Functions
    #-----------------------------------------------------------------------------

    @cython.boundscheck(False)
    def pmultiply(X not None,
                  np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None):
        """Multiply a sparse CSC matrix by a dense matrix

        Parameters
        ----------
        X : scipy.sparse.csc_matrix
            A sparse matrix, of size N x M
        W : np.ndarray[dtype=float64, ndim=2, mode='fortran']
            A dense matrix, of size M x P. Note, W must be contiguous and in
            fortran (column-major) order. You can ensure this using numpy's
            `asfortranarray` function.

        Returns
        -------
        A : np.ndarray[dtype=float64, ndim=2, mode='fortran']
            A dense matrix, of size N x P, the result of multiplying X by W.

        Notes
        -----
        This function is parallelized over the columns of W using OpenMP. You
        can control the number of threads at runtime using the OMP_NUM_THREADS
        environment variable. The internal sparse matrix code is from CSPARSE,
        a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis.
        http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the
        GNU LGPL v2.1+.

        References
        ----------
        .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems
        (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136
        """
        if X.shape[1] != W.shape[0]:
            raise ValueError('matrices are not aligned')

        cdef int i
        cdef cs csX
        cdef np.ndarray[double, ndim=2, mode='fortran'] result
        cdef np.ndarray[csi, ndim=1, mode='c'] indptr = X.indptr
        cdef np.ndarray[csi, ndim=1, mode='c'] indices = X.indices
        cdef np.ndarray[double, ndim=1, mode='c'] data = X.data

        # Pack the scipy data into the CSparse struct. This is just copying some
        # pointers.
        csX.nzmax = X.data.shape[0]
        csX.m = X.shape[0]
        csX.n = X.shape[1]
        csX.p = &indptr[0]
        csX.i = &indices[0]
        csX.x = &data[0]
        csX.nz = -1  # to indicate CSC format

        result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double)
        for i in prange(W.shape[1], nogil=True):
            # W and result are in fortran (column-major) order, so each of
            # their columns is contiguous in memory
            cs_gaxpy(&csX, &W[0, i], &result[0, i])

        return result

It calls some C from CSparse.

    // src/cs_gaxpy.c

    #include "cs.h"
    /* y = A*x+y */
    csi cs_gaxpy (const cs *A, const double *x, double *y)
    {
        csi p, j, n, *Ap, *Ai ;
        double *Ax ;
        if (!CS_CSC (A) || !x || !y) return (0) ;    /* check inputs */
        n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ;
        for (j = 0 ; j < n ; j++)
        {
            for (p = Ap [j] ; p < Ap [j+1] ; p++)
            {
                y [Ai [p]] += Ax [p] * x [j] ;
            }
        }
        return (1) ;
    }
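For reference, calling the compiled extension might look like the following. This is a sketch: it assumes the gist's setup.py builds a module named psparse (matching psparse.pyx), and it follows the docstring above in requiring a CSC matrix and a fortran-ordered W:

    import numpy as np
    import scipy.sparse

    from psparse import pmultiply  # module name assumed from psparse.pyx

    X = scipy.sparse.rand(10000, 8000, 0.1).tocsc()
    W = np.asfortranarray(np.random.rand(8000, 20))

    A = pmultiply(X, W)
    assert np.allclose(A, X.dot(W))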

Perhaps a little late with the answer, but you can probably get reliable parallel speedups with the PyTrilinos package, which provides Python wrappers for many functions in Trilinos. Here is your example converted to use PyTrilinos:

    from PyTrilinos import Epetra
    from scipy.sparse import rand
    import numpy as np

    n_rows = 10000
    n_cols = 8000
    n_vecs = 20
    fill_factor = 0.1

    comm = Epetra.PyComm()
    my_id = comm.MyPID()

    row_map = Epetra.Map(n_rows, 0, comm)
    out_vec_map = row_map
    in_vec_map = Epetra.Map(n_cols, 0, comm)
    col_map = Epetra.Map(n_cols, range(n_cols), 0, comm)

    n_local_rows = row_map.NumMyElements()

    # Create local block matrix in scipy and convert to Epetra
    X = rand(n_local_rows, n_cols, fill_factor).tocoo()
    A = Epetra.CrsMatrix(Epetra.Copy, row_map, col_map,
                         int(fill_factor*n_cols*1.2), True)
    A.InsertMyValues(X.row, X.col, X.data)
    A.FillComplete()

    # Create sub-vectors in numpy and convert to Epetra format
    W = np.random.rand(in_vec_map.NumMyElements(), n_vecs)
    V = Epetra.MultiVector(in_vec_map, n_vecs)
    V[:] = W.T  # order of indices is opposite

    B = Epetra.MultiVector(out_vec_map, n_vecs)

    # Multiply
    A.Multiply(False, V, B)

You can then run this code with MPI:

    mpiexec -n 2 python scipy_to_trilinos.py
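To reproduce the question's timing comparison, each rank could time the multiply between barriers. A sketch, assuming Epetra.PyComm exposes the usual Barrier() method of the underlying Epetra communicator:

    import time

    comm.Barrier()                 # start all ranks together (assumed API)
    startTime = time.time()
    A.Multiply(False, V, B)
    comm.Barrier()                 # wait for every rank to finish
    if my_id == 0:
        print("parallel multiply:", time.time() - startTime, "seconds")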

Other PyTrilinos examples can be found in the project's GitHub repository. Of course, if one were actually using PyTrilinos, this way of initializing the matrix via scipy may not be the most optimal.

