How to implement a futures flow for a blocking call using .rs and Redis PubSub futures?

I am trying to create a system with which my application can receive streaming data from a Redis PubSub channel and process it. The Redis driver that I use, along with all the other Redis drivers for Rust that I have seen, uses a lock operation to get data from a pipe, which returns only when it receives data:

let msg = match pubsub.get_message() { Ok(m) => m, Err(_) => panic!("Could not get message from pubsub!") }; let payload: String = match msg.get_payload() { Ok(s) => s, Err(_) => panic!("Could not convert redis message to string!") }; 

I wanted to use the futures-rs library to wrap this lock function call in the future so that I could perform other tasks in my application while waiting for input.

I read the tutorial for futures and tried to create a Stream that will signal when the data is received by PubSub, but I cannot figure out how to do this.

How to create schedule and poll functions for pubsub.get_message() blocking function?

+7
asynchronous future rust redis
source share
1 answer

Hard disclaimer I've never used this library before, and my low-level knowledge of some concepts is a little ... missing. I mainly read the textbook . I am sure that anyone who has done the asynchronous work will read this and laugh, but this can be a useful starting point for other people. Caution emptor!


Let's start with something simpler, demonstrating how Stream works. We can convert the Result iterator to a stream:

 extern crate futures; use futures::Future; use futures::stream::{self, Stream}; fn main() { let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())]; let payloads = stream::iter(payloads.into_iter()); let foo = payloads .and_then(|payload| futures::finished(println!("{}", payload))) .for_each(|_| Ok(())); foo.forget(); } 

This shows us one way to consume a stream. We use and_then to do something for each payload (just print it here), and then for_each to convert Stream back to Future . Then we can start the future by calling the oddly named forget method .


Next, link the Redis library in the mix, processing only one message. Since the get_message() method blocks, we need to inject some streams into the mix. It is not a good idea to do a lot of work on this type of asynchronous system, because everything else will be blocked. For example :

Unless otherwise provided, it must be ensured that the implementation of this function is completed very quickly .

In an ideal world, a redis box would be built on top of a library, such as futures, and lay it all out initially.

 extern crate redis; extern crate futures; use std::thread; use futures::Future; use futures::stream::{self, Stream}; fn main() { let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); let (tx, payloads) = stream::channel(); let redis_thread = thread::spawn(move || { let msg = pubsub.get_message().expect("Unable to get message"); let payload: Result<String, _> = msg.get_payload(); tx.send(payload).forget(); }); let foo = payloads .and_then(|payload| futures::finished(println!("{}", payload))) .for_each(|_| Ok(())); foo.forget(); redis_thread.join().expect("unable to join to thread"); } 

My understanding here is fluffy. In a separate thread, we block the message and insert it into the channel when we receive it. I don’t understand why we need to hold the thread handle. I would expect foo.forget to block itself, waiting for the thread to be empty.

In a telnet connection to the Redis server, send this:

 publish rust awesome 

And you will see that it works. Adding print statements shows that (for me) the foo.forget runs before the thread appears.


A few posts are more complicated. Sender consumes itself to prevent the generating side being too far ahead of the consuming side. This is achieved by returning another future from send ! We need to throw it back to reuse it for the next iteration of the loop:

 extern crate redis; extern crate futures; use std::thread; use std::sync::mpsc; use futures::Future; use futures::stream::{self, Stream}; fn main() { let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); let (tx, payloads) = stream::channel(); let redis_thread = thread::spawn(move || { let mut tx = tx; while let Ok(msg) = pubsub.get_message() { let payload: Result<String, _> = msg.get_payload(); let (next_tx_tx, next_tx_rx) = mpsc::channel(); tx.send(payload).and_then(move |new_tx| { next_tx_tx.send(new_tx).expect("Unable to send successor channel tx"); futures::finished(()) }).forget(); tx = next_tx_rx.recv().expect("Unable to receive successor channel tx"); } }); let foo = payloads .and_then(|payload| futures::finished(println!("{}", payload))) .for_each(|_| Ok(())); foo.forget(); redis_thread.join().expect("unable to join to thread"); } 

I am sure that over time there will be more ecosystems for this type of interaction. For example, the futures-cpupool box can probably be expanded to support similar use in this.

+11
source share

All Articles