Heavy caveat: I've never used this library before, and my low-level knowledge of some of the concepts is a bit... lacking. I have mostly been reading the tutorial. I'm pretty sure anyone who has done async work will read this and laugh, but it may be a useful starting point for other people. Caveat emptor!
Let's start with something simpler that demonstrates how a Stream works. We can convert an iterator of Results into a stream:
    extern crate futures;

    use futures::Future;
    use futures::stream::{self, Stream};

    fn main() {
        let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
        let payloads = stream::iter(payloads.into_iter());

        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));

        foo.forget();
    }
This shows us one way to consume a stream. We use and_then to do something with each payload (here, just printing it) and then for_each to convert the Stream back into a Future. We can then run the future by calling the oddly named forget method.
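If iterators are more familiar than streams, a rough synchronous analogue may help. This sketch uses only std, not the futures API, and the helper name consume is made up for illustration. It runs a step for each Ok payload and stops at the first error, which is, as far as I understand it, roughly the shape that and_then followed by for_each gives the stream version.

```rust
/// Hypothetical helper: a synchronous, std-only stand-in for the stream pipeline.
/// Prints each payload and returns the ones it saw, or the first error.
fn consume(payloads: Vec<Result<String, ()>>) -> Result<Vec<String>, ()> {
    let mut seen = Vec::new();
    payloads
        .into_iter()
        .try_for_each(|item| -> Result<(), ()> {
            // An Err item short-circuits the whole loop.
            let payload = item?;
            println!("{}", payload);
            seen.push(payload);
            Ok(())
        })?;
    Ok(seen)
}

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let seen = consume(payloads).expect("no payload was an error");
    assert_eq!(seen, vec!["a".to_string(), "b".to_string()]);
}
```

The important difference is that the iterator version does all of its work on the spot, while the stream version only describes the work and needs something to drive it.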
Next, we tie the Redis library into the mix, processing just one message. Since the get_message() method is blocking, we need to introduce some threads into the mix. It's not a good idea to perform large amounts of work inside this type of asynchronous system, because everything else will be blocked while it runs. For example:
Unless it is otherwise arranged to be so, it should be ensured that implementations of this function finish very quickly.
In an ideal world, the redis crate would be built on top of a library like futures and expose all of this natively.
    extern crate redis;
    extern crate futures;

    use std::thread;
    use futures::Future;
    use futures::stream::{self, Stream};

    fn main() {
        let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

        let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
        pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

        let (tx, payloads) = stream::channel();

        // The blocking Redis call lives on its own thread.
        let redis_thread = thread::spawn(move || {
            let msg = pubsub.get_message().expect("Unable to get message");
            let payload: Result<String, _> = msg.get_payload();
            tx.send(payload).forget();
        });

        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));

        foo.forget();
        redis_thread.join().expect("unable to join to thread");
    }
My understanding gets fuzzier here. In a separate thread, we block waiting for a message and push it into the channel when we get it. What I don't understand is why we need to hold on to the thread's handle. I would expect foo.forget to block by itself, waiting until the stream is empty.
In a telnet connection to the Redis server, send this:
publish rust awesome
And you will see that it works. Adding print statements shows that (for me) the foo.forget statement runs before the spawned thread starts running.
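One plausible reading of why the handle matters: if foo.forget() returns without waiting, then join is the only thing keeping main alive until the worker has done its job. The sketch below shows that effect with std alone. It makes no claim about how futures works internally, and blocking_fetch is a hypothetical stand-in for a blocking call such as get_message().

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a blocking call such as `get_message()`.
fn blocking_fetch(n: u32) -> String {
    thread::sleep(Duration::from_millis(10));
    format!("message {}", n)
}

/// Does the blocking work on a worker thread and returns what it produced.
fn run() -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    let worker = thread::spawn(move || {
        for n in 0..3 {
            tx.send(blocking_fetch(n)).expect("receiver was dropped");
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Without this join, `run` could return before the worker sent anything.
    worker.join().expect("worker thread panicked");

    // Everything was sent before the join returned, so this does not block.
    rx.try_iter().collect()
}

fn main() {
    for payload in run() {
        println!("{}", payload);
    }
}
```

Removing the join line makes the result depend on timing: the non-blocking try_iter may well see an empty channel because the worker is still asleep.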
Multiple messages are trickier. The Sender consumes itself to prevent the producing side from getting too far ahead of the consuming side. It does this by returning another future from send! We need to shuttle the new sender back out of that future so we can reuse it for the next iteration of the loop:
    extern crate redis;
    extern crate futures;

    use std::thread;
    use std::sync::mpsc;
    use futures::Future;
    use futures::stream::{self, Stream};

    fn main() {
        let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

        let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
        pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

        let (tx, payloads) = stream::channel();

        let redis_thread = thread::spawn(move || {
            let mut tx = tx;

            while let Ok(msg) = pubsub.get_message() {
                let payload: Result<String, _> = msg.get_payload();

                // A std channel carries the successor sender back out of the future.
                let (next_tx_tx, next_tx_rx) = mpsc::channel();

                tx.send(payload).and_then(move |new_tx| {
                    next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                    futures::finished(())
                }).forget();

                tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
            }
        });

        let foo = payloads
            .and_then(|payload| futures::finished(println!("{}", payload)))
            .for_each(|_| Ok(()));

        foo.forget();
        redis_thread.join().expect("unable to join to thread");
    }
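The ownership dance in the loop above is easier to see in isolation. Here is a minimal std-only sketch of a sender whose send takes self by value and only hands the sender back once the value has been accepted. ConsumingSender and this channel function are hypothetical names invented for the example, not the futures types, and the zero-capacity sync_channel is my own choice for making the producer wait on the consumer.

```rust
use std::sync::mpsc::{self, Receiver, SendError, SyncSender};
use std::thread;

/// Hypothetical sender that is consumed by `send` and returned on success.
struct ConsumingSender<T> {
    inner: SyncSender<T>,
}

impl<T> ConsumingSender<T> {
    /// With capacity 0 this blocks until the receiver takes the value,
    /// so the producer can never run ahead of the consumer.
    fn send(self, value: T) -> Result<Self, SendError<T>> {
        self.inner.send(value)?;
        Ok(self)
    }
}

/// Hypothetical constructor: a rendezvous channel wrapped in the type above.
fn channel<T>() -> (ConsumingSender<T>, Receiver<T>) {
    let (tx, rx) = mpsc::sync_channel(0);
    (ConsumingSender { inner: tx }, rx)
}

fn run(messages: Vec<String>) -> Vec<String> {
    let (tx, rx) = channel();

    let producer = thread::spawn(move || {
        let mut tx = tx;
        for msg in messages {
            // `send` consumed the old handle, so rebind its successor.
            tx = match tx.send(msg) {
                Ok(next) => next,
                Err(_) => return, // receiver is gone, stop producing
            };
        }
    });

    // Ends once the producer thread finishes and drops its sender.
    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}

fn main() {
    let received = run(vec!["a".to_string(), "b".to_string()]);
    println!("{:?}", received);
}
```

In this sketch the successor comes straight back as a return value. In the futures version it only exists inside the future returned by send, which is why the loop above needs the extra std channel to get it out.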
I'm sure there will be more ecosystem support for this type of interop over time. For example, the futures-cpupool crate could probably be extended to support a similar use case.