Consume non-overlapping vector chunks and combine the results

I am trying to speed up a costly computation on a large vector using threads. My function consumes a vector and computes a vector of new values (it is not an aggregation, and the input order must be preserved in the output). However, I'm struggling to figure out how to spawn threads, assign a slice of the vector to each, and then collect and combine the results.

```rust
// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig = (1..14).collect::<Vec<i32>>();

    let mut result: Vec<Vec<i32>> = vec![];
    let mut flat: Vec<i32> = Vec::with_capacity(orig.len());

    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        result.push(chunk.iter().map(|&digit| f(digit)).collect());
    }

    // flatten the result vector
    for subvec in result.iter() {
        for elem in subvec.iter() {
            flat.push(elem.to_owned());
        }
    }

    println!("Flattened result: {:?}", flat);
}
```

The threaded computation should happen between the `for chunk…` loop and the `// flatten the result vector` step, but I can't find many simple examples of spawning N threads, assigning the chunks to them in sequence, and getting each thread's newly computed vector back into the container so it can be flattened. Do I need to wrap `orig.chunks()` in an `Arc` and manually capture each chunk in the loop? Do I need to pass `f` into each thread? Should I use a `BTreeMap` to match inputs to outputs? Is it possible to use `simple_parallel`?

1 answer

Well, this is a perfect application for the unstable `thread::scoped()`:

```rust
#![feature(scoped)]

use std::thread::{self, JoinGuard};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();

    let mut guards: Vec<JoinGuard<Vec<i32>>> = vec![];

    // split into slices; each scoped thread may borrow a chunk directly
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        let g = thread::scoped(move || chunk.iter().cloned().map(f).collect());
        guards.push(g);
    }

    // collect the results, joining the guards in spawn order
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for g in guards {
        result.extend(g.join().into_iter());
    }

    println!("Flattened result: {:?}", result);
}
```

It is unstable and is unlikely to be stabilized in this form because it has an inherent flaw (you can find more on that here). As far as I can see, `simple_parallel` is essentially an extension of this approach: it hides the fiddling with `JoinGuard`s, and it can also be used on stable Rust (possibly with some unsafety inside, I think). However, it is not recommended for general use, as its docs suggest.

Of course, you could use `thread::spawn()`, but then you need to clone each chunk so it can be moved into its thread:

```rust
use std::thread::{self, JoinHandle};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();

    let mut guards: Vec<JoinHandle<Vec<i32>>> = vec![];

    // split into slices; each chunk is cloned so it can be moved into a thread
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        let chunk = chunk.to_owned();
        let g = thread::spawn(move || chunk.into_iter().map(f).collect());
        guards.push(g);
    }

    // collect the results, joining the handles in spawn order
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for g in guards {
        result.extend(g.join().unwrap().into_iter());
    }

    println!("Flattened result: {:?}", result);
}
```
