How do Clojure core.async channels get cleaned up?

I'm looking at Clojure core.async for the first time and working through this great Rich Hickey presentation: http://www.infoq.com/presentations/clojure-core-async

I have a question about an example he shows near the end of the presentation:

[slide: core.async Web Example]

According to Rich, this example basically fetches web, video, and image results for a particular query. For each kind of result it tries two different sources in parallel and simply takes the fastest result from each pair. The whole operation may take no more than 80 ms, so if we can't get, say, the image results within 80 ms, we just give up. The fastest function creates and returns a new channel, and starts two racing go processes that each try to fetch a result and put it on that channel. We then just take the first result off the channel fastest returns and put it on channel c.
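For reference, here is a minimal sketch of what a fastest along those lines could look like. This is my own reconstruction, not the exact code from the slide, and the query functions passed in are hypothetical stand-ins for real HTTP calls:

```clojure
(require '[clojure.core.async :refer [chan go >! <!!]])

;; Hypothetical reconstruction -- not the slide's exact code.
;; Races one go process per query function; the first value put
;; onto the returned channel is the one the caller will take.
(defn fastest [& query-fns]
  (let [c (chan)]
    (doseq [q query-fns]
      (go (>! c (q))))   ; the loser stays parked on this put
    c))

;; Usage: whichever source responds first wins.
(<!! (fastest (fn [] :from-source-a)
              (fn [] :from-source-b)))
```

Because the channel is unbuffered and only one value is ever taken, the losing go process remains parked on its put, which is exactly the situation the question is about.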

My question is: what happens to those three temporary, unnamed fastest channels after we take their first result? Presumably there is still a go process parked on each one, trying to put the second result onto the channel, but nobody is listening, so it never completes. And since the channel is never bound to anything, it doesn't seem like we have any way to do anything with it again. Do the go process and the channel "realize" that nobody cares about their results any more and get cleaned up? Or have we essentially just leaked three channels / go processes in this code?

3 answers

There is no leak.

Parked gos are tied to the channels on which they attempted an operation, and have no independent existence beyond that. If the rest of the code loses interest in the channels a given go is parked on (NB: a single go can be a putter/taker on many channels at once if it parks on alt! / alts!), it will be GC'd along with those channels.

The only caveat is that to be GC'd, a go must actually park first. So any go that keeps doing things in a loop without ever parking (<! / >! / alt! / alts!) will in fact live forever. It's hard to write that kind of code by accident, though.
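To illustrate the caveat, a quick sketch (my own example, not from the answer above):

```clojure
(require '[clojure.core.async :refer [chan go <!]])

;; Never parks: this go spins forever on the go thread pool and can
;; never be garbage collected (don't actually run it):
;;   (go (loop [] (recur)))

;; Parks immediately on <!. Once the channel `c` becomes unreachable,
;; the parked go is eligible for collection along with it:
(let [c (chan)]
  (go (<! c))   ; parked taker, tied only to c
  nil)
```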


Caveats and exceptions aside, you can watch this garbage collection happen on the JVM from the REPL.

e.g.:

    (require '[clojure.core.async :as async])
    => nil
    (def c (async/chan))
    => #'user/c
    (def d (async/go-loop []
             (when-let [v (async/<! c)]
               (println v)
               (recur))))
    => #'user/d
    (async/>!! c :hi)
    => true
    :hi                          ; core.async go block is working
    (import java.lang.ref.WeakReference)
    => java.lang.ref.WeakReference
    ;; hold a reference without preventing garbage collection
    (def e (WeakReference. c))
    => #'user/e
    (def f (WeakReference. d))
    => #'user/f
    (.get e)
    => #object[...]
    (.get f)
    => #object[...]
    (def c nil)
    => #'user/c
    (def d nil)
    => #'user/d
    (println "We need to clear *1, *2 and *3 in the REPL.")
    We need to clear *1, *2 and *3 in the REPL.
    => nil
    (println *1 *2 *3)
    nil #'user/d #'user/c
    => nil
    (System/gc)
    => nil
    (.get e)
    => nil
    (.get f)
    => nil

What just happened? I set up a go block and verified that it was working. I then used WeakReferences to monitor both the communication channel (c) and the go block's return channel (d). Next I dropped all references to c and d (including the *1, *2 and *3 vars my REPL created), requested a garbage collection (luckily one actually ran; the System.gc Javadoc makes no firm guarantees), and then observed that my weak references had been cleared.

So in this case at least, once the references to the channels involved were dropped, the channels were garbage collected (despite my failure to close them!).


Presumably the channel created by fastest delivers the result of the fastest request method and is then simply discarded.

If a second result is obtained, your assumption is right that the losing fastest processes leak: their results are never consumed, and if they relied on all of their results being consumed in order to shut down, they would never stop.

Note that this can also happen when the timeout channel t is the one selected in the alt! clause.

The usual way to fix this would be to close! the channel in the last go block once the first result has been taken. Puts to the closed channel then fail, and the producers can stop working.
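A sketch of that fix, assuming a fastest that returns a fresh racing channel as described earlier (the function and parameter names here are my own, not from the slide):

```clojure
(require '[clojure.core.async :refer [go >! close! alts! timeout]])

(defn take-fastest-result [fastest-ch c]
  ;; Take the winning result (or give up after 80 ms), then close the
  ;; racing channel so the losing producer's put is released and it can stop.
  (go
    (let [[v _] (alts! [fastest-ch (timeout 80)])]
      (close! fastest-ch)        ; further puts to fastest-ch now fail
      (when v (>! c v)))))
```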

The problem could also be addressed within the implementation of fastest itself. The processes that fastest creates could do their puts via alts! with a timeout, and stop working if the values they produce are not consumed within a certain period of time.
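Sketched, with an arbitrary 100 ms give-up window (again my own reconstruction, not code from the talk):

```clojure
(require '[clojure.core.async :refer [chan go alts! timeout]])

(defn fastest [& query-fns]
  (let [out (chan)]
    (doseq [q query-fns]
      (go
        (let [v (q)]
          ;; Offer the result, but abandon the put if nobody takes it
          ;; within 100 ms -- so the losing process always terminates.
          (alts! [[out v] (timeout 100)]))))
    out))
```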

I assume Rich left this problem off the slide in favor of a shorter example.

