How to correctly exit thread blocking on mpsc :: Receiver

impl A { fn new() -> (A, std::sync::mpsc::Receiver<Data>) { let (sender, receiver) = std::sync::mpsc::channel(); let objA = A { sender: sender, }; // A spawns threads, clones and uses sender etc (objA, receiver) } } impl B { fn new() -> B { let (objA, receiver) = A::new(); B { a: objA, join_handle: Some(std::thread::spwan(move || { loop { match receiver.recv() { Ok(data) => /* Do Something, inform main thread etc */, Err(_) => break, } } })), } } } impl Drop for B { fn drop(&mut self) { // Want to do something like "sender.close()/receiver.close()" etc so that the following // thread joins. But there is no such function. How do i break the following thread ? self.join_handle().take().unwrap().join().unwrap(); } } 

Is there a way to fail under such circumstances? The fact is that when the receiver or sender falls, the other sniffs it and gives an error. In the case of receiver it will be woken up and give an error, in which case I will break out of the infinite blocking cycle above. However, how can I explicitly use this property of channels without resorting to other flags in combination with try_recv() , etc., and cleanly exit my stream deterministically?

+4
source share
3 answers

Why not send a specific message to close this stream? I don’t know what your data is, but most of the time it can be an enumeration and adding an enumeration option, such as “MyData :: Shutdown” in your technique, you can just break out of the loop.

+2
source

You can wrap a field of your type B in Option . So in Drop::drop you can make drop(self.a.take()) , which will replace the None field and drop the sender. This closes the channel, and now your stream can be correctly connected.

+2
source

You can create a new channel and change your actual sender using a dummy sender. Then you can drop the sender and join the stream:

 impl Drop for B { fn drop(&mut self) { let (s, _) = channel(); drop(replace(&mut self.a.sender, s)); self.join_handle.take().unwrap().join().unwrap(); } } 

Try in the playpen: http://is.gd/y7A9L0

I do not know what the overhead of creating and immediately deleting a channel is, but it is not free and is unlikely to be optimized (there is Arc ).


on a side note, your infinite loop with a match on receiver.recv() can be replaced with a for loop using the Receiver::iter method:

 for _ in receiver.iter() { // do something with the value } 
+1
source

All Articles