How can we use tqdm in parallel execution with joblib?

I want to run a function in parallel and wait until all parallel nodes are executed using joblib. As in the example:

from math import sqrt from joblib import Parallel, delayed Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) 

But I want the execution to be visible in one step of progress, for example, with tqdm , showing how many tasks were completed.

How do you do this?

+21
python parallel-processing tqdm joblib
source share
3 answers

If your task consists of many parts, you can divide these parts into subgroups k , run each subgroup in parallel and update the progress bar between them, which will lead to k updating progress.

This is demonstrated in the following example from the documentation.

 >>> with Parallel(n_jobs=2) as parallel: ... accumulator = 0. ... n_iter = 0 ... while accumulator < 1000: ... results = parallel(delayed(sqrt)(accumulator + i ** 2) ... for i in range(5)) ... accumulator += sum(results) # synchronization barrier ... n_iter += 1 

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers

+7
source share

Just put range(10) inside tqdm(...) ! It probably seemed too good to be true for you, but it really works (on my machine):

 from math import sqrt from joblib import Parallel, delayed from tqdm import tqdm result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000))) 
+26
source share

A workaround is possible here

 def func(x): time.sleep(random.randint(1, 10)) return x def text_progessbar(seq, total=None): step = 1 tick = time.time() while True: time_diff = time.time()-tick avg_speed = time_diff/step total_str = 'of %n' % total if total else '' print('step', step, '%.2f' % time_diff, 'avg: %.2f iter/sec' % avg_speed, total_str) step += 1 yield next(seq) all_bar_funcs = { 'tqdm': lambda args: lambda x: tqdm(x, **args), 'txt': lambda args: lambda x: text_progessbar(x, **args), 'False': lambda args: iter, 'None': lambda args: iter, } def ParallelExecutor(use_bar='tqdm', **joblib_args): def aprun(bar=use_bar, **tq_args): def tmp(op_iter): if str(bar) in all_bar_funcs.keys(): bar_func = all_bar_funcs[str(bar)](tq_args) else: raise ValueError("Value %s not supported as bar type"%bar) return Parallel(**joblib_args)(bar_func(op_iter)) return tmp return aprun aprun = ParallelExecutor(n_jobs=5) a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5)) a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4)) a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4)) a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4)) 
+6
source share

All Articles