How to create a future only when a free worker is available

I am trying to send information extracted from the lines of a large file to a process running on some server.

To speed this up, I would like to do this with several threads in parallel.

Using the Python 2.7 backport of concurrent.futures, I tried this:

    f = open("big_file")
    with ThreadPoolExecutor(max_workers=4) as e:
        for line in f:
            e.submit(send_line_function, line)
    f.close()

However, this is problematic because all the futures are submitted at once: the executor queues every work item immediately, so the whole file ends up in memory and my machine runs out of memory.

My question is: is there an easy way to submit a new future only when a free worker is available?

python multithreading concurrent.futures
1 answer

You can iterate over chunks of the file using

    for chunk in IT.izip(*[f] * chunksize):

(This is an application of the grouper recipe, which collects items from the iterator f into tuples of chunksize lines. Note: this does not consume the entire file at once, since itertools.izip returns an iterator in Python 2; in Python 3 the built-in zip is already lazy, so you can use it directly.)
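To see what this chunking does, here is a minimal, self-contained sketch (the data is made up for illustration):

    import itertools as IT

    lines = iter(['a\n', 'b\n', 'c\n', 'd\n', 'e\n'])
    # izip pulls from the same underlying iterator chunksize times per
    # tuple, so each tuple holds consecutive lines and nothing is read
    # ahead of the current chunk.
    for chunk in IT.izip(*[lines] * 2):
        print(chunk)
    # ('a\n', 'b\n')
    # ('c\n', 'd\n')
    # Note: the leftover 'e\n' is silently dropped; use
    # IT.izip_longest(*[lines] * 2, fillvalue=None) to keep it.

With that in hand, the whole pipeline looks like this: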


    import concurrent.futures as CF
    import itertools as IT
    import logging

    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    def worker(line):
        line = line.strip()
        logger.info(line)

    chunksize = 1024
    with CF.ThreadPoolExecutor(max_workers=4) as executor, \
            open("big_file") as f:
        # izip drops a final partial chunk; use IT.izip_longest with a
        # fillvalue if you need those trailing lines too
        for chunk in IT.izip(*[f] * chunksize):
            futures = [executor.submit(worker, line) for line in chunk]
            # wait for these futures to complete before submitting another chunk
            CF.wait(futures)
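If send_line_function returns a value you care about, note that CF.wait returns two sets of futures (done and not done), and calling result() on a future re-raises any exception from the worker. A self-contained sketch of that pattern (the work function here is made up for illustration):

    import concurrent.futures as CF

    def work(x):
        if x == 3:
            raise ValueError('bad input')
        return x * x

    with CF.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(work, x) for x in range(5)]
        # wait() returns the completed and still-pending futures as two sets
        done, not_done = CF.wait(futures)
        for future in done:
            try:
                print(future.result())  # re-raises the ValueError for x == 3
            except ValueError as err:
                print('worker failed: %s' % err)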

Now, as you rightly point out in the comments, this is not optimal: a single worker that takes a long time can hold up a whole chunk of jobs.

Usually, if each call to the worker takes roughly the same amount of time, this does not matter much. However, here is a way to advance the file handle only on demand. It uses a threading.Condition so that the workers can notify a sprinkler thread to advance the file handle.

    import logging
    import threading
    import Queue

    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    SENTINEL = object()

    def worker(cond, queue):
        for line in iter(queue.get, SENTINEL):
            line = line.strip()
            logger.info(line)
            with cond:
                # tell the sprinkler that this worker is free again
                cond.notify()
                logger.info('notify')

    def sprinkler(cond, queue, num_workers):
        with open("big_file") as f:
            for line in f:
                logger.info('advancing filehandle')
                with cond:
                    queue.put(line)
                    logger.info('waiting')
                    # block until a worker signals it has finished a line
                    cond.wait()
        # unblock the workers so they can exit
        for _ in range(num_workers):
            queue.put(SENTINEL)

    num_workers = 4
    cond = threading.Condition()
    queue = Queue.Queue()

    sprinkler_thread = threading.Thread(target=sprinkler,
                                        args=[cond, queue, num_workers])
    sprinkler_thread.start()

    threads = [threading.Thread(target=worker, args=[cond, queue])
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    sprinkler_thread.join()
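As a design note, you can get similar back-pressure with less machinery from a bounded queue: Queue.Queue(maxsize=n) makes put() block until a worker has taken an item, so the file handle only advances when a slot is free. A minimal sketch under that assumption:

    import threading
    import Queue

    SENTINEL = object()
    num_workers = 4
    # put() blocks once num_workers lines are queued, so the reader
    # only advances the file when a worker frees up a slot.
    queue = Queue.Queue(maxsize=num_workers)

    def worker():
        for line in iter(queue.get, SENTINEL):
            line.strip()  # process the line here

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    with open("big_file") as f:
        for line in f:
            queue.put(line)  # blocks while the queue is full
    for _ in range(num_workers):
        queue.put(SENTINEL)  # unblock the workers so they can exit
    for t in threads:
        t.join()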
