PyTables / Pandas: merging (appending) multiple row-partitioned HDF5 stores

In a "write once, read many" workflow, I often parse large text files (20GB-60GB) dumped from Teradata using the FastExport utility and load them into PyTables using Pandas. I use multiprocessing to split the text files and distribute them to different processes, each writing its own section of the .h5 output in chunks of 5MM lines, so that the writes happen in parallel. This is reasonably fast: about 12 minutes to write multiple hdf5 files in parallel, compared to 22 minutes to write a single hdf5 file, for 25MM rows x 64 columns.

    %timeit -n 1 write_single_hdf_multiprocess()
    1 loops, best of 3: 22min 42s per loop

    %timeit -n 1 write_multiple_hdf_multiprocess()
    1 loops, best of 3: 12min 12s per loop

Having written several row-split h5 files, I now have multiple files with an identical table structure that I want to merge into a single h5 file with one root/data/table node.

To test merge functionality, here is a snippet of code:

    import tables as tb
    import pandas as pd

    tb.setBloscMaxThreads(15)
    store = pd.HDFStore('temp15.h5', complib='blosc')
    filenames = ['part_1.h5', 'part_2.h5', 'part_3.h5', 'part_4.h5', 'part_5.h5']
    for f in filenames:
        s = pd.HDFStore(f)
        df = s.select('data')
        store.append(key='data', value=df, format='t', chunksize=200000)
        s.close()
    store.close()

Here is the %timeit result for this:

 1 loops, best of 3: 8min 22s per loop 

Basically, I am still better off writing several h5 files in parallel than paying for this merge step. I have two questions:

  • Is there a more efficient way to combine (append) h5 files that share the same table format, something like a SQL UNION? I tried this SO answer but could not get it to append the tables.

  • If not, is splitting by rows a sensible choice, given that most of my queries are select-where conditions over all columns? I am thinking of writing a map/combine function that runs the select-where query against every part of the table (see the sketch just after this list). Pandas' select_as_multiple() does this for column-based partitioning.

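To make the second question concrete, this is roughly the kind of map/combine select I have in mind. It is only a sketch: select_from_parts is a hypothetical helper, and the exact where-expression syntax depends on the pandas version.

    import pandas as pd

    def select_from_parts(part_files, where, columns=None):
        """Hypothetical map/combine helper: run the same select against every
        row partition and concatenate the partial results."""
        pieces = []
        for fname in part_files:              # map: query each pre-merge file
            s = pd.HDFStore(fname, mode='r')
            try:
                pieces.append(s.select('data', where=where, columns=columns))
            finally:
                s.close()
        return pd.concat(pieces, ignore_index=True)   # combine: stack the partial results

    # e.g. df = select_from_parts(['part_1.h5', 'part_2.h5'], where=..., columns=...)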

Update based on Jeff's suggestions:

The big change was removing indexing and compression while writing the pre-merge files. After removing indexing and compression, and capping each pre-merge file at 1MM rows:

    %timeit -n 1 write_multiple_hdf_multiprocess()
    1 loops, best of 3: 9min 37s per loop

This is slightly more than 2 minutes faster than before, and about as fast as I can parse the data. After setting the data columns to the fields I actually need (3 in my case):

    for f in filenames:
        s = pd.HDFStore(f)
        df = s.select('data')
        dc = df.columns[1:4]
        store.append(key='data', value=df, format='t', data_columns=dc)
        s.close()

This is about 2 minutes slower than before: 1 loops, best of 3: 10min 23s per loop. After removing compression from the above code, I get 1 loops, best of 3: 8min 48s per loop (almost identical to the first attempt, which used compression but no data-column index). To give you an idea of how well compression works: the uncompressed store is about 13.5 GB, while the compressed version using blosc is about 3.7 GB.

So my process takes 18 minutes 15 seconds end to end to create a merged, uncompressed hdf5 file. That is about 4 minutes 7 seconds faster than writing a single (compressed) file directly.

This brings me to the second part of my question: if I do not merge the files at all and instead process the pre-merge files in a map/combine fashion, would that be a reasonable way to approach this? How should I think about it?

For full disclosure, I am using Pandas 0.12.0 and PyTables 3.0.0, and my data-processing workflow looks like this (pseudocode):

    import numpy as np
    import pandas as pd

    def generate_chunks_from_text_file(reader, chunksize=50000):
        """Generator that yields processed chunks of text lines."""
        for i, line in enumerate(reader.readlines()):
            # ---- process data and yield a chunk every `chunksize` lines ----
            pass

    def data_reader(reader, queue):
        """Read data from the text file and put it on a queue for multiprocessing."""
        for chunk in generate_chunks_from_text_file(reader):
            queue.put(chunk)  # put the chunk on the queue for a writer process

    def data_processor(queue, filename, dtype, min_size):
        """Subprocess that takes the next chunk off the queue and appends it to an HDF store."""
        store = pd.HDFStore(filename)
        while True:
            results = queue.get()
            if results is None:                     # ---- when the queue is exhausted - break ----
                break
            array = np.array(results, dtype=dtype)  # convert the chunk to a numpy array
            df = pd.DataFrame(array)                # convert the array to a pandas DataFrame
            store.append(key='data', value=df, format='t',
                         min_itemsize=dict(min_size), data_columns=[], index=False)
        store.close()
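For reference, here is a rough sketch of how these functions are wired together with multiprocessing. The names run_pipeline and part_file_names are placeholders rather than my exact code, and the None sentinel matches the "queue exhausted" break above.

    import multiprocessing as mp

    def run_pipeline(text_file, part_file_names, dt, min_size):
        """Placeholder wiring: one bounded queue feeding one writer process per part file."""
        queue = mp.Queue(maxsize=10)   # bound the queue so the reader cannot run far ahead of the writers
        writers = [mp.Process(target=data_processor, args=(queue, fname, dt, min_size))
                   for fname in part_file_names]
        for w in writers:
            w.start()
        with open(text_file) as reader:
            data_reader(reader, queue)   # feed processed chunks into the queue
        for _ in writers:
            queue.put(None)              # one sentinel per writer: signals "queue exhausted - break"
        for w in writers:
            w.join()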
1 answer

I use a very similar split-process-combine approach, using several processes to create intermediate files and then a single process to merge the resulting files. Here are some tips for getting better performance:

  • Disable indexing while you write the files by passing index=False; see here for the docs. Otherwise PyTables incrementally updates the index as you write, which in this case is completely unnecessary (since you are going to merge the files afterwards). Index only the final file. This should speed up the writing quite a bit. (A sketch pulling several of these points together follows this list.)

  • You might want to consider changing the default indexing scheme/level, depending on what your queries look like (assuming you follow the suggestion a couple of points below and do NOT create too many data columns).

  • In a similar vein, do not create a compressed file while writing the pre-merge files; instead compress AFTER the merged, indexed file is written (in an uncompressed state), so that compression is the final step. See the docs here. Furthermore, it is very important to pass --chunkshape=auto when using ptrepack, which recomputes the PyTables chunkshape (i.e. how much data is read/written in a single block), because it will take the entire final table into account.

  • RE compression, YMMV here, depending on how well your data actually compresses and what kinds of queries you run. I have some types of data for which I find it is faster NOT to compress at all, even though in theory it should be better. You just have to experiment (though I always use blosc). Blosc effectively has only one compression level (it is either on for levels 1-9 or off for level 0), so changing the level will not change anything.

  • I merge the files in index order, basically by reading a subset of the pre-merge files into memory (a constant number, so that only a constant amount of memory is used) and then appending them one by one to the final file. (Not 100% sure this matters, but it seems to work well.)

  • You will find that the vast majority of your time is spent creating the index.

  • Also, index only the columns you actually query, by specifying data_columns=a_small_subset_of_columns when writing each file.

  • I find it better to write lots of small pre-merge files and then combine them into one rather large file than to write a few large files, but YMMV here (e.g. 100 pre-merge files of 100 MB each to end up with a 10 GB file, rather than 5 files of 2 GB each). Although this may be a function of my processing pipeline, which tends to bottleneck on the processing rather than on the actual writing.

  • I have not used one myself, but I have heard amazing things about using an SSD (solid-state drive) for this kind of thing, even a relatively small one. You can get an order-of-magnitude speedup with one (and compression may change that result).

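To pull several of these points together, here is a rough sketch of what the final merge step could look like. The file names, the data-column subset, and the exact ptrepack options are assumptions to adapt, not a drop-in recipe.

    import subprocess
    import pandas as pd

    part_files = ['part_1.h5', 'part_2.h5', 'part_3.h5', 'part_4.h5', 'part_5.h5']
    wanted_dc = ['col_a', 'col_b', 'col_c']   # hypothetical: the small subset of columns you actually query

    # 1) merge the pre-merge files in order: uncompressed, data columns declared, but no index yet
    merged = pd.HDFStore('merged_unindexed.h5')
    for fname in part_files:
        s = pd.HDFStore(fname, mode='r')
        df = s.select('data')
        merged.append(key='data', value=df, format='t',
                      data_columns=wanted_dc, index=False)
        s.close()

    # 2) build the index once, on the finished table, as the last in-Python step
    merged.create_table_index('data', optlevel=9, kind='full')
    merged.close()

    # 3) compress and re-chunk the final file with ptrepack, propagating the indexes
    subprocess.check_call(['ptrepack', '--chunkshape=auto', '--propindexes',
                           '--complevel=9', '--complib=blosc',
                           'merged_unindexed.h5', 'merged_final.h5'])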