I read data from large CSV files, process it, and load it into a SQLite database. Profiling suggests that 80% of my time is spent on I/O and 20% is spent processing the input to prepare it for database insertion. I sped up the processing step with multiprocessing.Pool so that the I/O code is never left waiting for the next record. But this caused serious memory problems, because the I/O step could not keep up with the workers.
The following toy example illustrates my problem:
#!/usr/bin/env python
# 3.4.3
import time
from multiprocessing import Pool


def records(num=100):
    """Simulate generator getting data from large CSV files."""
    for i in range(num):
        print('Reading record {0}'.format(i))
        time.sleep(0.05)  # getting raw data is fast
        yield i


def process(rec):
    """Simulate processing of raw text into dicts."""
    print('Processing {0}'.format(rec))
    time.sleep(0.1)  # processing takes a little time
    return rec


def writer(records):
    """Simulate saving data to SQLite database."""
    for r in records:
        time.sleep(0.3)  # writing takes the longest
        print('Wrote {0}'.format(r))


if __name__ == "__main__":
    data = records(100)
    with Pool(2) as pool:
        writer(pool.imap_unordered(process, data, chunksize=5))
This code builds up a backlog of records that eventually consumes all my memory, because I cannot save the data to disk fast enough. Run the code and you will notice that Pool.imap_unordered has consumed all the input by the time the writer is on roughly the 15th record. Now imagine that the processing step produces dictionaries for hundreds of millions of rows, and you can see why I run out of memory. Perhaps this is Amdahl's Law in action.
What is the fix for this? I think I need some kind of buffer for Pool.imap_unordered that says: "once there are x records waiting to be inserted, stop and wait until there are fewer than x before producing more." I should still be able to get some speed improvement from preparing the next record while the last one is being saved.
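Something like the following is roughly what I have in mind, sketched on top of the toy example above. The names throttled, slots, and MAX_PENDING are just placeholders of mine, and I have not verified this against the real pipeline: a threading.BoundedSemaphore throttles the pool's internal task-feeder thread, and the writer releases one slot for every record it saves.

#!/usr/bin/env python
# Rough sketch of a bounded feeder for Pool.imap_unordered; untested beyond
# the toy example. throttled, slots, and MAX_PENDING are made-up names.
import threading
import time
from multiprocessing import Pool

MAX_PENDING = 20  # keep this >= chunksize (ideally a multiple of it)


def records(num=100):
    """Same simulated CSV reader as in the toy example."""
    for i in range(num):
        print('Reading record {0}'.format(i))
        time.sleep(0.05)
        yield i


def process(rec):
    """Same simulated processing step as in the toy example."""
    print('Processing {0}'.format(rec))
    time.sleep(0.1)
    return rec


def throttled(iterable, semaphore):
    """Yield items to the pool only while the writer has free slots."""
    for item in iterable:
        semaphore.acquire()  # blocks the pool's task-feeder thread once
                             # MAX_PENDING records are unwritten
        yield item


def writer(results, semaphore):
    """Same simulated SQLite writer, but it frees one slot per saved record."""
    for r in results:
        time.sleep(0.3)
        print('Wrote {0}'.format(r))
        semaphore.release()  # lets the reader pull one more record


if __name__ == '__main__':
    slots = threading.BoundedSemaphore(MAX_PENDING)
    data = throttled(records(100), slots)
    with Pool(2) as pool:
        writer(pool.imap_unordered(process, data, chunksize=5), slots)

A threading semaphore works here because the generator is consumed in the main process by the pool's feeder thread; the one thing to watch is that MAX_PENDING must be at least chunksize, otherwise the feeder can stall while assembling a chunk before anything has been written.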
I tried using NuMap from the papy module (which I modified to work with Python 3) to do exactly this, but it was not faster. In fact, it was worse than running the program sequentially; NuMap uses two threads plus several processes.
SQLite's bulk import facilities are probably not suitable for my task, because the data requires significant processing and normalization before it can be inserted.
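To make the writer step concrete, it does something roughly like the batched insert loop below; the table name, columns, and dict keys are placeholders for illustration, not my real (normalized) schema.

import sqlite3


def writer(results, db_path='output.db', batch_size=1000):
    """Sketch of the writer step: insert processed dicts in batches.

    The 'records' table, its two columns, and the dict keys are illustrative
    only; the real data is normalized across several tables, which is why a
    plain CSV import into a single table does not fit.
    """
    conn = sqlite3.connect(db_path)
    conn.execute('CREATE TABLE IF NOT EXISTS records (id INTEGER, payload TEXT)')

    def flush(batch):
        conn.executemany('INSERT INTO records VALUES (?, ?)', batch)
        conn.commit()  # one transaction per batch keeps inserts reasonably fast

    batch = []
    for rec in results:
        batch.append((rec['id'], rec['payload']))
        if len(batch) >= batch_size:
            flush(batch)
            batch = []
    if batch:
        flush(batch)
    conn.close()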
I have about 85 GB of compressed text to process. I am open to other database technologies, but I chose SQLite for ease of use and because this is a write-once, read-many job: only 3 or 4 people will use the resulting database after everything is loaded.