In a "write once, read many" workflow, I often parse large text files (20GB-60GB) dumped from Teradata using the FastExport utility and load them into PyTables using Pandas. I use multiprocessing to split the text files and distribute them across processes, each of which writes its own section of .h5 file (about 5MM rows per section, split by line count) so the writes happen in parallel. This is pretty fast: about 12 minutes to write multiple hdf5 files in parallel, compared to 22 minutes to write one hdf5 file, for 25MM rows x 64 columns.
    %timeit -n 1 write_single_hdf_multiprocess()
    1 loops, best of 3: 22min 42s per loop

    %timeit -n 1 write_multiple_hdf_multiprocess()
    1 loops, best of 3: 12min 12s per loop
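For context, the parallel write is orchestrated roughly like the sketch below. This is a simplified stand-in, not my actual code: the dump/part file names, the pipe delimiter and the use of read_csv are assumptions (my real code feeds each writer through a reader/queue pair, shown at the end of the post).

    import pandas as pd
    from multiprocessing import Process

    def write_part(text_part_path, h5_part_path):
        # each worker parses its own ~5MM-row slice of the FastExport dump and
        # writes its own .h5 part, so the writes never contend for one file
        store = pd.HDFStore(h5_part_path, complib='blosc')
        for chunk in pd.read_csv(text_part_path, sep='|', chunksize=200000):
            store.append('data', chunk, format='t')
        store.close()

    if __name__ == '__main__':
        jobs = []
        for i in range(1, 6):
            p = Process(target=write_part,
                        args=('dump_part_%d.txt' % i, 'part_%d.h5' % i))
            p.start()
            jobs.append(p)
        for p in jobs:
            p.join()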
In the case of writing several h5 files partitioned by rows, I end up with multiple files that share the same table structure, and I want to merge them into a single h5 file with one root/data/table node.
To test merge functionality, here is a snippet of code:
    import tables as tb
    import pandas as pd

    tb.setBloscMaxThreads(15)
    store = pd.HDFStore('temp15.h5', complib='blosc')
    filenames = ['part_1.h5', 'part_2.h5', 'part_3.h5', 'part_4.h5', 'part_5.h5']

    for f in filenames:
        s = pd.HDFStore(f)
        df = s.select('data')
        store.append(key='data', value=df, format='t', chunksize=200000)
        s.close()

    store.close()
Here is the %timeit result for this:
    1 loops, best of 3: 8min 22s per loop
Basically, writing several h5 files in parallel works much better for me. I have two questions:
1. Is there a more efficient way to combine (append) h5 files that share the same table format (something like a SQL UNION)? I tried this SO answer but could not get it to append the tables.
2. If not, and splitting by rows is the way to go, what is the sensible thing to do when most queries are select-where over all columns? I am thinking of writing a map/combine function that runs the select-where against every part of the table (sketched below). Pandas' select_as_multiple() does this for column-based partitioning.
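To make question 2 concrete, the map/combine select I have in mind would look something like this sketch (the function name is mine, and the where-clause syntax depends on the pandas version):

    import pandas as pd

    def select_from_parts(filenames, key, where):
        """ run the same select-where against every row-partitioned part
            file and concatenate the hits (the 'combine' step) """
        pieces = []
        for f in filenames:
            s = pd.HDFStore(f, mode='r')
            pieces.append(s.select(key, where=where))
            s.close()
        return pd.concat(pieces, ignore_index=True)

    # e.g. result = select_from_parts(filenames, 'data', where)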
Update based on Jeff's suggestions:
Good call on removing indexing and compression in the pre-merge file-writing process. After removing indexing and compression, and capping each pre-merge file at 1MM rows:
    %timeit -n 1 write_multiple_hdf_multiprocess()
    1 loops, best of 3: 9min 37s per loop
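Concretely, the change in the pre-merge writers amounts to something like this (a sketch; the function name is mine and 'chunks' is an iterable of parsed DataFrame chunks coming off the queue):

    import pandas as pd

    def write_part_uncompressed(chunks, part_path):
        # no complib -> the part file is written uncompressed
        store = pd.HDFStore(part_path)
        for chunk in chunks:
            # index=False: skip building the PyTables index on every append,
            # since the part files are throwaway intermediates anyway
            store.append('data', chunk, format='t', index=False)
        store.close()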
The 9min 37s run is a little over 2 minutes faster than before, and about as fast as I can parse the data. After setting the data columns to the required fields (3 in my case):
    for f in filenames:
        s = pd.HDFStore(f)
        df = s.select('data')
        dc = df.columns[1:4]
        store.append(key='data', value=df, format='t', data_columns=dc)
        s.close()
This is about 2 minutes slower than before: 1 loops, best of 3: 10min 23s per loop. After removing the compression from the above code, I get 1 loops, best of 3: 8min 48s per loop (almost identical to the first attempt, which had compression but no data column indexing). To give you an idea of how well the compression works, the uncompressed store is about 13.5 GB while the compressed version using blosc is about 3.7 GB.
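For context on why I care about the data columns at all: they are the fields I can use in a where clause against the merged table, and that is my typical query pattern. A query would look roughly like the sketch below ('account_id' is a made-up stand-in for one of my three data columns; string where-expressions need a newer pandas than my 0.12, which builds the where from Term objects instead):

    import pandas as pd

    merged = pd.HDFStore('temp15.h5', mode='r')
    hits = merged.select('data', where='account_id == 12345')
    merged.close()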
So my process now takes 18 minutes 15 seconds to produce a merged, uncompressed hdf5 file, which is about 4 minutes 7 seconds faster than writing a single (compressed) file directly.
This brings me to the second part of my question: what if I do not merge the files at all and instead process the pre-merge files in the map/combine fashion, would that be a reasonable way to approach this? How should I think about it?
For full disclosure, I am using Pandas version 0.12.0 and PyTables version 3.0.0, and my data processing workflow looks like this (pseudocode):
    def generate_chunks_from_text_file(reader, chunksize=50000):
        """ generator that yields processed text chunks """
        for i, line in enumerate(reader.readlines()):
            # ----- process data and yield chunk -----

    def data_reader(reader, queue):
        """ read data from file and put it into a queue for multiprocessing """
        for chunk in generate_chunks_from_text_file(reader):
            queue.put(chunk)
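The writer end of the queue (not shown above) is roughly the following; the sentinel handling and the wiring comments are simplified for illustration:

    import pandas as pd

    def data_writer(queue, part_path):
        """ consume parsed chunks from a multiprocessing.Queue and append
            them to one .h5 part file """
        store = pd.HDFStore(part_path, complib='blosc')
        while True:
            chunk = queue.get()
            if chunk is None:            # sentinel: the reader side is done
                break
            store.append('data', chunk, format='t')
        store.close()

    # wiring, one reader/writer pair per part file (sketch):
    #   q = multiprocessing.Queue()
    #   multiprocessing.Process(target=data_writer, args=(q, 'part_1.h5')).start()
    #   multiprocessing.Process(target=data_reader, args=(reader, q)).start()
    #   ... and q.put(None) once the reader finishes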