If I understand the question correctly, what we are looking for is a fail-fast implementation of sequence, akin to a failure-biased version of firstCompletedOf.
Here we eagerly register a failure callback on each of the futures, ensuring that the result fails as soon as any of them fails.
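
    import scala.concurrent.{Future, Promise}
    import scala.concurrent.ExecutionContext.Implicits.global

    def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
      val promise = Promise[Seq[T]]()
      // Fail the promise as soon as any future fails. tryFailure is a no-op if the
      // promise is already completed, so a second failure cannot throw.
      // (f.failed.foreach replaces onFailure, which was removed in Scala 2.13.)
      futures.foreach(_.failed.foreach(ex => promise.tryFailure(ex)))
      // If nothing fails, complete with the ordinary sequence result.
      promise.completeWith(Future.sequence(futures)).future
    }
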
Unlike Future.sequence, this implementation fails as soon as any of the futures fails, regardless of its position in the sequence. Let's show that with an example:

    import scala.util.Try
    import scala.concurrent.duration._
    import scala.concurrent.Await

    // helper method: evaluates t, capturing any exception, and returns the elapsed time in ms
    def resilientTime[T](t: => T): (Try[T], Long) = {
      val t0 = System.currentTimeMillis
      val res = Try(t)
      (res, System.currentTimeMillis - t0)
    }

The first future in the sequence fails (failure after 2 seconds):

    val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
    val f2 = Future[Int]{Thread.sleep(5000); 42}
    val f3 = Future[Int]{Thread.sleep(10000); 101}
    val res = failFast(Seq(f1,f2,f3))

    resilientTime(Await.result(res, 10.seconds))
    // res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

The last future in the sequence fails. The failure is also reported after 2 seconds (note the order in which the sequence is constructed):

    val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
    val f2 = Future[Int]{Thread.sleep(5000); 42}
    val f3 = Future[Int]{Thread.sleep(10000); 101}
    val res = failFast(Seq(f3,f2,f1))

    resilientTime(Await.result(res, 10.seconds))
    // res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

For comparison, with Future.sequence the time to failure depends on the order of the futures (failure after 10 seconds):

    val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
    val f2 = Future[Int]{Thread.sleep(5000); 42}
    val f3 = Future[Int]{Thread.sleep(10000); 101}
    val seq = Seq(f3,f2,f1)

    resilientTime(Await.result(Future.sequence(seq), 10.seconds))
    // res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),10000)

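The runs above rely on Thread.sleep, so the measured times vary slightly. As a rough, sleep-free check of the same order-independence, the sketch below drives the futures by hand with Promise. The object name FailFastCheck and the promise names are made up for illustration, and failFast is repeated (written with failed.foreach and tryFailure) only so that the snippet compiles on its own:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical wrapper object, used only to make the sketch runnable.
object FailFastCheck {
  // Same idea as failFast above, repeated to keep the sketch self-contained.
  def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
    val promise = Promise[Seq[T]]()
    futures.foreach(_.failed.foreach(ex => promise.tryFailure(ex)))
    promise.completeWith(Future.sequence(futures)).future
  }

  def main(args: Array[String]): Unit = {
    // Two futures that are never completed, followed by one we fail by hand.
    val pending1 = Promise[Int]()
    val pending2 = Promise[Int]()
    val failing  = Promise[Int]()

    val result = failFast(Seq(pending1.future, pending2.future, failing.future))

    // Fail the LAST future in the sequence; the first two stay pending forever.
    failing.failure(new Exception("boom"))

    // The combined future fails even though the earlier futures never finish.
    val outcome = Try(Await.result(result, 1.second))
    assert(outcome.isFailure)
    assert(outcome.failed.get.getMessage == "boom")
    println(outcome)
  }
}
```

Because the first two futures never complete, any implementation that waits for earlier elements before looking at later ones would hit the 1 second timeout here, whereas failFast reports the "boom" exception.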