dask.bag doesn't use all the cores? Alternatives?

I have a Python script that does the following: (i) accepts an input data file (usually in a nested JSON format), (ii) passes the data line by line to another function that transforms it into the desired format, and (iii) finally writes the output to a file.

Here is my current simple Python script that does this:

    import ujson

    def manipulate(line):
        # a pure python function which transforms the data
        # ...
        return manipulated_json

    components = []
    for line in f:  # f is an open handle on the (decompressed) input file
        components.append(manipulate(ujson.loads(line)))

    write_to_csv(components)

This works, but with the Python GIL limiting it to a single core on the server, it is very slow, especially with large amounts of data.

The data I usually work with is about 4 GB gzip-compressed, but sometimes I have to process data that is hundreds of gigabytes gzip-compressed. This is not quite big data, but it still cannot be processed in memory, and with Python limited to a single core it is very slow to process.

While looking for a way to speed up the processing, I came across dask. Although PySpark seemed like the obvious solution at the time, the promises of dask and its simplicity won me over, and I decided to give it a try.

After much research on dask and how to use it, I put together a very small script to replicate my current process. The script looks like this:

    import dask.bag as bag
    import json

    (bag.from_filenames('input.json.gz')
        .map(json.loads)
        .map(lambda x: manipulate(x))
        .concat()
        .to_dataframe()
        .to_csv('output.csv.gz'))

This works and produces the same results as the original non-dask script, but it still uses only one CPU on the server, so it didn't help; it was actually slower.

What am I doing wrong? Am I missing something? I'm still pretty new to dask, so let me know if I missed something or if I should be doing something completely different.

Also, are there any alternatives to dask that would use the full capacity of the server (i.e. all CPUs) for what I need to do?

Thanks,

T

3 answers

The problem here is dask.dataframe.to_csv, which forces you into single-core mode.

I recommend using dask.bag to do your reading and manipulation, and then dumping to multiple CSV files in parallel. Dumping to many CSV files is much easier to coordinate than dumping to a single CSV file.

    import dask.bag as bag
    import json

    b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
    # depending on your dask version, a trailing .compute() may be needed to trigger the write
    b.map(lambda t: ','.join(map(str, t))).to_textfiles('out.*.csv')

There may also be an issue with trying to read a single GZIP file in parallel, but the above should get you started.
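
One possible workaround for the single-GZIP issue is to re-split the input into several smaller gzip files beforehand, so that from_filenames gets one file (and hence one partition) per piece. This is only a sketch, not something from the answer itself; the chunk size and file names below are made up for illustration:

    import gzip

    lines_per_chunk = 1000000  # hypothetical chunk size; tune to taste
    n = 0
    chunk = []
    with gzip.open('input.json.gz', 'rt') as src:
        for i, line in enumerate(src, 1):
            chunk.append(line)
            if i % lines_per_chunk == 0:
                # flush a full chunk to its own gzip file
                with gzip.open('input.%04d.json.gz' % n, 'wt') as dst:
                    dst.writelines(chunk)
                chunk = []
                n += 1
        if chunk:  # write the final partial chunk
            with gzip.open('input.%04d.json.gz' % n, 'wt') as dst:
                dst.writelines(chunk)

    # the pieces can then be read in parallel with
    # bag.from_filenames('input.*.json.gz')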


Bags appear to parallelize only up to the number of partitions they have.

For me, running

    mybag = bag.from_filenames(filename, chunkbytes=1e7)
    mybag.npartitions

gives

1746

which solved the problem and made the processing fully parallel.
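
Putting this together with the pipeline from the first answer, a minimal sketch might look like the following. It assumes the manipulate function from the question and the same from_filenames/chunkbytes API shown above; note that this kind of byte-based re-chunking may only help with uncompressed input, since a single gzip stream generally cannot be split for parallel reads:

    import dask.bag as bag
    import json

    # 'filename' is the input path and 'manipulate' the transform from the question
    mybag = bag.from_filenames(filename, chunkbytes=1e7)
    print(mybag.npartitions)  # should be well above 1 for the work to parallelize

    (mybag.map(json.loads)
          .map(manipulate)
          .concat()
          .map(lambda t: ','.join(map(str, t)))
          .to_textfiles('out.*.csv'))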


If you provide a glob-style filename, e.g. MyFiles-*.csv, to dask dataframe.to_csv(), you should be able to dump the data to disk in parallel. It will create multiple files instead of one large CSV file. See this thread for more: https://groups.google.com/a/continuum.io/forum/#!searchin/blaze-dev/to_csv/blaze-dev/NCQfCoOWEcI/S7fwuCfeCgAJ

    MyFiles-0001.csv
    MyFiles-0002.csv
    ....
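
For example, adapting the dataframe route from the question (a sketch, assuming the same manipulate function and bag pipeline shown there):

    import dask.bag as bag
    import json

    # the '*' in the output name is replaced per partition, so each partition
    # is written to its own CSV file in parallel
    df = (bag.from_filenames('input.json.gz')
             .map(json.loads)
             .map(manipulate)
             .concat()
             .to_dataframe())
    df.to_csv('MyFiles-*.csv')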
