multiprocessing.Pool.imap_unordered with a fixed queue size or buffer?

I read data from large CSV files, process it, and load it into a SQLite database. Profiling suggests 80% of my time is spent on I/O and 20% is spent processing the input to prepare it for database insertion. I sped up the processing step with multiprocessing.Pool so that the I/O code is never waiting for the next record. But this caused serious memory problems because the I/O step could not keep up with the workers.

The following toy example illustrates my problem:

    #!/usr/bin/env python
    # 3.4.3
    import time
    from multiprocessing import Pool

    def records(num=100):
        """Simulate generator getting data from large CSV files."""
        for i in range(num):
            print('Reading record {0}'.format(i))
            time.sleep(0.05)  # getting raw data is fast
            yield i

    def process(rec):
        """Simulate processing of raw text into dicts."""
        print('Processing {0}'.format(rec))
        time.sleep(0.1)  # processing takes a little time
        return rec

    def writer(records):
        """Simulate saving data to SQLite database."""
        for r in records:
            time.sleep(0.3)  # writing takes the longest
            print('Wrote {0}'.format(r))

    if __name__ == "__main__":
        data = records(100)
        with Pool(2) as pool:
            writer(pool.imap_unordered(process, data, chunksize=5))

This code builds up a backlog of records that eventually consumes all memory, because I cannot save the data to disk fast enough. Run the code and you will notice that Pool.imap_unordered consumes all the data while the writer is only at the 15th record or so. Now imagine the processing step producing dictionaries from hundreds of millions of lines of input, and you can see why I run out of memory. Amdahl's Law in action, perhaps.

What is the fix for this? I think I need some kind of buffer for Pool.imap_unordered that says, "once there are x records that need inserting, stop and wait until there are fewer than x before producing more." I should be able to get some speed improvement by preparing the next record while the last one is being saved.

I tried using NuMap from the papy module (which I modified to work with Python 3) to do exactly this, but it was not any faster. In fact, it was worse than running the program sequentially; NuMap uses two threads plus several processes.

The bulk import features of SQLite are probably not suited to my task, because the data requires significant processing and normalization.

I have about 85G of compressed text to process. I am open to other database technologies, but I picked SQLite for ease of use and because this is a write-once, read-many job in which only 3 or 4 people will use the resulting database after everything is loaded.

4 answers

Since processing is fast but writing is slow, it sounds like your problem is I/O-bound. Therefore there may not be much to be gained from multiprocessing.

However, you can peel off a chunk of data, process the chunk, and wait until that data has been written before peeling off another chunk:

    import itertools as IT

    if __name__ == "__main__":
        data = records(100)
        with Pool(2) as pool:
            chunksize = ...
            for chunk in iter(lambda: list(IT.islice(data, chunksize)), []):
                writer(pool.imap_unordered(process, chunk, chunksize=5))
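A note on the two-argument form of iter() used above: it keeps calling the lambda and stops as soon as the result equals the sentinel ([] here), which happens when the generator is exhausted. A tiny standalone illustration, with an illustrative chunk size of 3:

    import itertools as IT

    data = iter(range(10))
    for chunk in iter(lambda: list(IT.islice(data, 3)), []):
        print(chunk)
    # [0, 1, 2]
    # [3, 4, 5]
    # [6, 7, 8]
    # [9]

Each chunk is fully processed and written before the next one is read, so at most one chunk's worth of records is in memory at a time.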

Since I was working on the same problem, I figured that an effective way to prevent the pool from overloading is to use a semaphore with a generator:

    from multiprocessing import Pool, Semaphore

    def produce(semaphore, from_file):
        with open(from_file) as reader:
            for line in reader:
                # Reduce semaphore by 1 or wait if 0
                semaphore.acquire()
                # Now deliver an item to the caller (pool)
                yield line

    def process(item):
        result = (first_function(item), second_function(item), third_function(item))
        return result

    def consume(semaphore, result):
        database_con.cur.execute("INSERT INTO ResultTable VALUES (?,?,?)", result)
        # Result is consumed, semaphore may now be increased by 1
        semaphore.release()

    def main():
        global database_con
        semaphore_1 = Semaphore(1024)
        with Pool(2) as pool:
            for result in pool.imap_unordered(process, produce(semaphore_1, "workfile.txt"),
                                              chunksize=128):
                consume(semaphore_1, result)
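To connect this back to the toy example in the question, here is a minimal sketch of the same idea. It assumes the records and process functions from the question are in scope, and the cap of 20 unwritten records is purely illustrative:

    import time
    from multiprocessing import Pool, Semaphore

    def bounded_records(semaphore, num=100):
        """Yield records, but block once the cap of unwritten records is reached."""
        for rec in records(num):          # records() as defined in the question
            semaphore.acquire()           # wait if too many records are in flight
            yield rec

    def bounded_writer(semaphore, results):
        """Like writer() in the question, but frees one slot per record written."""
        for r in results:
            time.sleep(0.3)               # writing takes the longest
            print('Wrote {0}'.format(r))
            semaphore.release()           # allow the producer to read one more

    if __name__ == "__main__":
        cap = Semaphore(20)               # illustrative bound on in-flight records
        with Pool(2) as pool:
            results = pool.imap_unordered(process, bounded_records(cap, 100),
                                          chunksize=5)
            bounded_writer(cap, results)

The semaphore is only ever acquired and released in the parent process (the pool's task-feeding thread and the writer loop), so it simply throttles reading to the pace of writing.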

See also:

K Hong - multithreading - semaphore objects and thread pool

Lecture by Chris Terman - MIT 6.004 L21: Semaphores


It sounds like all you really need is to replace the unbounded queues underneath Pool with bounded (and blocking) queues. That way, if either side gets ahead of the other, it simply blocks until the other is ready.

This would be easy to do by looking at the source and subclassing or monkeypatching Pool, something like:

    import multiprocessing.pool
    import queue

    class Pool(multiprocessing.pool.Pool):
        def _setup_queues(self):
            self._inqueue = self._ctx.Queue(5)    # bounded queue of pending tasks
            self._outqueue = self._ctx.Queue(5)   # bounded queue of results
            self._quick_put = self._inqueue._writer.send
            self._quick_get = self._outqueue._reader.recv
            self._taskqueue = queue.Queue(10)     # bounded in-process task buffer
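If those private attributes exist on your interpreter, the intent is that this subclass drops straight into the question's toy example without changing the calling code. A sketch, assuming records, process, and writer from the question are in scope:

    if __name__ == "__main__":
        data = records(100)
        # Pool is the bounded subclass defined above; the intent is that reading
        # and processing block once a few chunks of tasks or results are pending.
        with Pool(2) as pool:
            writer(pool.imap_unordered(process, data, chunksize=5))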

But this is clearly not portable (even across CPython 3.3 releases, much less to a different Python 3 implementation).

I think you can do this portably in 3.4+ by providing a custom context, but I have not been able to get it right, so...


Using the Semaphore approach from the answer above, I created a wrapper around an iterator that limits the number of values it yields.

https://pypi.python.org/pypi/bounded-iterator/
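For illustration only, the core idea of such a wrapper is roughly the following. This is a guess at the general shape, not the package's actual API:

    import threading

    class BoundedIterator:
        """Wrap an iterator so that at most `bound` items can be outstanding
        until the consumer acknowledges each one via processed()."""
        def __init__(self, bound, iterator):
            self._semaphore = threading.Semaphore(bound)
            self._iterator = iter(iterator)

        def __iter__(self):
            return self

        def __next__(self):
            self._semaphore.acquire()    # block if too many unacknowledged items
            return next(self._iterator)

        def processed(self):
            self._semaphore.release()    # one item has been fully handled

In the toy example, records() would be wrapped this way, the wrapper passed to imap_unordered, and processed() called once per record inside the writer loop.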

