How to parallelize a for loop in Python?

Update 1.0 start

It seems that in the call

    for i, Wi in enumerate(W.T):
        idx.append(i)
        result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))

the arguments passed to the ALS_Y/ALS_X function are not references; the arguments are copied. So when X or Y are very large matrices (in my case about 6000*40, and this happens inside a for loop with, say, 50,000 iterations), it exceeds the memory limit.
So I tried using global variables instead, passing only the indices as parameters to the function:

    import multiprocessing
    import time
    import numpy as np

    def func(idx):
        global a
        a[idx] += 1

    if __name__ == "__main__":
        a = range(10)
        for j in xrange(2):
            pool = multiprocessing.Pool(processes=8)
            result = []
            for i in xrange(10):
                result.append(pool.apply_async(func, (i, )))
            pool.close()
            pool.join()
            print a
            print "Sub-process(es) done."

It outputs:

    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    Sub-process(es) done.
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    Sub-process(es) done.

So this means it still copied `a`! Is there a way to handle this problem? Any help is appreciated!

Update 1.0 end


Below is my Python code for solving a matrix factorization problem, W = XY. However, the code is inefficient, and I hope it can be converted to a parallel version, ideally one that uses a GPU. I have no parallel programming experience, so can anyone give me some advice?

Below is the code for factorizing a matrix using ALS (alternating least squares):

    for ii in range(n_iterations):
        for u, Wu in enumerate(W):
            X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
                                   np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T  # X inner loop
        for i, Wi in enumerate(W.T):
            Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),  # Y inner loop
                                     np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))  # Y inner loop
        error = get_error(Q, X, Y, W)
        weighted_errors.append(error)
        print '{}th iteration is completed'.format(ii)

After switching to the multiprocessing library, my code now looks like this:

    def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
        return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
                               np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T

    for ii in range(n_iterations):
        pool = multiprocessing.Pool(processes=12)  # create pool
        result = []  # store each row for X
        idx = []  # store the row number
        for u, Wu in enumerate(W):
            idx.append(u)
            result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
        pool.close()
        pool.join()
        for u, vector in zip(idx, result):
            X[u] = vector.get()  # assign the result to X
        ######################################
        pool = multiprocessing.Pool(processes=12)  # for Y, much similar to X
        result = []
        idx = []
        for i, Wi in enumerate(W.T):
            idx.append(i)
            result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
        pool.close()
        pool.join()
        for i, vector in zip(idx, result):
            Y[:,i] = vector.get()
        error = get_error(Q, X, Y, W)
        weighted_errors.append(error)
        print '{}th iteration is completed'.format(ii), 'error: ', error

Unfortunately, the program always crashes...

Below is my complete code. It is all rather messy. Just ignore load_data, get_error and vec2str, since here I generate a random matrix instead.

    import pandas as pd
    import numpy as np
    import multiprocessing

    def vec2str(vec):
        res = ''
        for dim in range(len(vec)):
            res += str(vec[dim]) + ','
        return res

    def load_data(heads, filename, sep, header=None):
        data = pd.read_table(filename, sep=sep, header=header, names=heads)
        rp = data.pivot_table(columns=['sid'], index=['uid'], values=['rating'])  # not generally...
        Q = rp.fillna(0)
        Q = Q.values
        W = Q > 0.5
        W[W == True] = 1
        W[W == False] = 0
        W = W.astype(np.float64, copy=False)
        return Q, W, rp

    def get_error(Q, X, Y, W):
        return np.sum((W * (Q - np.dot(X, Y)))**2)

    '''
    X[u] = np.linalg.solve(np.dot(, np.dot(np.diag(), .T)) + * np.eye(), np.dot(, np.dot(np.diag(), Q[u].T))).T
    '''
    def ALS_X(Y, Wu, Q, lambda_, n_factors, u):
        return np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
                               np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T

    '''
    Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors), np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))
    '''
    def ALS_Y(X, Wi, Q, lambda_, n_factors, i):
        return np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),
                               np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))

    if __name__ == "__main__":
        lambda_ = 0.1
        n_factors = 40
        filename = 'data_songID'
        n_iterations = 20
        #Q, W, rp = load_data(['uid', 'sid', 'rating'], filename, ',')
        Q = np.random.rand(1000,1000)
        m, n = Q.shape
        W = np.eye(1000)
        print 'Loading data finished, ', 'size: ', Q.shape
        print 'Settings ', 'lambda = {}'.format(lambda_), 'n_factors = {}'.format(n_factors)
        X = 5 * np.random.rand(m, n_factors)
        Y = 5 * np.random.rand(n_factors, n)
        errors = []
        for ii in range(n_iterations):
            X = np.linalg.solve(np.dot(Y, Y.T) + lambda_ * np.eye(n_factors), np.dot(Y, Q.T)).T
            Y = np.linalg.solve(np.dot(X.T, X) + lambda_ * np.eye(n_factors), np.dot(X.T, Q))
            if ii % 100 == 0:
                print('{}th iteration is completed'.format(ii))
            errors.append(get_error(Q, X, Y, W))
        Q_hat = np.dot(X, Y)
        print('Error of rated movies: {}'.format(get_error(Q, X, Y, W)))
        print errors

        #####ALS start....#####
        print '*'*100
        weighted_errors = []
        for ii in range(n_iterations):
            pool = multiprocessing.Pool(processes=12)
            result = []
            idx = []
            for u, Wu in enumerate(W):
                idx.append(u)
                result.append(pool.apply_async(ALS_X, (Y, Wu, Q, lambda_, n_factors, u,)))
            pool.close()
            pool.join()
            for u, vector in zip(idx, result):
                X[u] = vector.get()
            ######################################
            pool = multiprocessing.Pool(processes=12)
            result = []
            idx = []
            for i, Wi in enumerate(W.T):
                idx.append(i)
                result.append(pool.apply_async(ALS_Y, (X, Wi, Q, lambda_, n_factors, i,)))
            pool.close()
            pool.join()
            for i, vector in zip(idx, result):
                Y[:,i] = vector.get()
            error = get_error(Q, X, Y, W)
            weighted_errors.append(error)
            print '{}th iteration is completed'.format(ii), 'error: ', error

        weighted_Q_hat = np.dot(X,Y)
        print weighted_errors
        X.tofile('X.bin')
        Y.tofile('Y.bin')
        latent_user_file = open('user_latent','w')
        for idx in range(len(rp.axes[0])):
            latent_user_file.write(str(rp.axes[0][idx]) + '\t' + vec2str(X[idx,:]) + '\n')
        latent_mid_file = open('mid_latent', 'w')
        for idx in range(len(rp.axes[1])):
            latent_mid_file.write(str(rp.axes[1][idx]) + '\t' + vec2str(Y.T[idx,:]) + '\n')
1 answer

Last year I ran into this same desire for a "parallel loop" in Python and hacked one together as part of my physics work. There are many modules that do what you want, but I found that pp was the only one I could really get working the way I wanted for arbitrary functions.

If you want something similar to this:

    ResultList = Library_ParallelLoop.Main(
        Function = ExampleFunction,
        ListOfArgSets = ListOfArgSets,
        Algorithm = 'pp',
        PrintExtra = True
    )

Then I will point you to my GitHub repo instead of pasting my entire source into this post, since the implementation that actually made it work took a painful number of lines and involved deep-copying Python functions, which apparently is something that is not built into Python.
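For comparison, a call with a similar shape can be sketched using only the standard library. This is a minimal Python 3 sketch, not the implementation in the repo above: `parallel_loop`, `example_function`, and the `start_method` parameter are hypothetical names chosen for illustration. It assumes the function is defined at module level (so it can be pickled) and that each argument set is a dict of keyword arguments.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def parallel_loop(function, list_of_arg_sets, max_workers=None, start_method=None):
    """Call function(**arg_set) for every arg_set in separate processes.

    Results come back in the same order as list_of_arg_sets.
    start_method=None uses the platform's default start method.
    """
    context = mp.get_context(start_method)
    with ProcessPoolExecutor(max_workers=max_workers, mp_context=context) as executor:
        # Submit everything first so the tasks can run concurrently.
        futures = [executor.submit(function, **arg_set) for arg_set in list_of_arg_sets]
        # Collect results in submission order; exceptions re-raise here.
        return [future.result() for future in futures]


def example_function(ii):
    # Hypothetical stand-in for real work; must live at module level.
    return ii * ii
```

Calling `parallel_loop(example_function, [{"ii": ii} for ii in range(5)])` from under an `if __name__ == "__main__":` guard returns the squares in order. Arguments and return values are still pickled and copied between processes, so this does not by itself solve the memory problem described in the question.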

A test file with simple examples:

https://github.com/douglasquincyadams/Main/blob/master/Test_ParallelLoop.py

Repo:

https://github.com/douglasquincyadams/Main

If you download my repo into some dark corner of your computer, then your working snippet should be:

    import Library_ParallelLoop

    def do_the_thing_function(ii):
        for u, Wu in enumerate(W):
            X[u] = np.linalg.solve(np.dot(Y, np.dot(np.diag(Wu), Y.T)) + lambda_ * np.eye(n_factors),
                                   np.dot(Y, np.dot(np.diag(Wu), Q[u].T))).T  # X inner loop
        for i, Wi in enumerate(W.T):
            Y[:,i] = np.linalg.solve(np.dot(X.T, np.dot(np.diag(Wi), X)) + lambda_ * np.eye(n_factors),  # Y inner loop
                                     np.dot(X.T, np.dot(np.diag(Wi), Q[:, i])))  # Y inner loop
        error = get_error(Q, X, Y, W)
        weighted_errors.append(error)
        print '{}th iteration is completed'.format(ii)
        return  # whatever your result is supposed to be... your code doesn't work on its own

    ListOfArgSets = []
    for ii in range(n_iterations):
        ListOfArgSets.append( { "ii" : ii , } )

    ResultList = Library_ParallelLoop.Main(
        Function = do_the_thing_function,
        ListOfArgSets = ListOfArgSets,
        Algorithm = 'pp',
        PrintExtra = True
    )

If you ask me, a parallel loop much like the one above should already be something convenient and built into the language, but it always seems to be demonstrated cryptically by wizards in a tower and never quite works when you try it on your crappy laptop. In any case, I hope this helps.
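For the memory problem raised in the question's update, one standard-library option is to hand the large arrays to each worker once, through the `initializer` argument of `multiprocessing.Pool`, and send only a row index per task. Below is a minimal Python 3 sketch of the X update under that assumption. The names `_init_worker`, `solve_row`, `update_X`, and the `start_method` parameter are hypothetical and do not come from the question or from Library_ParallelLoop. It also replaces `np.dot(Y, np.diag(Wu))` with the broadcast product `Y * Wu`, which gives the same matrix without building an n-by-n diagonal.

```python
import multiprocessing as mp

import numpy as np

# Per-process storage, filled once by the initializer in each worker.
_shared = {}


def _init_worker(Y, Q, W, lambda_):
    # Runs once per worker, so the big arrays are not re-sent with every task.
    _shared.update(Y=Y, Q=Q, W=W, lambda_=lambda_)


def solve_row(u):
    """Solve the regularized weighted least-squares system for row u of X."""
    Y, Q, W, lambda_ = _shared["Y"], _shared["Q"], _shared["W"], _shared["lambda_"]
    YW = Y * W[u]  # equals Y @ diag(W[u]) by broadcasting over columns
    A = YW @ Y.T + lambda_ * np.eye(Y.shape[0])
    b = YW @ Q[u]
    return np.linalg.solve(A, b)


def update_X(Y, Q, W, lambda_, processes=2, start_method=None):
    """Recompute every row of X in parallel; returns an (m, n_factors) array."""
    context = mp.get_context(start_method)  # None means the platform default
    with context.Pool(processes, initializer=_init_worker,
                      initargs=(Y, Q, W, lambda_)) as pool:
        rows = pool.map(solve_row, range(Q.shape[0]))  # each task is one index
    return np.vstack(rows)
```

A call such as `X = update_X(Y, Q, W, lambda_, processes=12)` would replace the first inner loop; the Y update can be written the same way over columns. Each worker still holds its own copy of the arrays (or a copy-on-write view when processes are forked), so this reduces repeated transfers rather than giving true shared memory, and results are returned to the parent instead of being written into a global.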

Additional note: I would also suggest that if you want to tackle ARBITRARY large-scale parallelization problems (anything more than simple loops), you use MPI, because it has all kinds of bells and whistles that allow processes to talk to each other mid-run. MPI is what scientists like to use for their largest simulations, so the larger clusters designed to handle very big jobs (~10k+ cores) support MPI and are unlikely to support pp or even the multiprocessing module. If you just want to use all the cores on your PC (or a few PCs on a network), then just pick the easiest way to get the job done.
