Parallel doseq for Clojure

I haven't used multithreading in Clojure at all, so I don’t know where to start.

I have a doseq whose body can run in parallel. I would like to always have 3 threads (leaving 1 core free) that evaluate the body in parallel until the range is exhausted. There is no shared state, nothing complicated. Something equivalent to Python's multiprocessing would be perfect.

So something like:

    (dopar 3 [i (range 100)]
      ; repeated 100 times in 3 parallel threads...
      ...)

Where do I begin? Is there a command for this? A standard package? A good reference?

So far I have found pmap and could use that (how do I limit it to 3 at a time? It looks like it uses 32 at a time. No, the source says 2 + the number of processors), but this seems like a basic primitive that should already exist somewhere.

Clarification: I really would like to control the number of threads. My processes run for a long time and use a fair amount of memory, so creating a large number of them and hoping that everything will be fine is not a good approach (for example, one of them uses a significant portion of the available memory).

Update: I have started writing a macro that does this, and I need a semaphore (or a mutex, or an atom that I can wait on). Are there semaphores in Clojure? Or should I use ThreadPoolExecutor? It seems strange to have to pull in so much from Java. I thought concurrent programming in Clojure was supposed to be easy... Maybe I am thinking about this completely wrong? Hmmm. Agents?

+14
6 answers

pmap actually works well in most cases: it uses a thread pool with a reasonable number of threads for your machine. I would not try to build my own mechanism for controlling the number of threads unless you have real benchmark evidence that the default causes problems.

Having said that, if you really want to limit the maximum to three threads, a simple approach is to use pmap over three subsets of the range:

    (defn split-equally
      "Split a collection into a vector of (as close as possible) equally sized parts.
      E.g. (split-equally 3 (range 10)) yields [(0 1 2 3) (4 5 6) (7 8 9)]."
      [num coll]
      (loop [num   num
             parts []
             coll  coll
             c     (count coll)]
        (if (<= num 0)
          parts
          (let [t (quot (+ c num -1) num)] ; ceiling of c / num
            (recur (dec num) (conj parts (take t coll)) (drop t coll) (- c t))))))

    (defmacro dopar [thread-count [sym coll] & body]
      `(doall
         (pmap (fn [vals#]
                 (doseq [~sym vals#]
                   ~@body))
               (split-equally ~thread-count ~coll))))

Note the use of doall, which forces the lazy sequence returned by pmap to be fully evaluated, so dopar blocks until every part has finished.

+3

OK, I think I want an agent for each loop iteration, with the data sent to the agent using send. Actions dispatched with send run on a thread pool, so the number of threads is limited in some way (this does not give fine-grained control down to exactly three threads, but it will have to do for now).

(Dave Ray explains in the comments: to control the size of the pool I would need to write my own.)

    (defmacro dopar [seq-expr & body]
      (assert (= 2 (count seq-expr)) "single pair of forms in sequence expression")
      (let [[k v] seq-expr]
        ;; one agent per element; await blocks until every agent has run the body
        `(apply await
                (for [k# ~v]
                  (let [a# (agent k#)]
                    (send a# (fn [~k] ~@body))
                    a#)))))

which can be used as:

    (deftest test-dump
      (dopar [n (range 7 11)]
        (time (do-dump-single "/tmp/single" "a" n 10000000))))

Hooray! It works! I rock! (OK, Clojure rocks a bit too.) Related blog post.

+5

Why don't you just use pmap? You still cannot control the thread pool, but it is much less work than writing a custom macro that uses agents (and why not futures?).
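
If the thread count really does have to be fixed, one possible sketch uses a plain Java fixed-size pool instead of agents or a macro. The helper name run-in-pool is hypothetical (it is not part of Clojure or of any answer here); it relies only on java.util.concurrent and on the fact that Clojure functions implement Callable.

```clojure
(import '[java.util.concurrent Executors ExecutorService Future])

(defn run-in-pool
  "Hypothetical helper: calls (f x) for each x in coll on a pool of exactly
  n threads, blocks until all calls finish, and returns results in input order."
  [n f coll]
  (let [^ExecutorService pool (Executors/newFixedThreadPool (int n))]
    (try
      (let [tasks   (mapv (fn [x] (fn [] (f x))) coll) ; Clojure fns implement Callable
            futures (.invokeAll pool tasks)]           ; blocks until every task is done
        (mapv (fn [^Future fut] (.get fut)) futures))
      (finally
        ;; always release the worker threads, even if a task threw
        (.shutdown pool)))))

;; Usage: 100 items, never more than 3 running at once.
(run-in-pool 3 (fn [i] (* i i)) (range 100))
```

Unlike a static three-way split of the range, the pool hands out items one at a time, so a thread that finishes early picks up the next item instead of idling. An exception thrown inside f surfaces from .get wrapped in an ExecutionException.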

+4

I had a similar problem with the following requirements:

  • Control the number of threads used;
  • Be agnostic about thread pool management;
  • Task order does not need to be preserved;
  • Task processing times can differ, so the order of tasks must not be preserved; instead, a task that finishes earlier should be returned earlier;
  • Evaluate and submit the input sequence lazily;
  • Elements of the input sequence should not be read far ahead; they should be buffered and read in step with the returned results, to avoid running out of memory.

The core pmap function satisfies only the last two requirements.

Here is an implementation that satisfies all of them, using the standard Java thread pool interface ExecutorService together with a CompletionService and some partitioning of the input stream:

    (require '[clojure.tools.logging :as log])
    (import [java.util.concurrent ExecutorService ExecutorCompletionService
                                  CompletionService Future])

    ;; Lazy sequence of results, in the order in which tasks complete.
    (defn take-seq
      [^CompletionService pool]
      (lazy-seq
        (let [^Future result (.take pool)]
          (cons (.get result)
                (take-seq pool)))))

    (defn qmap
      [^ExecutorService pool chunk-size f coll]
      (let [worker (ExecutorCompletionService. pool)]
        (mapcat
          (fn [chunk]
            (let [actual-size (atom 0)]
              (log/debug "Submitting payload for processing")
              (doseq [item chunk]
                (.submit worker #(f item))
                (swap! actual-size inc))
              (log/debug "Outputting completed results for" @actual-size "trades")
              ;; take exactly as many results as were submitted for this chunk
              (take @actual-size (take-seq worker))))
          (partition-all chunk-size coll))))

As you can see, qmap does not instantiate the thread pool itself, only the ExecutorCompletionService. This makes it possible, for example, to pass in a fixed-size ThreadPoolExecutor. In addition, since qmap returns a lazy sequence, it cannot and should not manage the thread pool resource itself. Finally, chunk-size limits how many elements of the input sequence are realized and submitted as tasks at once.

The code below demonstrates the correct use:

    (import [java.util.concurrent Executors])

    (let [thread-pool (Executors/newFixedThreadPool 3)]
      (try
        (doseq [result (qmap thread-pool
                             ;; submit no more than 500 tasks at once
                             500
                             long-running-resource-intensive-fn
                             unboundedly-large-lazy-input-coll)]
          (println result))
        (finally
          ;; (.shutdown) only prohibits submitting new tasks,
          ;; (.shutdownNow) will even cancel already submitted tasks.
          (.shutdownNow thread-pool))))
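
To see the completion-order behaviour that qmap relies on in isolation, here is a minimal sketch. The function name completion-order-demo and the sleep durations are illustrative assumptions; only ExecutorCompletionService and Executors come from the answer above.

```clojure
(import '[java.util.concurrent Executors ExecutorCompletionService])

(defn completion-order-demo
  "Submits a slow task first and a fast task second, then returns the two
  results in the order in which the tasks finished."
  []
  (let [pool    (Executors/newFixedThreadPool 2)
        service (ExecutorCompletionService. pool)]
    (try
      (.submit service (fn [] (Thread/sleep 300) :slow))
      (.submit service (fn [] (Thread/sleep 10) :fast))
      ;; .take blocks until some task has finished, whichever that is
      [(.get (.take service)) (.get (.take service))]
      (finally
        (.shutdown pool)))))

(completion-order-demo)
```

With two threads available, both tasks start at roughly the same time, so the call should return [:fast :slow] even though :slow was submitted first. pmap, by contrast, would return the results in submission order.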

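The Java concurrency classes used here (ExecutorService, CompletionService, ExecutorCompletionService, Future and Executors) are documented in the Javadoc for the java.util.concurrent package.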

+4

Not sure if this is idiomatic since I'm still pretty new to Clojure, but the following solution works for me, and it also looks pretty concise:

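    ;; items and process stand for your own collection and function
    (let [number-of-threads 3
          await-timeout     1000]
      ;; partition-all keeps the final, shorter group; plain partition would drop it
      (doseq [p-items (partition-all number-of-threads items)]
        (let [agents (map agent p-items)]
          (doseq [a agents]
            (send-off a process))
          (apply await-for await-timeout agents)
          ;; note: doseq discards this value; use for or mapv if the results are needed
          (map deref agents))))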
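
If the results are needed, the same batching idea can be wrapped in a function that collects them. This is only a sketch: the name process-in-batches and the choice to throw on timeout are assumptions, not part of the answer above.

```clojure
(defn process-in-batches
  "Hypothetical helper: applies f to items in batches of batch-size, one agent
  per item, and returns all results in input order."
  [batch-size timeout-ms f items]
  (into []
        (mapcat (fn [batch]
                  (let [agents (mapv agent batch)]
                    (doseq [a agents]
                      (send-off a f))
                    ;; await-for returns a falsy value if the timeout elapses first
                    (when-not (apply await-for timeout-ms agents)
                      (throw (ex-info "Batch timed out" {:batch batch})))
                    (mapv deref agents))))
        (partition-all batch-size items)))

;; Usage: at most 3 items are processed at the same time.
(process-in-batches 3 1000 inc [1 2 3 4 5])
```

Each batch waits for its slowest item before the next batch starts, so with uneven task durations some threads sit idle; a fixed-size thread pool avoids that.
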
+2

In fact, there is now a library that does just that: claypoole. From its GitHub page:

The claypoole library provides threadpool-based parallel versions of Clojure functions such as pmap, future, and for.

It provides both ordered and unordered versions of these.

+2
