Parallelism over a large constant data structure in Julia

I have a large vector of vectors of strings: there are about 50,000 vectors of strings, each of which contains 2-15 strings of 1-20 characters.

MyScoringOperation is a function that operates on a vector of strings (a datum) and returns an array of 10,100 scores (as Float64s). MyScoringOperation takes about 0.01 seconds to run (depending on the length of the datum).

    function MyScoringOperation(state::State, datum::Vector{String})
        ...
        score::Vector{Float64} # length(score) == 10100

I have what amounts to a nested loop. The outer loop usually runs for 500 iterations:

    data::Vector{Vector{String}} = loaddata()
    state = init_state()
    for ii in 1:500
        score_total = zeros(10100)
        for datum in data
            score_total += MyScoringOperation(state, datum)
        end
    end

On one computer, with a small test case of 3,000 data (rather than 50,000), each pass of the outer loop takes 100-300 seconds.

I have 3 powerful servers with Julia 0.3.9 installed (I can easily get 3 more, and then hundreds more at the next scale-up).


I have basic experience with @parallel, however it seems to spend a lot of time copying the constant data (it more or less hangs on the smaller test case).

It looks like this:

    data::Vector{Vector{String}} = loaddata()
    state = init_state()
    for ii in 1:500
        score_total = @parallel(+) for datum in data
            MyScoringOperation(state, datum)
        end
        state = update(state, score_total)
    end

My understanding of how this implementation of @parallel works is that it:

For each ii:

  • partitions data into one chunk per worker
  • sends each chunk to its worker
  • has each worker process its chunk
  • sums the results in the main process as they become available.

I would like to remove step 2 so that, instead of sending a chunk of the data to each worker, I just send a range of indexes to each worker and they look the data up in their own copy of data. Or, better still, give each worker only its own chunk and reuse it on every iteration (saving a lot of RAM).
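A minimal sketch of that idea, assuming loaddata() can be called on every worker (e.g. the file lives on shared storage), so that only index ranges and the small state travel over the network. score_range and score_all are hypothetical helpers, not part of the original code:

    @everywhere data = loaddata()   # each worker loads its own copy once

    @everywhere function score_range(state, r::UnitRange{Int})
        total = zeros(10100)
        for jj in r
            total += MyScoringOperation(state, data[jj])
        end
        total
    end

    function score_all(state)
        n, nw = length(data), nworkers()
        refs = RemoteRef[]
        for (k, pid) in enumerate(workers())
            # hand each worker a contiguous index range, not the data itself
            lo = div((k - 1) * n, nw) + 1
            hi = div(k * n, nw)
            push!(refs, remotecall(pid, score_range, state, lo:hi))
        end
        reduce(+, map(fetch, refs))
    end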


Profiling supports my belief about how @parallel works. For a similarly scoped problem (with even less data), the non-parallel version runs in 0.09 seconds, while the parallel version takes 185 seconds, and the profiler shows almost 100% of that time is spent on network IO.
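For reference, a sketch of how such numbers can be gathered with Julia's built-in tools; @profile only samples the master process, which is exactly where the network IO wait shows up:

    state = init_state()
    # time one outer iteration of the parallel version
    @time score_total = @parallel(+) for datum in data
        MyScoringOperation(state, datum)
    end
    # sample the master process to see where its time goes
    @profile @parallel(+) for datum in data
        MyScoringOperation(state, datum)
    end
    Profile.print()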

performance parallel-processing hpc julia-lang
1 answer

This should help you:

    using Pipe  # for the @pipe macro (from the Pipe.jl package)

    # Returns a Task that lazily produces the chunks of `data`, one per call.
    function get_chunks(data::Vector, nchunks::Int)
        base_len, remainder = divrem(length(data), nchunks)
        chunk_len = fill(base_len, nchunks)
        chunk_len[1:remainder] += 1 # remainder will always be less than nchunks
        function _it()
            for ii in 1:nchunks
                chunk_start = sum(chunk_len[1:ii-1]) + 1
                chunk_end = chunk_start + chunk_len[ii] - 1
                chunk = data[chunk_start:chunk_end]
                produce(chunk)
            end
        end
        Task(_it)
    end

    # Stores one chunk on each worker, returning the RemoteRefs to them.
    function r_chunk_data(data::Vector)
        all_chunks = get_chunks(data, nworkers()) |> collect;
        remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chunks[ii]) for (ii, pid) in enumerate(workers())]
        # Have to add the type annotation, as otherwise it thinks RemoteRef(pid) might return a RemoteValue
    end

    function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
        total = nothing
        # TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is guaranteed safe
        @sync for rr in rem_results
            function gather(rr)
                res = fetch(rr)
                if total === nothing
                    total = res
                else
                    total = red_acc(total, res)
                end
            end
            @async gather(rr)
        end
        total
    end

    function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
        rem_results = map(r_chunks) do r_chunk
            function do_mapred()
                @assert r_chunk.where == myid()
                @pipe r_chunk |> fetch |> map(map_fun, _) |> reduce(red_acc, _)
            end
            remotecall(r_chunk.where, do_mapred)
        end
        @pipe rem_results |> convert(Vector{RemoteRef}, _) |> fetch_reduce(red_acc, _)
    end

r_chunk_data breaks the data into chunks (as defined by the get_chunks method) and sends each of those chunks to a different worker, where they are stored in RemoteRefs. RemoteRefs are references to memory on your other processes (and potentially other computers), which can later be fetched or passed to remote calls.
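As a toy illustration of the RemoteRef mechanics used here (0.3-era API; a RemoteRef holds one value that can be put! once and fetched many times):

    rr = RemoteRef(workers()[1])          # a reference to memory on that worker
    put!(rr, [1.0, 2.0, 3.0])             # store a value there
    @assert fetch(rr) == [1.0, 2.0, 3.0]  # fetch copies it back over the wire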

prechunked_mapreduce runs a variation on a map-reduce: each worker first runs map_fun on each of its chunk's elements, then reduces over all the elements in its chunk using red_acc (a reduction accumulator function). Each worker then returns its result, and those results are combined by reducing them all together, again using red_acc, this time via fetch_reduce so that the first ones to finish can be added in first.
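For instance, the question's outer loop could plug into this API roughly like so (a sketch, assuming MyScoringOperation, init_state, and update are defined on the workers via @everywhere; the anonymous closure carries only the small current state on each call, while the chunks stay put):

    data = loaddata()
    r_chunks = r_chunk_data(data)   # ship each chunk to its worker once
    state = init_state()
    for ii in 1:500
        score_total = prechunked_mapreduce(r_chunks,
                                           datum -> MyScoringOperation(state, datum),
                                           (+))
        state = update(state, score_total)
    end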

fetch_reduce is a non-blocking fetch-and-reduce operation. I believe it is race-free, although that may come down to implementation details in @async and @sync. When Julia 0.4 lands, it would be easy enough to put a lock on total so that it is obviously free of race conditions.

This code isn't battle-hardened. You might also want to look at making the chunk size tunable, so that you can send more data to faster workers (if some have better network connections or faster CPUs); see the sketch below.
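One hypothetical way to do that (not part of the original answer) is to split in proportion to per-worker weights instead of evenly:

    # Hypothetical: weights = [2, 1, 1] gives the first worker twice the data.
    function get_weighted_chunks(data::Vector, weights::Vector{Int})
        n, total_w = length(data), sum(weights)
        # cumulative boundaries; the last is always n, so nothing is dropped
        bounds = [div(n * cw, total_w) for cw in cumsum(weights)]
        starts = [1; bounds[1:end-1] + 1]
        [data[starts[k]:bounds[k]] for k in 1:length(weights)]
    end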

You would need to re-express your code as a map-reduce problem, which doesn't look too hard.


Testing:

    data = [float([eye(100),eye(100)])[:] for _ in 1:3000] # ~480Mb
    r_chunks = r_chunk_data(data)
    @time prechunked_mapreduce(r_chunks, mean, (+))

This took ~0.03 seconds when distributed across 8 workers (none of them on the same machine as the launcher),

vs running only locally:

    @time reduce(+, map(mean, data))

which took ~0.06 seconds.
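As a quick sanity check (floating-point summation order differs between the two, so compare with isapprox rather than ==):

    dist_result  = prechunked_mapreduce(r_chunks, mean, (+))
    local_result = reduce(+, map(mean, data))
    @assert isapprox(dist_result, local_result)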
