Load an HDF5 file larger than memory in PySpark

I have a large file (around 20 GB) stored in HDF5 format. The file holds a set of three-dimensional coordinates that evolve over time (a molecular dynamics trajectory), so it is essentially an array of shape (8000 (frames), 50000 (particles), 3 (coordinates)).

In plain Python I would simply open the HDF5 file with h5py or PyTables and index into it as if it were a NumPy array (both libraries lazily load only the data they actually need).
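For example, something along these lines (the file path and the dataset name '/mydata' are just placeholders):

import h5py

with h5py.File("/absolute/path/to/file", "r") as f:
    traj = f["/mydata"]        # h5py Dataset: nothing is read from disk yet
    frame = traj[100, :, :]    # only this one frame (50000 x 3) is loaded
    print(frame.shape)         # (50000, 3)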

However, if I try to load this file into Spark with SparkContext.parallelize, it obviously exhausts the memory:

sc.parallelize(data, 10)

How should I deal with this? Is there a data format better suited to huge arrays? Can the RDD be persisted to disk without first holding everything in memory?

1 answer

Spark (and Hadoop) has no built-in support for reading slices of HDF5 binary files. (I suspect the reason is that HDF5 is a container format that stores its datasets in a tree-like hierarchy inside a single file.)

But if you need to read the file from a local disk, it is doable with Spark, especially if you know the internal structure of your HDF5 file.

Here is an example. It assumes you run Spark locally, and that you know in advance that the '/mydata' dataset in your HDF5 file consists of 100 chunks.

import h5py as h5

h5file_path = "/absolute/path/to/file"

def readchunk(v):
    # each task opens the file on its executor and reads one slice
    f = h5.File(h5file_path, 'r')
    return f['/mydata'][v, :]

foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()

Going further, you can modify the program to detect the number of chunks with f['/mydata'].shape[0].
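For example, a small sketch of that change (the driver opens the file once just to read the dataset shape; readchunk is the function defined above):

import h5py as h5

# determine the number of chunks from the dataset's first dimension
with h5.File(h5file_path, "r") as f:
    n_chunks = f["/mydata"].shape[0]

foo = sc.parallelize(range(0, n_chunks)).map(readchunk)
foo.count()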

The next step is to iterate over multiple datasets (you can list the datasets in the file with f.keys()).
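A rough sketch of that idea, assuming every top-level key in the file is a dataset (in a real file some keys may be groups, so adjust accordingly):

import h5py as h5

def read_slice(task):
    name, i = task
    with h5.File(h5file_path, "r") as f:
        return f[name][i, :]

# build (dataset name, row index) tasks on the driver
with h5.File(h5file_path, "r") as f:
    tasks = [(name, i) for name in f.keys() for i in range(f[name].shape[0])]

rdd = sc.parallelize(tasks).map(read_slice)
rdd.count()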

" HDF5 Apache Spark RDD" , .

The same approach also works on a distributed cluster, although it becomes somewhat less efficient. h5py requires the file to be on a local file system, so you have a few options: copy the file to every worker, or put it on HDFS and mount HDFS with fusefs so that each worker can access it as if it were local.

Finally, to avoid reopening the h5 file for every chunk, you can cache the open file handle on each executor:

import h5py as h5

h5file_path = "/absolute/path/to/file"

_h5file = None

def readchunk(v):
    # code below will be executed on an executor - in another python process on a remote server
    # the original value of _h5file (None) is sent from the driver,
    # and on the executor it is replaced by an h5.File object the first time `readchunk` is called
    global _h5file
    if _h5file is None:
        _h5file = h5.File(h5file_path, 'r')
    return _h5file['/mydata'][v, :]

foo = sc.parallelize(range(0,100)).map(lambda v: readchunk(v))
foo.count()
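For instance, assuming each chunk is a (50000, 3) array of particle coordinates, you could then compute a per-frame centroid directly on the RDD:

# hypothetical follow-up: centre of geometry for every frame
centroids = foo.map(lambda chunk: chunk.mean(axis=0))
print(centroids.take(3))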
