How to send a function to another thread?

I am trying to write a simpler unit test runner for my Rust project. I created the TestFixture property, which implements my test builds, similar to inheriting from the base unit test class in other testing structures. This feature is pretty simple. This is my test fixture.

pub trait TestFixture { fn setup(&mut self) -> () {} fn teardown(&mut self) -> () {} fn before_each(&mut self) -> () {} fn after_each(&mut self) -> () {} fn tests(&mut self) -> Vec<Box<Fn(&mut Self)>> where Self: Sized { Vec::new() } } 

My test function works as follows

 pub fn test_fixture_runner<T: TestFixture>(fixture: &mut T) { fixture.setup(); let _r = fixture.tests().iter().map(|t| { let handle = thread::spawn(move || { fixture.before_each(); t(fixture); fixture.after_each(); }); if let Err(_) = handle.join() { println!("Test failed!") } }); fixture.teardown(); } 

I get an error

 src/tests.rs:73:22: 73:35 error: the trait `core::marker::Send` is not implemented for the type `T` [E0277] src/tests.rs:73 let handle = thread::spawn(move || { ^~~~~~~~~~~~~ note: in expansion of closure expansion src/tests.rs:69:41: 84:6 note: expansion site src/tests.rs:73:22: 73:35 note: `T` cannot be sent between threads safely src/tests.rs:73 let handle = thread::spawn(move || { ^~~~~~~~~~~~~ note: in expansion of closure expansion src/tests.rs:69:41: 84:6 note: expansion site src/tests.rs:73:22: 73:35 error: the trait `core::marker::Sync` is not implemented for the type `for<'r> core::ops::Fn(&'r mut T)` [E0277] src/tests.rs:73 let handle = thread::spawn(move || { ^~~~~~~~~~~~~ note: in expansion of closure expansion src/tests.rs:69:41: 84:6 note: expansion site src/tests.rs:73:22: 73:35 note: `for<'r> core::ops::Fn(&'r mut T)` cannot be shared between threads safely src/tests.rs:73 let handle = thread::spawn(move || { ^~~~~~~~~~~~~ note: in expansion of closure expansion 

I tried adding Arcs around types that are sent to the stream, without bone and the same error.

 pub fn test_fixture_runner<T: TestFixture>(fixture: &mut T) { fixture.setup(); let fix_arc = Arc::new(Mutex::new(fixture)); let _r = fixture.tests().iter().map(|t| { let test_arc = Arc::new(Mutex::new(t)); let fix_arc_clone = fix_arc.clone(); let test_arc_clone = test_arc.clone(); let handle = thread::spawn(move || { let thread_test = test_arc_clone.lock().unwrap(); let thread_fix = fix_arc_clone.lock().unwrap(); (*thread_fix).before_each(); (*thread_test)(*thread_fix); (*thread_fix).after_each(); }); if let Err(_) = handle.join() { println!("Test failed!") } }); fixture.teardown(); } 

An approximate test fixture would be something like

 struct BuiltinTests { pwd: PathBuf } impl TestFixture for BuiltinTests { fn setup(&mut self) { let mut pwd = env::temp_dir(); pwd.push("pwd"); fs::create_dir(&pwd); self.pwd = pwd; } fn teardown(&mut self) { fs::remove_dir(&self.pwd); } fn tests(&mut self) -> Vec<Box<Fn(&mut BuiltinTests)>> { vec![Box::new(BuiltinTests::cd_with_no_args)] } } impl BuiltinTests { fn new() -> BuiltinTests { BuiltinTests { pwd: PathBuf::new() } } } fn cd_with_no_args(&mut self) { let home = String::from("/"); env::set_var("HOME", &home); let mut cd = Cd::new(); cd.run(&[]); assert_eq!(env::var("PWD"), Ok(home)); } #[test] fn cd_tests() { let mut builtin_tests = BuiltinTests::new(); test_fixture_runner(&mut builtin_tests); } 

My intention is to use threads - this is isolation from the test runner. If the test does not match the statement, it causes a panic that kills the runner. Thanks for any insight, I am ready to change my design if this fixes the problem of panic.

+5
source share
2 answers

There are several problems with your code, I will show you how to fix them one by one.

The first problem is that you are using map() to iterate over the iterator. It will not work correctly because map() lazy - if you do not use an iterator, the closure you passed to it will not work. The correct way is to use a for loop:

 for t in fixture().tests().iter() { 

Secondly, you repeat the closure vector by reference:

 fixture.tests().iter().map(|t| { 

iter() on a Vec<T> returns an iterator inferior to elements of type &T , so your t will be of type &Box<Fn(&mut Self)> . However, Box<Fn(&mut T)> does not implement Sync by default (it is an attribute object that does not have information about the base type, except that you have specified it explicitly), therefore &Box<Fn(&mut T)> cannot be used for multiple threads. This is the second mistake you see about.

Most likely, you do not want to use these locks by reference; you probably want to completely move them to the generated thread. For this you need to use into_iter() instead of iter() :

 for t in fixture.tests().into_iter() { 

Now t will be of type Box<Fn(&mut T)> . However, it still cannot be streamed. Again, this is a sign object, and the compiler does not know if there is Send inside it. To do this, you need to add Send associated with the closure type:

 fn tests(&mut self) -> Vec<Box<Fn(&mut Self)+Send>> 

Now the error in Fn missing.

The last error about Send not implemented for t . We need to add a Send estimate to t :

 pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) { 

And now the error becomes more clear:

 test.rs:18:22: 18:35 error: captured variable `fixture` does not outlive the enclosing closure test.rs:18 let handle = thread::spawn(move || { ^~~~~~~~~~~~~ note: in expansion of closure expansion test.rs:18:5: 28:6 note: expansion site test.rs:15:66: 31:2 note: captured variable is valid for the anonymous lifetime #1 defined on the block at 15:65 test.rs:15 pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) { test.rs:16 fixture.setup(); test.rs:17 test.rs:18 for t in fixture.tests().into_iter() { test.rs:19 let handle = thread::spawn(move || { test.rs:20 fixture.before_each(); ... note: closure is valid for the static lifetime 

This error occurs because you are trying to use a link in the spawn() ed stream. spawn() requires its close argument to have a 'static binding, i.e. its captured environment should not contain links with non-t230> lifetimes. But what exactly happens here - &mut T not 'static . spawn() design does not prohibit escaping connections, so it is explicitly written to prohibit the transfer of non-t230> links to the generated stream.

Note that when using &mut T this error is unavoidable even if you put &mut T in Arc , because then the lifetime of &mut T will be “saved” in Arc , and therefore Arc<Mutex<&mut T>> also not be 'static .

There are two ways to do what you want.

First, you can use the unstable thread::scoped() API. This is unstable because it is shown to provide secure memory in secure code, and the plan should provide some replacement for it in the future. Nevertheless, you can use it in night rust (this will not lead to insecurity of the memory yourself, only in specially created situations):

 pub fn test_fixture_runner<T: TestFixture+Send>(fixture: &mut T) { fixture.setup(); let tests = fixture.lock().unwrap().tests(); for t in tests.into_iter() { let f = &mut *fixture; let handle = thread::scoped(move || { f.before_each(); t(f); f.after_each(); }); handle.join(); } fixture.teardown(); } 

This code compiles because scoped() written in such a way that it guarantees (in most cases) that the stream will not survive all captured links. I had to fixture because otherwise (since &mut links are not copied), it will be moved to the stream, and fixture.teardown() will be denied. I also had to extract the tests variable, because otherwise the mutex would be blocked by the main thread during the for loop, which, of course, would forbid it to be blocked in child threads.

However, with scoped() you cannot isolate panic in a child thread. If the child thread causes a panic, that panic will be reset from the join() call. This may or may not be a problem in general, but I think this is a problem for your code.

Another way is to reorganize your code to hold the fixture in Arc<Mutex<..>> from the very beginning:

 pub fn test_fixture_runner<T: TestFixture + Send + 'static>(fixture: Arc<Mutex<T>>) { fixture.lock().unwrap().setup(); for t in fixture.lock().unwrap().tests().into_iter() { let fixture = fixture.clone(); let handle = thread::spawn(move || { let mut fixture = fixture.lock().unwrap(); fixture.before_each(); t(&mut *fixture); fixture.after_each(); }); if let Err(_) = handle.join() { println!("Test failed!") } } fixture.lock().unwrap().teardown(); } 

Note that now t should also be 'static , because otherwise it cannot be used with thread::spawn() , since it requires 'static . fixture inside the inner closure is not &mut T , but a MutexGuard<T> , and therefore it must be explicitly converted to &mut T in order to pass it to t .

This may seem overly and unnecessarily complicated, but this programming language design prevents you from making many mistakes in multi-threaded programming. Each of the above errors that we saw is valid - each of them would be a potential cause of insecurity of the memory or data race if it were ignored.

+7
source

As stated in the Rust HandBook Concurrency section:

When a type T implements Send, it tells the compiler that one of this type can have safe movement between threads.

If you do not implement Send, ownership is not transferred between threads.

0
source

All Articles