Scala Future / Promise

I want to run two or more Future / Promises in parallel and refuse, even if one of the launched Future / Promises does not work and does not want to wait for the rest to finish. What is the most idiomatic way of compiling this pipeline in Scala.

EDIT: more contextual information.

I need to start two external processes that write to the fifo file, and another reading from it. Say if the writer process fails; the reader thread may hang constantly waiting for any input from the file. Therefore, I would like to start both processes in parallel, and quickly execute , even if one of the Future / Promise fails, without waiting for the other to complete.

The following is sample code. The commands are not exactly cat and tail . I used them for brevity.

 val future1 = Future { executeShellCommand("cat file.txt > fifo.pipe") } val future2 = Future { executeShellCommand("tail fifo.pipe") } 
+5
source share
3 answers

If I understand the question correctly, what we are looking for is a quick-sequence implementation that is akin to the fail-safe version of firstCompletedOf

Here we look forward to registering a failure callback in the event of a failure of one of the futures, ensuring that we fail as soon as any futures fail.

 import scala.concurrent.{Future, Promise} import scala.util.{Success, Failure} import scala.concurrent.ExecutionContext.Implicits.global def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = { val promise = Promise[Seq[T]] futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}} val res = Future.sequence(futures) promise.completeWith(res).future } 

Unlike Future.sequence , this implementation will fail as soon as any of the futures fails, regardless of order. We show that with an example:

 import scala.util.Try // help method to measure time def resilientTime[T](t: =>T):(Try[T], Long) = { val t0 = System.currentTimeMillis val res = Try(t) (res, System.currentTimeMillis-t0) } import scala.concurrent.duration._ import scala.concurrent.Await 

The first future will fail (failure after 2 seconds)

 val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")} val f2 = Future[Int]{Thread.sleep(5000); 42} val f3 = Future[Int]{Thread.sleep(10000); 101} val res = failFast(Seq(f1,f2,f3)) resilientTime(Await.result(res, 10.seconds)) // res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998) 

It will fail in the future. Failure also after 2 seconds. (pay attention to the order in the construction of the sequence)

 val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")} val f2 = Future[Int]{Thread.sleep(5000); 42} val f3 = Future[Int]{Thread.sleep(10000); 101} val res = failFast(Seq(f3,f2,f1)) resilientTime(Await.result(res, 10.seconds)) // res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998) 

Comparison with Future.sequence , where the failure depends on the order (failure in 10 seconds):

 val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")} val f2 = Future[Int]{Thread.sleep(5000); 42} val f3 = Future[Int]{Thread.sleep(10000); 101} val seq = Seq(f3,f2,f1) resilientTime(Await.result(Future.sequence(seq), 10.seconds)) //res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),10000) 
+6
source

Use Future.sequence :

 val both = Future.sequence(Seq( firstFuture, secondFuture)); 

This is the correct way to aggregate two or more futures, when one of them fails to correspond to the aggregated future, and the aggregate future ends when all internal futures are completed. An older version of this answer suggested for understanding, which, although very often, did not immediately reject one of the futures, but rather waited for it.

+1
source

Replace Futures

 val f1 = Future { doSomething() } val f2 = Future { doSomething() } val resultF = f1 zip f2 

resultF will fail if any of f1 or f2 fails

The time to be solved is min(f1time, f2time)

 scala> import scala.util._ import scala.util._ scala> import scala.concurrent._ import scala.concurrent._ scala> import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global scala> val f = Future { Thread.sleep(10000); throw new Exception("f") } f: scala.concurrent.Future[Nothing] = scala.concurrent.impl.Promise$DefaultPromise@da1f03e scala> val g = Future { Thread.sleep(20000); throw new Exception("g") } g: scala.concurrent.Future[Nothing] = scala.concurrent.impl.Promise$DefaultPromise@634a98e3 scala> val x = f zip g x: scala.concurrent.Future[(Nothing, Nothing)] = scala.concurrent.impl.Promise$DefaultPromise@3447e854 scala> x onComplete { case Success(x) => println(x) case Failure(th) => println(th)} result: java.lang.Exception: f 
0
source

All Articles