I have a fairly large training matrix (over 1 billion rows, two features per row). There are two classes (0 and 1). This is too large for a single machine, but fortunately I have about 200 MPI hosts at my disposal, each a modest dual-core workstation.
Feature generation has already been successfully distributed.
The answers to Multiprocessing scikit-learn suggest that the work of an SGDClassifier can be spread out:
You can distribute the data sets across the cores, do partial_fit, get the weight vectors, average them, distribute them back to the estimators, then do partial_fit again.
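On a single machine that recipe can be sketched without MPI at all. The snippet below is only an illustration with synthetic data and made-up names: two SGDClassifier instances each partial_fit one shard, and their weights are averaged into a fresh estimator that can then predict.

```python
import numpy as np
from sklearn import linear_model

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 2))
y = (X[:, 0] + X[:, 1] > 0).astype(int)

# Each "host" does a partial fit on its own shard of the data.
shards = [(X[:100], y[:100]), (X[100:], y[100:])]
workers = []
for Xp, yp in shards:
    clf = linear_model.SGDClassifier()
    clf.partial_fit(Xp, yp, classes=[0, 1])
    workers.append(clf)

# Average the weight vectors and intercepts into a fresh estimator.
merged = linear_model.SGDClassifier()
merged.coef_ = np.mean([w.coef_ for w in workers], axis=0)
merged.intercept_ = np.mean([w.intercept_ for w in workers], axis=0)
merged.classes_ = np.array([0, 1])

print(merged.predict(X[:5]))
```

Setting coef_, intercept_, and classes_ by hand is enough for SGDClassifier to make predictions, which is what makes the average-and-redistribute loop possible.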
Once I have run partial_fit a second time on each estimator, where do I go from there to get a final aggregate estimator?
My best guess was to average the coefs and intercepts again and build an estimator with those values. However, the resulting estimator gives different results than an estimator built with fit() on all of the data.
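Some discrepancy is expected: averaging shard-local SGD runs is not the same optimization as running fit() over the pooled data. A toy sketch (synthetic data, illustrative names, not the real billion-row setup) showing that the two coefficient vectors generally differ:

```python
import numpy as np
from sklearn import linear_model

rng = np.random.default_rng(42)
X = rng.normal(size=(400, 2))
y = (X @ np.array([1.5, -2.0]) > 0).astype(int)

# Coefficients obtained by averaging two shard-local partial_fit runs.
coefs = []
for Xp, yp in ((X[:200], y[:200]), (X[200:], y[200:])):
    clf = linear_model.SGDClassifier(random_state=0)
    clf.partial_fit(Xp, yp, classes=[0, 1])
    coefs.append(clf.coef_)
avg_coef = np.mean(coefs, axis=0)

# Coefficients from fit() on all of the data in one place.
full = linear_model.SGDClassifier(random_state=0).fit(X, y)

# The averaged single-pass weights and the fully fit weights differ.
print(np.linalg.norm(avg_coef - full.coef_))
```

fit() makes many passes over the whole data set, while each shard sees one partial_fit pass, so the averaged model lags behind even before any averaging error is considered.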
More details
Each host generates a local matrix and a local vector: n rows of the training set and the corresponding n target values.
Each host uses its local matrix and local vector to make an SGDClassifier and do a partial fit. Each then sends its coef vector and intercept to root. Root averages these values and sends them back to the hosts. The hosts do another partial_fit and again send their coef vector and intercept to root.
Root constructs a new estimator with these values.
```python
import numpy as np
from mpi4py import MPI
from sklearn import linear_model

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Local shard of the training data (feature generation is already distributed).
local_matrix = get_local_matrix()
local_vector = get_local_vector()

estimator = linear_model.SGDClassifier()
estimator.partial_fit(local_matrix, local_vector, classes=[0, 1])

# Round 1: gather (coef, intercept) pairs at root and average them.
average_coefs = None
avg_intercept = None
if rank > 0:
    comm.send((estimator.coef_, estimator.intercept_), dest=0, tag=rank)
else:
    pairs = [comm.recv(source=r, tag=r) for r in range(1, size)]
    pairs.append((estimator.coef_, estimator.intercept_))
    average_coefs = np.average([p[0] for p in pairs], axis=0)
    avg_intercept = np.average([p[1][0] for p in pairs])

# Broadcast the averages and continue training from them.
estimator.coef_ = comm.bcast(average_coefs, root=0)
estimator.intercept_ = np.array([comm.bcast(avg_intercept, root=0)])
estimator.partial_fit(local_matrix, local_vector, classes=[0, 1])

# Round 2: average again; root keeps the final aggregate estimator.
if rank > 0:
    comm.send((estimator.coef_, estimator.intercept_), dest=0, tag=rank)
else:
    pairs = [comm.recv(source=r, tag=r) for r in range(1, size)]
    pairs.append((estimator.coef_, estimator.intercept_))
    estimator.coef_ = np.average([p[0] for p in pairs], axis=0)
    estimator.intercept_ = np.array([np.average([p[1][0] for p in pairs])])
    print("The estimator at rank 0 should now be working")
```
Thanks!