Waiting for n channels with core.async

Just as alt! waits for one of n channels to receive a value, I'm looking for an idiomatic way to wait for all n channels to receive a value.

I need this because I "run" n go blocks to perform asynchronous tasks, and I want to know when all of them are done. I'm sure there is a very elegant way to achieve this.

2 answers

You can say (mapv #(async/<!! %) channels). This blocks on each channel in turn and returns a vector with one value per channel.
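As a minimal sketch of how this fits the question's scenario: a go block returns a channel that receives the value of its body, so collecting those channels and taking from each one waits for every task. The squaring task and the names channels and results are hypothetical, chosen only for illustration.

```clojure
(require '[clojure.core.async :as async])

;; Each go block returns a channel that receives the block's result.
;; The task here (squaring i) is a hypothetical stand-in for real work.
(def channels
  (mapv (fn [i] (async/go (* i i)))
        (range 5)))

;; Blocks the calling thread until every channel has delivered a value.
(def results (mapv #(async/<!! %) channels))
;= [0 1 4 9 16]
```

Note that a go block whose body returns nil closes its channel without putting a value, so the corresponding entry in the vector would be nil.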

If you want to process individual values as they become available, and then do something special after the last channel produces a value, you can exploit the fact that alts! / alts!! accept a vector of channels and are functions, not macros, so you can easily pass in dynamically constructed vectors.

So you can use alts!! to wait on the initial collection of n channels, then use it again on the remaining channels, and so on.

    (require '[clojure.core.async :as async])

    (def c1 (async/chan))
    (def c2 (async/chan))

    (def out
      (async/thread
        (loop [cs [c1 c2]
               vs []]
          ;; alts!! returns [value port]; drop the port that just delivered
          (let [[v p] (async/alts!! cs)
                cs    (filterv #(not= p %) cs)
                vs    (conj vs v)]
            (if (seq cs)
              (recur cs vs)
              vs)))))

    (async/>!! c1 :foo)
    (async/>!! c2 :bar)

    (async/<!! out)
    ;= [:foo :bar]

If instead you would like to take all the values from all the input channels and then do something else once they are all closed, you want to use async/merge:

clojure.core.async/merge
([chs] [chs buf-or-n])
Takes a collection of source channels and returns a channel which contains all values taken from them. The returned channel will be unbuffered by default, or a buf-or-n can be supplied. The channel will close after all the source channels have closed.
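One way to turn merge into a "wait for everything" step is to drain the merged channel into a collection with async/into, which only yields its result once the merged channel has closed. The following is a sketch under that assumption; tasks and all-values are hypothetical names, and the order of the collected values is not guaranteed.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical tasks: each go block's channel delivers one value and then closes.
(def tasks
  (mapv (fn [i] (async/go (inc i)))
        (range 3)))

;; async/into produces its single result only after the merged channel closes,
;; which happens once every source channel has closed.
(def all-values
  (async/<!! (async/into [] (async/merge tasks))))

;; Arrival order depends on scheduling, so sort before comparing.
(sort all-values)
;= (1 2 3)
```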


Use the core.async map function:

    (a/<!! (a/map vector [ch1 ch2 ch3]))
    ;; [val-from-ch1 val-from-ch2 val-from-ch3]
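For a version that can be pasted into a REPL, here is a small sketch in which ch1, ch2 and ch3 are hypothetical channels backed by go blocks. a/map takes the first value from each source channel and applies the function to them in the order the channels are listed.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical tasks standing in for the asker's go blocks.
(def ch1 (a/go :one))
(def ch2 (a/go :two))
(def ch3 (a/go :three))

;; Blocks until each channel has delivered its first value.
(def combined (a/<!! (a/map vector [ch1 ch2 ch3])))
;= [:one :two :three]
```

Unlike the merge approach, the result keeps the order of the input channels regardless of which task finishes first.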
