Clojure threads for long-running processes and combining their return values

I have two different functions that process two very large datasets, each eventually producing a Boolean value. These two values then need to be combined into the final result. My question is: what is the best way to create threads so that the two long-running functions can work simultaneously? My thoughts were something like:

    (def f (future (longProcessOne data_one)))
    (def g (future (longProcessTwo data_two)))
    (and @f @g)

but I am looking for advice on the best way to do this.
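For reference, here is a self-contained sketch of the future-based approach above. long-process-one and long-process-two are hypothetical stand-ins (they sleep to simulate work and then check a predicate); substitute the real functions and datasets. Both futures start before either is dereferenced, so the total wait is roughly the longer of the two runs rather than their sum.

```clojure
;; Hypothetical stand-ins for the real long-running functions:
;; each sleeps to simulate work, then returns a Boolean.
(defn long-process-one [data]
  (Thread/sleep 200)
  (every? pos? data))

(defn long-process-two [data]
  (Thread/sleep 300)
  (every? even? data))

(defn both-pass?
  "Runs both processes concurrently in futures and combines the results.
  `and` derefs f first, so a false from g is only seen once f completes."
  [data-one data-two]
  (let [f (future (long-process-one data-one))
        g (future (long-process-two data-two))]
    (and @f @g)))

(both-pass? [1 2 3] [2 4 6]) ;= true
(both-pass? [1 2 3] [2 4 5]) ;= false
```

Because and dereferences f first, a false result from g is not noticed until f has also finished; the answers below short-circuit on whichever falsey result arrives first.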

2 answers

(A promise-based approach first, followed by a core.async-based approach. Both short-circuit on the first falsey value.)


Here's a version that relies on the fact that a promise can be delivered to several times: only the first delivery sets its value, and subsequent deliveries simply return nil without side effects.

    (defn thread-and
      "Computes logical conjunction of return values of fs, each of which
      is called in a future. Short-circuits (cancelling the remaining
      futures) on first falsey value."
      [& fs]
      (let [done (promise)
            ret  (atom true)
            fps  (promise)]
        (deliver fps
                 (doall (for [f fs]
                          (let [p (promise)]
                            [(future
                               (if-not (swap! ret #(and %1 %2) (f))
                                 (deliver done true))
                               (locking fps
                                 (deliver p true)
                                 (when (every? realized? (map peek @fps))
                                   (deliver done true))))
                             p]))))
        @done
        (doseq [[fut] @fps]
          (future-cancel fut))
        @ret))

Some tests:

    (thread-and (constantly true) (constantly true))
    ;;= true
    (thread-and (constantly true) (constantly false))
    ;;= false
    (every? false? (repeatedly 100000 #(thread-and (constantly true) (constantly false))))
    ;;= true

    ;; prints :foo, but not :bar
    (thread-and #(do (Thread/sleep 1000) (println :foo))
                #(do (Thread/sleep 3000) (println :bar)))
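The promise version leans on how deliver behaves once a promise has already been realized. A minimal illustration (the values :first and :second are arbitrary):

```clojure
(def p (promise))

(realized? p)       ;= false
(deliver p :first)  ; first delivery sets the value and returns the promise
(deliver p :second) ;= nil, since p is already realized
@p                  ;= :first
(realized? p)       ;= true
```

This is why any of the futures in thread-and can deliver done, either on a falsey result or once all results are in, without coordinating which one gets there first.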

Combining the ideas of Arthur and A. Webb, you can use core.async to AND the results together, short-circuiting on the first falsey value returned:

    (require '[clojure.core.async :refer [chan >!! alts!!]])

    (defn thread-and
      "Call each of the fs on a separate thread. Return logical
      conjunction of the results. Short-circuit (and cancel the calls to
      remaining fs) on first falsey value returned."
      [& fs]
      (let [futs-and-cs
            (doall (for [f fs]
                     (let [c (chan 1)]
                       ;; channels cannot carry nil, so coerce to a Boolean
                       [(future (>!! c (boolean (f)))) c])))]
        (loop [futs-and-cs futs-and-cs]
          (if (seq futs-and-cs)
            (let [[result c] (alts!! (mapv peek futs-and-cs))]
              (if result
                (recur (remove #(identical? (peek %) c) futs-and-cs))
                (do (doseq [fut (map first futs-and-cs)]
                      (future-cancel fut))
                    false)))
            true))))

Testing with (constantly false) and (constantly true):

    (thread-and (constantly true) (constantly true))
    ;= true
    (thread-and (constantly true) (constantly false))
    ;= false
    ;;; etc.

Also note that the short-circuiting really works:

    ;;; prints :foo before returning false
    (thread-and #(do (Thread/sleep 3000) false)
                #(do (Thread/sleep 1000) (println :foo)))

    ;;; does not print :foo
    (thread-and #(do (Thread/sleep 3000) false)
                #(do (Thread/sleep 7000) (println :foo)))
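The core.async version relies on two behaviours of the library. First, alts!! blocks until one of the given channels is ready and returns a [value channel] pair, which is how the loop identifies and drops the channel that answered. Second, core.async channels cannot carry nil, so putting a nil result (such as the return value of println) throws instead of delivering a falsey value. A small demonstration, assuming core.async is on the classpath; the channel names are illustrative only:

```clojure
(require '[clojure.core.async :refer [chan >!! alts!!]])

(def ready   (chan 1))
(def waiting (chan 1))

;; Only `ready` holds a value, so alts!! takes from it and
;; reports which port answered: [false ready]
(>!! ready false)
(def taken (alts!! [ready waiting]))

;; Putting nil on a channel is rejected with an exception
;; (caught broadly here rather than assuming its exact class).
(def nil-put-error
  (try (>!! (chan 1) nil)
       nil
       (catch Throwable t t)))
```

So a function that may return nil should have its result coerced, for example with boolean, before it is put on a channel.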

Your approach is perfectly normal Clojure code. Another option is to use promises; if you need more complex processing, you could consider something like lamina; or, if you want to live on the bleeding edge, you can try core.async:

    (ns async-example.core
      (:require [clojure.core.async :refer [chan go <! >! <!! timeout]]))

    (defn example []
      (let [a      (chan)  ; a channel for process a to report its answer
            b      (chan)  ; a channel for process b to report its answer
            output (chan)] ; a channel for the reporter to report back to the REPL
        (go (<! (timeout (rand-int 1000)))   ; process a
            (>! a (rand-nth [true false])))
        (go (<! (timeout (rand-int 1000)))   ; process b
            (>! b (rand-nth [true false])))
        (go (>! output (and (<! a) (<! b)))) ; the reporter process
        output))                             ; return the channel the result will be sent to

    async-example.core> (<!! (go (<! (example))))
    false
    async-example.core> (<!! (go (<! (example))))
    false
    async-example.core> (<!! (go (<! (example))))
    true

Of course, this is overkill for your situation, though it is incredibly interesting either way ;-)
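One caveat if you adapt this to the question's real workload: go blocks run on a small fixed-size thread pool (8 threads by default) and are meant for parking channel operations, not long blocking or CPU-bound work. For long computations, clojure.core.async/thread runs its body on a separate thread and returns a channel that receives the result. A sketch under that assumption, where slow-check is a hypothetical stand-in for the real processing functions:

```clojure
(require '[clojure.core.async :refer [thread <!!]])

;; Hypothetical stand-in for a long-running check over a dataset.
(defn slow-check [ms pred data]
  (Thread/sleep ms)
  (every? pred data))

(defn run-both
  "Runs two blocking computations on their own threads and ANDs the
  results. `thread` closes its channel without a value when the body
  returns nil, so <!! then yields nil, which `and` treats as falsey."
  [f g]
  (let [a (thread (f))
        b (thread (g))]
    (and (<!! a) (<!! b))))

(run-both #(slow-check 200 pos? [1 2 3])
          #(slow-check 100 even? [2 4 6])) ;= true
```

Like the original future-based code, this waits on a before looking at b, so it does not short-circuit on an early false from b.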
