About Future.firstCompletedOf and the garbage collection mechanism

I ran into this problem in my real project and confirmed it with my test code and a profiler. Instead of pasting the code (tl;dr), I'll show you a picture and then describe it.

Simply put, I use Future.firstCompletedOf to get a result from 2 Futures, which share nothing and do not care about each other. Even so, and this is the question I want to ask, the garbage collector cannot reclaim the first Result object until both Futures have finished.

So I'm really curious about the mechanism behind this. Can anyone explain it at a lower level, or give me a hint?

Thanks!

PS: is it because they share the same ExecutionContext?

** Update ** Test code added as requested:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object Main extends App {
      println("Test start")

      val timeout = 30000

      trait Result {
        val id: Int
        val str = "I'm short"
      }

      class BigObject(val id: Int) extends Result {
        override val str = "really big str"
      }

      // Slow future: completes only after `timeout` milliseconds
      def guardian = Future({
        Thread.sleep(timeout)
        new Result { val id = 99999 }
      })

      // Fast future: completes after 100 ms with a BigObject
      def worker(i: Int) = Future({
        Thread.sleep(100)
        new BigObject(i)
      })

      for (i <- Range(1, 1000)) {
        println("round " + i)
        Thread.sleep(20)
        Future.firstCompletedOf(Seq(
          guardian,
          worker(i)
        )).map(r => println("result" + r.id))
      }

      // Keep the JVM alive so the heap can be inspected
      while (true) {
        Thread.sleep(2000)
      }
    }
garbage-collection scala concurrency jvm future
1 answer

Let's see how firstCompletedOf is implemented:

    def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
      val p = Promise[T]()
      val completeFirst: Try[T] => Unit = p tryComplete _
      futures foreach { _ onComplete completeFirst }
      p.future
    }

When futures foreach { _ onComplete completeFirst } is executed, each onComplete call has to save the function completeFirst somewhere, to be run later via ExecutionContext.execute. Where exactly this function is stored does not matter; we just know that it has to be kept somewhere so that it can be picked up later and executed on a thread pool when a thread becomes available.

The function completeFirst closes over p. Thus, as long as there is still a future (from futures) waiting to complete, there is a reference to p that prevents it from being garbage collected (even though by that point firstCompletedOf has most likely already returned, removing p from the stack).

When the first future completes, it stores its result in the promise (by calling p.tryComplete). Since the promise p holds the result, the result is reachable at least as long as p is reachable, and as we have seen, p is reachable as long as at least one future from futures has not completed. This is why the result cannot be collected before all the futures have completed.
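The reference chain described above can be observed directly. The following is only a sketch under some assumptions: naiveFirstCompletedOf is a hypothetical local copy of the quoted implementation (taking a Seq instead of a TraversableOnce), RetentionSketch and its helpers are made-up names, and System.gc() is merely a hint to the JVM. The check holds regardless of whether a collection actually runs, because the winner's result stays strongly reachable through the callback registered on the pending future.

```scala
import java.lang.ref.WeakReference
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object RetentionSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical local copy of the quoted implementation (Seq instead of TraversableOnce).
  def naiveFirstCompletedOf[T](futures: Seq[Future[T]]): Future[T] = {
    val p = Promise[T]()
    val completeFirst: Try[T] => Unit = r => { p.tryComplete(r); () }
    futures.foreach(_.onComplete(completeFirst))
    p.future
  }

  // Lets the fast future win, then hands back only a weak reference to its result.
  private def raceAndForget(slow: Promise[Array[Byte]]): WeakReference[Array[Byte]] = {
    val fast = Promise[Array[Byte]]()
    val first = naiveFirstCompletedOf(Seq(slow.future, fast.future))
    fast.success(new Array[Byte](1 << 20))
    new WeakReference(Await.result(first, 5.seconds))
  }

  // True if the winner's result is still reachable while `slow` is pending.
  def winnerStillReachable(): Boolean = {
    val slow = Promise[Array[Byte]]() // never completed in this sketch
    val weak = raceAndForget(slow)
    System.gc() // only a hint; the result is strongly reachable either way
    val alive = weak.get != null
    // Using `slow` here keeps the pending future (and its callback list) alive.
    alive && !slow.isCompleted
  }

  def main(args: Array[String]): Unit =
    println("winner still reachable: " + winnerStillReachable())
}
```

The path that keeps the array alive is: slow (still pending) -> its registered callback completeFirst -> p -> the stored result.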

UPDATE: Now the question is: can this be fixed? I think it can. All we need to do is ensure that the first future to complete "nulls out" the reference to p in a thread-safe way, which can be done, for example, with an AtomicReference. Something like this:

    def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
      val p = Promise[T]()
      val pref = new java.util.concurrent.atomic.AtomicReference(p)
      val completeFirst: Try[T] => Unit = { result: Try[T] =>
        val promise = pref.getAndSet(null)
        if (promise != null) {
          promise.tryComplete(result)
        }
      }
      futures foreach { _ onComplete completeFirst }
      p.future
    }

I tested it, and as expected, it does allow the result to be garbage collected as soon as the first future completes. It should behave the same in all other respects.
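To check the "behaves the same" part, here is a minimal sketch, not the library's code. firstCompletedOfFixed and FixedFirstCompletedSketch are hypothetical names, and the function takes a Seq rather than a TraversableOnce so that it compiles across Scala versions. It races a fast promise against a slow one and confirms that a late completion does not change the winner.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object FixedFirstCompletedSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical name; same idea as the AtomicReference version above, taking a Seq.
  def firstCompletedOfFixed[T](futures: Seq[Future[T]]): Future[T] = {
    val p = Promise[T]()
    val pref = new AtomicReference[Promise[T]](p)
    val completeFirst: Try[T] => Unit = { result =>
      // Only the first caller gets the promise; every later caller sees null.
      val promise = pref.getAndSet(null)
      if (promise != null) promise.tryComplete(result)
      ()
    }
    futures.foreach(_.onComplete(completeFirst))
    p.future
  }

  // Returns the winner as seen right away and again after the slow future completes.
  def race(): (String, String) = {
    val slow = Promise[String]()
    val fast = Promise[String]()
    val first = firstCompletedOfFixed(Seq(slow.future, fast.future))
    fast.success("fast")
    val winner = Await.result(first, 5.seconds)
    slow.success("slow") // a late completion must not change the outcome
    (winner, Await.result(first, 5.seconds))
  }

  def main(args: Array[String]): Unit =
    println(race())
}
```

After the first completion, the callbacks still registered on the pending futures only hold the AtomicReference, which now contains null, so they no longer keep p or its result alive.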
