I had a similar problem with the following requirements:
- Control the number of threads used;
- Be agnostic about thread pool management;
- Preserving task order is not required;
- Processing time can differ between tasks, so the input order does not need to be kept, but a task that finishes earlier must be returned earlier;
- Evaluate and consume the input sequence lazily;
- Elements of the input sequence should not be read ahead without bound, but should be buffered and read in step with the returned results, to avoid running out of memory.
The core pmap function satisfies only the last two requirements.
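To illustrate the ordering limitation, here is a minimal sketch showing that pmap returns results in input order even when a later task finishes first. The helper slow-identity is hypothetical and only stands in for a task with a variable run time.

```clojure
;; Hypothetical helper: sleeps for `ms` milliseconds, then returns `ms`.
(defn slow-identity [ms]
  (Thread/sleep (long ms))
  ms)

;; The 100 ms task finishes first, but pmap still yields it second,
;; because pmap preserves the order of the input sequence.
(def pmap-results (doall (pmap slow-identity [300 100 200])))

(println pmap-results) ; prints (300 100 200)
```

The consumer here cannot see the 100 ms result until the 300 ms task in front of it has finished.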
Here is an implementation that satisfies all of these requirements, using the standard Java thread pool ExecutorService together with CompletionService and some partitioning of the input sequence:
(require '[clojure.tools.logging :as log])
(import [java.util.concurrent ExecutorService ExecutorCompletionService CompletionService Future])

;; Lazy, unbounded sequence of results in the order the tasks complete.
(defn take-seq [^CompletionService pool]
  (lazy-seq
    ;; .take blocks until the next task has finished
    (let [^Future result (.take pool)]
      (cons (.get result) (take-seq pool)))))

(defn qmap [^ExecutorService pool chunk-size f coll]
  (let [worker (ExecutorCompletionService. pool)]
    (mapcat
      (fn [chunk]
        (let [actual-size (atom 0)]
          (log/debug "Submitting payload for processing")
          ;; submit every item of the current chunk as a task
          (doseq [item chunk]
            (.submit worker #(f item))
            (swap! actual-size inc))
          (log/debug "Outputting completed results for" @actual-size "tasks")
          ;; take exactly as many results as were submitted for this chunk
          (take @actual-size (take-seq worker))))
      (partition-all chunk-size coll))))
As you can see, qmap does not instantiate the thread pool itself, only the ExecutorCompletionService that wraps it. This makes it possible, for example, to pass in a fixed-size pool such as the ThreadPoolExecutor returned by Executors/newFixedThreadPool. In addition, since qmap returns a lazy sequence, it cannot and should not manage the thread pool resource itself. Finally, chunk-size limits how many elements of the input sequence are realized and submitted as tasks at once.
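The completion-order behaviour that qmap relies on can be seen in isolation with a small sketch. The function completion-order-demo and its sleep durations are made up for illustration; only the java.util.concurrent classes come from the code above.

```clojure
(import '[java.util.concurrent Executors ExecutorCompletionService Callable])

;; Hypothetical demo: submits three tasks with different run times and
;; collects their results in the order in which they finish.
(defn completion-order-demo []
  (let [pool    (Executors/newFixedThreadPool 3)
        service (ExecutorCompletionService. pool)]
    (try
      (doseq [ms [300 100 200]]
        (.submit service ^Callable (fn [] (Thread/sleep (long ms)) ms)))
      ;; .take blocks until the next task has completed; vec forces all
      ;; three results before the pool is shut down in the finally clause
      (vec (repeatedly 3 #(.get (.take service))))
      (finally
        (.shutdown pool)))))

(println (completion-order-demo))
```

With three threads available, the shortest task is normally returned first, so the output is usually sorted by duration rather than by submission order. The exact order depends on thread scheduling.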
The code below demonstrates the correct use:
(import [java.util.concurrent Executors])

(let [thread-pool (Executors/newFixedThreadPool 3)]
  (try
    (doseq [result (qmap thread-pool
                         ;; submit no more than 500 tasks at once
                         500
                         long-running-resource-intensive-fn
                         unboundedly-large-lazy-input-coll)]
      (println result))
    (finally
      ;; (.shutdown) only prohibits submitting new tasks,
      ;; (.shutdownNow) will even cancel already submitted tasks.
      (.shutdownNow thread-pool))))
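If already submitted tasks should be allowed to finish rather than be cancelled, one possible variation is to try an orderly shutdown first and fall back to shutdownNow only after a timeout. This is a sketch under that assumption; shutdown-gracefully and its timeout parameter are hypothetical names, not part of the original code.

```clojure
(import '[java.util.concurrent ExecutorService Executors TimeUnit])

;; Hypothetical helper: stops accepting new tasks, waits up to
;; `timeout-secs` seconds for running tasks to finish, and cancels
;; whatever is left if the timeout expires.
;; Returns true if the pool terminated within the timeout.
(defn shutdown-gracefully [^ExecutorService pool timeout-secs]
  (.shutdown pool)
  (let [finished? (.awaitTermination pool (long timeout-secs) TimeUnit/SECONDS)]
    (when-not finished?
      (.shutdownNow pool))
    finished?))
```

In the usage example above, (shutdown-gracefully thread-pool 30) could take the place of (.shutdownNow thread-pool) in the finally clause, where 30 is an arbitrary value.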
Here is the documentation for some of the Java concurrency classes used:
Daniel Dinnyes