Moving a Receiver into a thread complains about Sync, but Send is expected

I am trying to share a Receiver with a thread through an Arc, so I can do centralized pub-sub through a dispatcher. However, I get the following error:

src/dispatcher.rs:58:11: 58:24 error: the trait `core::marker::Sync` is not implemented for the type `core::cell::UnsafeCell<std::sync::mpsc::Flavor<dispatcher::DispatchMessage>>` [E0277]
src/dispatcher.rs:58           thread::spawn(move || {
                               ^~~~~~~~~~~~~
src/dispatcher.rs:58:11: 58:24 note: `core::cell::UnsafeCell<std::sync::mpsc::Flavor<dispatcher::DispatchMessage>>` cannot be shared between threads safely
src/dispatcher.rs:58           thread::spawn(move || {

What? I thought that moving values through channels only requires Send. The code for DispatchMessage:

#[derive(PartialEq, Debug, Clone)]
enum DispatchType {
    ChangeCurrentChannel,
    OutgoingMessage,
    IncomingMessage
}

#[derive(Clone)]
struct DispatchMessage {
   dispatch_type: DispatchType,
   payload: String
}

Both String and, of course, the enum are Send, right? Why is it complaining about Sync?

The relevant part of the dispatcher:

pub fn start(&self) {
   let shared_subscribers = Arc::new(self.subscribers);
   for ref broadcaster in &self.broadcasters {
      let shared_broadcaster = Arc::new(Mutex::new(broadcaster));

      let broadcaster = shared_broadcaster.clone();
      let subscribers = shared_subscribers.clone();
      thread::spawn(move || {
         loop {
            let message = &broadcaster.lock().unwrap().recv().ok().expect("Couldn't receive message in broadcaster");
            match subscribers.get(type_to_str(&message.dispatch_type)) {
              Some(ref subs) => { 
                  for sub in subs.iter() { sub.send(*message).unwrap(); }
              },
              None => ()
            }

         }
      });
   }
}

The full dispatcher code is in this gist: https://gist.github.com/timonv/5cdc56bf671cee69d3fa

If it is still relevant, this was built against the nightly of 5-2-2015.

+4
2 answers

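After some more digging, the problem turned out to be where the Mutex was created. In the code above it wraps a reference to the receiver, not the receiver itself, and sending a reference to another thread is what requires the target to be Sync.

Creating the Mutex when the broadcaster is registered, so that it owns the receiver, fixes this: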

pub fn register_broadcaster(&mut self, broadcaster: &mut Broadcast) {
   let handle = Arc::new(Mutex::new(broadcaster.broadcast_handle()));
   self.broadcasters.push(handle);
}

And start becomes:

pub fn start(&self) {
   // Assuming that broadcasters.clone() copies the vector but only increases the ref count of its elements
   for broadcaster in self.broadcasters.clone() {
      let subscribers = self.subscribers.clone();
      thread::spawn(move || {
         loop {
            let message = broadcaster.lock().unwrap().recv().ok().expect("Couldn't receive message in broadcaster or channel hung up");
            match subscribers.get(type_to_str(&message.dispatch_type)) {
              Some(ref subs) => { 
                  for sub in subs.iter() { sub.send(message.clone()).unwrap(); }
              },
              None => ()
            }

         }
      });
   }
}
0

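Arc requires its contents to be Sync, because every clone of the Arc gives another thread access to the same value. In this version of Rust, neither Sender nor Receiver is Sync.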
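The bounds involved can be checked at compile time. The following is only a minimal sketch: `is_send` is a hypothetical helper introduced here for illustration, not something from the question's code. It compiles only when the type passed to it is Send.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};

// Hypothetical helper: instantiating it is only possible for Send types,
// so the check happens at compile time and the return value is always true.
fn is_send<T: Send>() -> bool {
    true
}

fn main() {
    // A Receiver on its own can be moved into a thread.
    println!("Receiver is Send: {}", is_send::<Receiver<String>>());

    // Mutex<T> is Sync whenever T is Send, so the Arc around it is Send.
    println!(
        "Arc<Mutex<Receiver>> is Send: {}",
        is_send::<Arc<Mutex<Receiver<String>>>>()
    );

    // Uncommenting the next line fails to compile: Receiver is not Sync,
    // so Arc<Receiver<String>> is not Send.
    // is_send::<Arc<Receiver<String>>>();
}
```

The commented-out line is the situation from the question: the thread closure captures something that is only Send if the receiver is Sync.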

Depending on what you need, there are two options:

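  • For a Sender, clone it and give each thread its own copy instead of wrapping it in an Arc. Sender is Send whenever the message type T is Send, so a clone can be moved into a thread directly.
  • For a Receiver, which cannot be cloned, wrap it in Arc<Mutex<T>>. The Mutex makes the contents Sync.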
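Both options can be combined in one small program. This is a sketch under assumptions, not the dispatcher from the question: `relay` and its parameters are hypothetical names, and plain String messages stand in for DispatchMessage. Several worker threads share one Receiver through Arc<Mutex<_>>, and each worker owns its own clone of an outgoing Sender.

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: forwards every message through `workers` threads
// and returns what came out the other side, sorted for a stable result.
fn relay(messages: Vec<String>, workers: usize) -> Vec<String> {
    let (in_tx, in_rx) = channel::<String>();
    let (out_tx, out_rx) = channel::<String>();

    // A Receiver cannot be cloned, so it is shared behind Arc<Mutex<_>>.
    let shared_rx = Arc::new(Mutex::new(in_rx));

    let mut handles = Vec::new();
    for _ in 0..workers {
        let rx = Arc::clone(&shared_rx); // bumps the reference count
        let tx = out_tx.clone(); // each thread owns its own Sender
        handles.push(thread::spawn(move || loop {
            // The lock guard is dropped at the end of this statement.
            let next = rx.lock().unwrap().recv();
            match next {
                Ok(msg) => tx.send(msg).unwrap(),
                Err(_) => break, // every input Sender has been dropped
            }
        }));
    }
    // Drop the original so `out_rx` ends once the workers are done.
    drop(out_tx);

    for message in messages {
        in_tx.send(message).unwrap();
    }
    drop(in_tx); // signals the workers to stop

    for handle in handles {
        handle.join().unwrap();
    }

    let mut received: Vec<String> = out_rx.iter().collect();
    received.sort();
    received
}

fn main() {
    let out = relay(vec!["hello".to_string(), "world".to_string()], 2);
    println!("{:?}", out);
}
```

Note that a worker holds the lock while it blocks in recv, so only one worker waits on the channel at a time. That is fine for a sketch like this, but it is a design choice worth revisiting if the workers need to receive concurrently.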
+6
