Tracking progress of joblib.Parallel execution

Is there an easy way to track the overall progress of joblib.Parallel ?

I have a long-running computation consisting of thousands of jobs, which I want to track and record in a database. However, to do that, I need Parallel to execute a callback whenever it completes a task, reporting how many jobs are left.

I accomplished a similar task before with Python's stdlib multiprocessing.Pool, by launching a thread that monitors the number of pending jobs in the Pool's job list.
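For reference, that Pool trick can also be done without monitoring internals, via the callback argument of Pool.apply_async. This is an illustrative sketch, not the original code; it uses the thread-backed multiprocessing.pool.ThreadPool so it runs anywhere, but the process-backed multiprocessing.Pool exposes the identical interface:

```python
from multiprocessing.pool import ThreadPool  # same API as multiprocessing.Pool

def work(x):
    return x * x

total = 10
pending = [total]   # jobs still outstanding
history = []        # snapshot after each completion (stand-in for a DB write)

def on_done(result):
    # apply_async callbacks all run in one pool-internal thread,
    # so simple list mutation is safe for a progress counter.
    pending[0] -= 1
    history.append(pending[0])

pool = ThreadPool(processes=2)
async_results = [pool.apply_async(work, (i,), callback=on_done)
                 for i in range(total)]
values = [r.get() for r in async_results]
pool.close()
pool.join()

print("results:", sorted(values))
print("jobs left:", pending[0])   # jobs left: 0
```

The callback fires in the pool's result-handler thread before the corresponding `get()` returns, so by the time all results are collected, the counter has reached zero.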

Looking at the code, Parallel inherits from Pool, so I thought I could pull off the same trick, but it doesn't seem to use that list, and I haven't been able to figure out how else to "read" its internal status.

+8
python multithreading parallel-processing multiprocessing joblib
3 answers

The documentation you linked to states that Parallel has an optional progress meter. It's implemented using the callback keyword argument provided by multiprocessing.Pool.apply_async:

    # This is inside a dispatch function
    self._lock.acquire()
    job = self._pool.apply_async(SafeFunction(func), args,
                kwargs, callback=CallBack(self.n_dispatched, self))
    self._jobs.append(job)
    self.n_dispatched += 1

...

    class CallBack(object):
        """ Callback used by parallel: it is used for progress reporting,
            and to add data to be processed
        """
        def __init__(self, index, parallel):
            self.parallel = parallel
            self.index = index

        def __call__(self, out):
            self.parallel.print_progress(self.index)
            if self.parallel._original_iterable:
                self.parallel.dispatch_next()

And here is print_progress:

    def print_progress(self, index):
        elapsed_time = time.time() - self._start_time

        # This is heuristic code to print only 'verbose' times a messages
        # The challenge is that we may not know the queue length
        if self._original_iterable:
            if _verbosity_filter(index, self.verbose):
                return
            self._print('Done %3i jobs | elapsed: %s',
                        (index + 1,
                         short_format_time(elapsed_time),
                        ))
        else:
            # We are finished dispatching
            queue_length = self.n_dispatched
            # We always display the first loop
            if not index == 0:
                # Display depending on the number of remaining items
                # A message as soon as we finish dispatching, cursor is 0
                cursor = (queue_length - index + 1
                          - self._pre_dispatch_amount)
                frequency = (queue_length // self.verbose) + 1
                is_last_item = (index + 1 == queue_length)
                if (is_last_item or cursor % frequency):
                    return
            remaining_time = (elapsed_time / (index + 1) *
                              (self.n_dispatched - index - 1.))
            self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                        (index + 1,
                         queue_length,
                         short_format_time(elapsed_time),
                         short_format_time(remaining_time),
                        ))
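The remaining-time estimate in that snippet is plain linear extrapolation: average time per finished job times the number of jobs not yet finished. As a standalone illustration (with made-up numbers):

```python
def estimate_remaining(elapsed_time, index, n_dispatched):
    """Mirror of joblib's heuristic: elapsed time per completed job
    (index is 0-based) times the number of jobs still to finish."""
    return elapsed_time / (index + 1) * (n_dispatched - index - 1.0)

# 4 jobs done (index 3) out of 10 in 12 seconds -> 3 s/job, 6 jobs left
print(estimate_remaining(12.0, 3, 10))  # 18.0
```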

The way they implement this looks a bit odd, to be honest: it seems to assume that jobs will always be completed in the order they are started. The index variable that goes to print_progress is just the value of self.n_dispatched at the time the job was dispatched. So the first job launched will always finish with an index of 0, even if, say, the third job finishes first. It also means they don't actually keep track of the number of jobs completed, so there is no instance variable there for you to monitor.

Your best bet, I think, is to write your own CallBack class and monkey-patch Parallel:

    from math import sqrt
    from collections import defaultdict
    from joblib import Parallel, delayed

    class CallBack(object):
        completed = defaultdict(int)

        def __init__(self, index, parallel):
            self.index = index
            self.parallel = parallel

        def __call__(self, index):
            CallBack.completed[self.parallel] += 1
            print("done with {}".format(CallBack.completed[self.parallel]))
            if self.parallel._original_iterable:
                self.parallel.dispatch_next()

    import joblib.parallel
    joblib.parallel.CallBack = CallBack

    if __name__ == "__main__":
        print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))

Output:

    done with 1
    done with 2
    done with 3
    done with 4
    done with 5
    done with 6
    done with 7
    done with 8
    done with 9
    done with 10
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

That way, your callback gets called whenever a job completes, instead of the default one.
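If you'd rather not patch joblib internals at all, the stdlib concurrent.futures gives you an equivalent per-completion hook via Future.add_done_callback. A sketch (ThreadPoolExecutor is used so the snippet is self-contained, but ProcessPoolExecutor has the same interface; the log list here stands in for your database write):

```python
import concurrent.futures
import threading
from math import sqrt

def run_with_progress(fn, inputs, max_workers=2):
    """Run fn over inputs, recording how many jobs remain after each
    completion (the log list is a stand-in for a database write)."""
    lock = threading.Lock()
    remaining = [len(inputs)]
    log = []

    def on_done(future):
        # Done-callbacks may fire from several threads, so guard the counter.
        with lock:
            remaining[0] -= 1
            log.append(remaining[0])

    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
        futures = [executor.submit(fn, x) for x in inputs]
        for f in futures:
            f.add_done_callback(on_done)
        results = [f.result() for f in futures]
    return results, log

results, log = run_with_progress(sqrt, [i ** 2 for i in range(10)])
print(results)                     # [0.0, 1.0, ..., 9.0]
print("last log entry:", log[-1])  # last log entry: 0
```

Since the executor's shutdown (on leaving the `with` block) waits for all work, every callback has fired by the time the function returns, so the last log entry is always 0.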

+7

An extension of dano's answer, for the newest version of the joblib library. There were a couple of changes to the internal implementation:

    from joblib import Parallel, delayed
    from collections import defaultdict

    # patch joblib progress callback
    class BatchCompletionCallBack(object):
        completed = defaultdict(int)

        def __init__(self, time, index, parallel):
            self.index = index
            self.parallel = parallel

        def __call__(self, index):
            BatchCompletionCallBack.completed[self.parallel] += 1
            print("done with {}".format(
                BatchCompletionCallBack.completed[self.parallel]))
            if self.parallel._original_iterator is not None:
                self.parallel.dispatch_next()

    import joblib.parallel
    joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
+2

Here is another answer to your question, which supports the following syntax:

    aprun = ParallelExecutor(n_jobs=5)
    a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
    a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
    a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
    a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

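ParallelExecutor in that answer wraps joblib together with a progress bar. As a rough stdlib-only sketch of the same wrap-and-report idea (every name below is made up for illustration, and (fn, arg) tuples stand in for joblib's delayed):

```python
import concurrent.futures
import sys

def aprun(n_jobs=2, total=None, bar='txt'):
    """Hypothetical mini-analogue of the ParallelExecutor above: returns
    a runner that maps (fn, arg) tasks and prints a text progress bar."""
    def run(tasks):
        tasks = list(tasks)                       # (fn, arg) pairs
        count = total if total is not None else len(tasks)
        results = []
        with concurrent.futures.ThreadPoolExecutor(n_jobs) as ex:
            # ex.map preserves input order, so results line up with tasks
            for done, res in enumerate(ex.map(lambda t: t[0](t[1]), tasks), 1):
                if bar == 'txt':
                    sys.stderr.write('\r%d/%d done' % (done, count))
                results.append(res)
        if bar == 'txt':
            sys.stderr.write('\n')
        return results
    return run

func = lambda x: x * x
a1 = aprun(n_jobs=5, total=25)((func, i ** 2 + j)
                               for i in range(5) for j in range(5))
a2 = aprun(bar=None)((func, i ** 2 + j) for i in range(4) for j in range(4))
```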

+1
