TraversableOnce, Future, and Option in Scala for Understanding

I have a list of row identifiers representing DB records. I would like to download them from the database asynchronously, upload each record asynchronously to the remote server, and then, when everything is uploaded, record the identifiers of the downloaded records.

Since I am in Scala 2.9.2, I use the core-util Future kernel implementation, but it should work just like 2.10 futures in terms of monadic transformations.

The general concept is as follows:

def fetch(id: String): Future[Option[Record]] def upload(record: Record): Future[String] def notifyUploaded(ids: Seq[String]): Unit val ids: Seq[String] = .... 

I try to do this with understanding, but the fact that fetch returns Future of Option makes it unclear and the code does not compile:

 for { id <- ids maybeRecord <- fetch(id) record <- maybeRecord uploadedId <- upload(record) } yield uploadedId 

Compiling the results results in the following error:

 scala: type mismatch; found : com.twitter.util.Future[String] required: Option[?] uploadedId <- upload(record) ^ 

What am I missing? why does the compiler expect uploadedId to become an option? Is there any nice way I could get around this?

+8
scala future monads
source share
2 answers

Consider the signature of the flatMap (or bind) function:

 trait Monad[M[_]] { def flatMap[A](a : M[A], f : A => M[B]) : M[B] .... 

In your case, you are trying to use flatMap on Option , pointing it to f , which Future generates. But, as in the signature above, f must generate something in the same monad to which it was called.

Scala is not necessarily terribly useful in this regard, as it is pretty good at transforming things (like Seq s) in such a way that it seems like you can bind arbitrary flatMap calls regardless of the container.

What you might want is a β€œMonad Transformer,” which gives you the ability to compose monads. Debasish Ghosh posted a post about using Scalaz monad transformers here .

+6
source share

You cannot mix all the different types in one for understanding, I realized that you can mix Seq and Option, and the result will be either Seq or Option, depending on what comes first. It is not possible to combine Future and Seq or Option. If you want to use for understanding, you have to cascade them. In such cases, it might be better with map / flatMap. I implemented your question in both directions and added types to a few intermediate results so you can see the clutter that is created while working with all of these different types.

 object TestClass { import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent._ import scala.concurrent.duration._ case class Record(id: String) def fetch(id: String): Future[Option[Record]] = Future { Thread.sleep(1000); Some(Record(id)) } def upload(record: Record): Future[String] = Future { Thread.sleep(3000); record.id + "_uploaded" } def notifyUploaded(ids: Seq[String]): Unit = println("notified" + ids) val ids: Seq[String] = Seq("a", "b", "c") def main(args: Array[String]): Unit = { forComprehensionImpl() mapAndFlatMapImpl() } def forComprehensionImpl() = { val result: Seq[Future[Option[Future[String]]]] = for { id <- ids } yield { for { maybeRecord <- fetch(id) } yield { for { record <- maybeRecord } yield { for { uploadedId <- upload(record) } yield uploadedId } } } val result2: Future[Seq[Option[Future[String]]]] = Future.sequence(result) val result3: Future[Unit] = result2.flatMap { x: Seq[Option[Future[String]]] => Future.sequence(x.flatten).map(notifyUploaded) } Await.result(result3, Duration.Inf) } def mapAndFlatMapImpl() = { val res: Seq[Future[Iterable[String]]] = ids.map { id => fetch(id).flatMap { maybeRecord => val res1: Option[Future[Seq[String]]] = maybeRecord.map { record => upload(record) map (Seq(_)) } res1 match { case Some(a) => a case None => Future(Seq()) } } } val res3: Future[Unit] = Future.sequence(res) map (a => notifyUploaded(a.flatten)) Await.result(res3, Duration.Inf) } } 
0
source share

All Articles