Why pmap | reducers / cards do not use all processor cores?

I am trying to parse a file with a million lines, each line is a json line with some information about the book (author, content, etc.). I am using iota to download a file since my program throws OutOfMemoryError if I try to use slurp . I also use cheshire for parsing strings. The program simply downloads the file and counts all the words in all books.

My first attempt included pmap to do the hard work, I figured it essentially uses all of my processor cores.

 (ns multicore-parsing.core (:require [cheshire.core :as json] [iota :as io] [clojure.string :as string] [clojure.core.reducers :as r])) (defn words-pmap [filename] (letfn [(parse-with-keywords [str] (json/parse-string str true)) (words [book] (string/split (:contents book) #"\s+"))] (->> (io/vec filename) (pmap parse-with-keywords) (pmap words) (r/reduce #(apply conj %1 %2) #{}) (count)))) 

While it seems like it uses all the cores, each core rarely uses more than 50% of its capacity, I assume this is due to the lot size of the pmap, and so I came across a relatively old question where some comments refer to the library clojure.core.reducers .

I decided to rewrite the function using reducers/map :

 (defn words-reducers [filename] (letfn [(parse-with-keywords [str] (json/parse-string str true)) (words [book] (string/split (:contents book) #"\s+"))] (->> (io/vec filename) (r/map parse-with-keywords) (r/map words) (r/reduce #(apply conj %1 %2) #{}) (count)))) 

But CPU usage is worse, and even longer than the previous implementation, it takes more time.

 multicore-parsing.core=> (time (words-pmap "./dummy_data.txt")) "Elapsed time: 20899.088919 msecs" 546 multicore-parsing.core=> (time (words-reducers "./dummy_data.txt")) "Elapsed time: 28790.976455 msecs" 546 

What am I doing wrong? Is mmap + loading the correct approach when analyzing a large file?

EDIT: this is the file I use.

EDIT2: The following are the timings with iota/seq instead of iota/vec :

 multicore-parsing.core=> (time (words-reducers "./dummy_data.txt")) "Elapsed time: 160981.224565 msecs" 546 multicore-parsing.core=> (time (words-pmap "./dummy_data.txt")) "Elapsed time: 160296.482722 msecs" 546 
+5
source share
1 answer

I donโ€™t think gearboxes will be the right solution for you, since they canโ€™t cope with lazy sequences at all (the gearbox will give the correct results with a lazy sequence, but it wonโ€™t understand well).

You can take a look at this sample code from the book Seven Concurrency Models in seven weeks (disclaimer: I am the author), which solves a similar problem (counting the number of times each word appears on Wikipedia).

Given the list of pages on Wikipedia, this function sequentially counts words ( get-words returns a sequence of words from a page):

 (defn count-words-sequential [pages] (frequencies (mapcat get-words pages))) 

This is a parallel version using pmap , which is faster, but only about 1.5 times faster:

 (defn count-words-parallel [pages] (reduce (partial merge-with +) (pmap #(frequencies (get-words %)) pages))) 

The reason it spreads only 1.5 times faster is because reduce becomes a bottleneck - it calls (partial merge-with +) once for each page. Combining 100-page batches increases productivity to 3.2x on a 4-core computer:

 (defn count-words [pages] (reduce (partial merge-with +) (pmap count-words-sequential (partition-all 100 pages)))) 
+2
source

All Articles