In akka-stream, how to create an unordered source from a collection of futures

I need to create akka.stream.scaladsl.Source[T, Unit] from the Future[T] collection.

For example, having a set of futures that return integers,

 val f1: Future[Int] = ??? val f2: Future[Int] = ??? val fN: Future[Int] = ??? val futures = List(f1, f2, fN) 

how to create

 val source: Source[Int, Unit] = ??? 

.

I cannot use the Future.sequence combinator, since then I will wait for the completion of every future before getting anything from the source. I want to receive results in any order as soon as any future is completed.

I understand that Source is a purely functional API, and it should not run anything before implementing it in any way. So my idea is to use an Iterator (which is lazy) to create the source:

 Source { () => new Iterator[Future[Int]] { override def hasNext: Boolean = ??? override def next(): Future[Int] = ??? } } 

But this will be the source of futures, not actual values. I could also block next with Await.result(future) , but I'm not sure if the tread pool thread will be blocked. It will also trigger futures sequentially while I need to execute parallel execution.

UPDATE 2 : it turned out that there is a much simpler way to do this (thanks to Victor Clan):

 Source(futures).mapAsync(1)(identity) 

UPDATE : this is what I got from @sschaef answer:

 def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = { def run(actor: ActorRef): Unit = { futures.foreach { future => future.onComplete { case Success(value) => actor ! value case Failure(NonFatal(t)) => actor ! Status.Failure(t) // to signal error } } Future.sequence(futures).onSuccess { case _ => actor ! Status.Success(()) // to signal stream end } } Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run) } // ScalaTest tests follow import scala.concurrent.ExecutionContext.Implicits.global implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() "futuresToSource" should "convert futures collection to akka-stream source" in { val f1 = Future(1) val f2 = Future(2) val f3 = Future(3) whenReady { futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _) } { results => results should contain theSameElementsAs Seq(1, 2, 3) } } it should "fail on future failure" in { val f1 = Future(1) val f2 = Future(2) val f3 = Future.failed(new RuntimeException("future failed")) whenReady { futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed } { t => t shouldBe a [RuntimeException] t should have message "future failed" } } 
+7
scala future akka-stream reactive-streams
source share
2 answers

Create a futures source and then β€œsmooth” it through mapAsync:

 scala> Source(List(f1,f2,fN)).mapAsync(1)(identity) res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804 
+6
source share

One of the easiest ways to submit a Source is through an Actor:

 import scala.concurrent.Future import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ implicit val system = ActorSystem("MySystem") def run(actor: ActorRef): Unit = { import system.dispatcher Future { Thread.sleep(100); actor ! 1 } Future { Thread.sleep(200); actor ! 2 } Future { Thread.sleep(300); actor ! 3 } } val source = Source .actorRef[Int](0, OverflowStrategy.fail) .mapMaterializedValue(ref β‡’ run(ref)) implicit val m = ActorMaterializer() source runForeach { int β‡’ println(s"received: $int") } 

An actor is created using the Source.actorRef method and is accessible through the mapMaterializedValue method. run simply takes the Actor and sends it all the completed values ​​that can be accessed through source . In the above example, the values ​​are passed directly to the Future, but this, of course, can be done everywhere (for example, in the call to onComplete in the future).

+5
source share

All Articles