You can iterate over pieces of a file using
for chunk in zip(*[f]*chunksize):
(This is a grouper recipe recipe application that collects elements from the iterator f into chunksize sized chunksize . Note: This does not destroy the entire file at once, since zip returns the iterator in Python3.)
import concurrent.futures as CF import itertools as IT import logging logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s %(threadName)s] %(message)s', datefmt='%H:%M:%S') def worker(line): line = line.strip() logger.info(line) chunksize = 1024 with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f: for chunk in zip(*[f]*chunksize): futures = [executor.submit(worker, line) for line in chunk]
Now, in the comments, you rightly point out that this is not optimal. There may be some kind of worker who takes a lot of time and holds a whole piece of jobs.
Usually, if each call to the employee takes approximately the same amount of time, then this does not matter much. However, here is a way to push file descriptions on demand. It uses threading.Condition to notify a sprinkler to advance the file descriptor.
import logging import threading import Queue logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s %(threadName)s] %(message)s', datefmt='%H:%M:%S') SENTINEL = object() def worker(cond, queue): for line in iter(queue.get, SENTINEL): line = line.strip() logger.info(line) with cond: cond.notify() logger.info('notify') def sprinkler(cond, queue, num_workers): with open("big_file") as f: for line in f: logger.info('advancing filehandle') with cond: queue.put(line) logger.info('waiting') cond.wait() for _ in range(num_workers): queue.put(SENTINEL) num_workers = 4 cond = threading.Condition() queue = Queue.Queue() t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers]) t.start() threads = [threading.Thread(target=worker, args=[cond, queue])] for t in threads: t.start() for t in threads: t.join()
unutbu
source share