How to add a time-based observer to a Scala Future?

I want to add an after(d: FiniteDuration)(callback: => Unit) utility to Scala's Future, which would let me do this:

    val f = Future(someTask)

    f.after(30.seconds) {
      println("f has not completed in 30 seconds!")
    }

    f.after(60.seconds) {
      println("f has not completed in 60 seconds!")
    }

How can I do this?

4 answers

I usually use a thread pool executor and promises:

    import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor, ThreadPoolExecutor}
    import scala.concurrent.{Future, Promise}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.util.Try

    val f: Future[Int] = ???

    val executor = new ScheduledThreadPoolExecutor(
      2, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy())

    // Runs `operation` on the scheduler after the delay `by`
    // and exposes its result as a Future.
    def withDelay[T](operation: => T)(by: FiniteDuration): Future[T] = {
      val promise = Promise[T]()
      executor.schedule(new Runnable {
        override def run(): Unit = promise.complete(Try(operation))
      }, by.length, by.unit)
      promise.future
    }

    // Note: the delayed operation runs even if f has already completed.
    Future.firstCompletedOf(Seq(f, withDelay(println("still going"))(30.seconds)))
    Future.firstCompletedOf(Seq(f, withDelay(println("still still going"))(60.seconds)))

One way is to use Future.firstCompletedOf (see blog post):

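    val timeoutFuture = Future { Thread.sleep(500); throw new TimeoutException }
    // Use a new name: a val cannot refer to itself in its own definition.
    val result = Future.firstCompletedOf(List(f, timeoutFuture))
    // The timeout arrives as a failure, so inspect the failed projection.
    result.failed.foreach {
      case _: TimeoutException => println("f has not completed in 0.5 seconds!")
      case _                   => () // f itself failed with another exception
    }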

where TimeoutException is whatever exception type you choose to signal the timeout.


Use akka.pattern.after (import akka.pattern.after). If you want to implement it without Akka, see its source code. Another example (in Java) is TimeoutFuture in com.google.common.util.concurrent.
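For the no-Akka route, here is a minimal sketch of the same idea using only the JDK scheduler. The names DelayedFuture, scheduler, and Demo are hypothetical and chosen for illustration. This is not Akka's source, just one way a delayed future could be built under those assumptions:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeoutException}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical helper (assumption): a JDK-only stand-in for a delayed future.
object DelayedFuture {
  // One daemon thread is enough because it only triggers promises.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "delayed-future-scheduler")
        t.setDaemon(true) // do not keep the JVM alive
        t
      }
    })

  // Evaluates `value` once `delay` has elapsed and completes with its outcome.
  def after[T](delay: FiniteDuration)(value: => Future[T]): Future[T] = {
    val promise = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = promise.completeWith(
        try value catch { case NonFatal(e) => Future.failed(e) }
      )
    }, delay.length, delay.unit)
    promise.future
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val slow = Future { Thread.sleep(200); 42 }
    // The timeout fires at 50 ms, well before `slow` finishes at 200 ms.
    val timeout = DelayedFuture.after(50.millis)(
      Future.failed(new TimeoutException("too slow")))
    val first = Future.firstCompletedOf(Seq(slow, timeout))
    Await.ready(first, 1.second)
    println(first.value)
  }
}
```

Because the scheduler thread only completes promises, no thread is blocked while waiting, unlike the Thread.sleep approach in the previous answer.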


Something like this is possible:

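    import java.util.concurrent.TimeoutException
    import scala.concurrent.{Await, Future, blocking}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object PimpMyFuture {
      implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal {
        def after(delay: FiniteDuration)(callback: => Unit): Future[T] = {
          // A second thread blocks until f completes or the delay elapses.
          Future {
            blocking { Await.ready(f, delay) }
          } recover { case _: TimeoutException => callback }
          f
        }
      }
    }

    import PimpMyFuture._

    Future { Thread.sleep(10000); println("Done") }
      .after(5.seconds) { println("Still going") }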

This implementation is simple, but it doubles the number of threads you need: each active future effectively occupies two threads, which is wasteful. Alternatively, you can use scheduled tasks so that the waiting does not block a thread. I do not know of a "standard" scheduler in Scala (each library has its own), but for a simple task like this you can use Java's TimerTask directly:

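    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object PimpMyFutureNonBlocking {
      val timer = new java.util.Timer

      implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal {
        def after(delay: FiniteDuration)(callback: => Unit): Future[T] = {
          val task = new java.util.TimerTask {
            // Fire the callback only if f is still running when the delay elapses.
            def run(): Unit = if (!f.isCompleted) callback
          }
          timer.schedule(task, delay.toMillis)
          // Cancel the pending task once f completes.
          f.onComplete { _ => task.cancel() }
          f
        }
      }
    }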