Clojure pmap - why am I not using all cores?

I am trying to use the Clojure library pantomime to extract / OCR text from a large number of documents (tif, among others).

My plan was to use pmap to map over a sequence of input data (from a postgres database), and then update the same postgres database with the OCR output of tika / tesseract. This works fine; however, I notice that many of the cores are idle from time to time.

Is there a way to fix this, and what steps can I take to determine where it might be blocking? Each unit of work processes a single tif file, and each thread is completely independent of the others.

Additional Information:

  • some tika / tesseract runs take 3 seconds, others take up to 90 seconds. Generally speaking, tika is heavily CPU bound. I have enough memory according to htop.
  • postgres shows no locking issues in session management, so I don't think it is holding me up.
  • perhaps a future is waiting on a deref somewhere? How can I tell where?

Any advice appreciated, thanks. Code added below.

(defn parse-a-path [{:keys [row_id, file_path]}]
      (try
        (let [
              start        (System/currentTimeMillis)
              mime_type    (pm/mime-type-of file_path)
              file_content (-> file_path (extract/parse) :text)
              language     (pl/detect-language file_content)
              ]
          {:mime_type   mime_type
          :file_content file_content
          :language     language
          :row_id       row_id
          :parse_time_in_seconds   (float (/ (- (System/currentTimeMillis) start) 1000))
          :record_status "doc parsed"})))


(defn fetch-all-batch []
      (t/info (str "Fetching lazy seq. all rows for batch.") )
      (jdbc/query (db-connection)
                  ["select
                   row_id,
                   file_path ,
                   file_extension
                   from the_table" ]))


(defn update-a-row [{:keys [row_id, file_path, file_extension] :as all-keys}]
      (let [parse-out (parse-a-path all-keys )]
        (try
          (do
            (jdbc/execute!
              (db-connection)
              ["update the_table
               set
               record_last_updated        = current_timestamp ,
               file_content          = ?                 ,
               mime_type             = ?                 ,
               language              = ?                 ,
               parse_time_in_seconds = ?                 ,
               record_status         = ?
               where row_id = ? "
               (:file_content          parse-out) ,
               (:mime_type             parse-out) ,
               (:language              parse-out) ,
               (:parse_time_in_seconds parse-out) ,
               (:record_status         parse-out) ,
               row_id ])
            (t/debug (str "updated row_id " (:row_id parse-out) " (" file_extension ") "
                          " in " (:parse_time_in_seconds parse-out) " seconds." )))
          (catch  Exception _ ))))

(dorun
  (pmap
    #(try
       (update-a-row %)
       (catch Exception e (t/error (.getNextException e))))
    (fetch-all-batch)))
2 answers

pmap runs the mapping function in parallel in batches of (+ 2 cores) items, but preserves order. This means that if you have 8 cores, a batch of 10 items will be processed, but a new batch will be started only once all 10 have finished.
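A rough way to observe this yourself, as a sketch only: `fake-job` below is a hypothetical stand-in that just sleeps, with one slow item placed in front of many quick ones. While the slow item runs, pmap only keeps a small window of items in flight, so the quick items behind it have to wait.

```clojure
;; Hypothetical stand-in for an OCR job: sleeps for `ms` milliseconds
;; and returns `ms`. Not part of the question's code.
(defn fake-job [^long ms]
  (Thread/sleep ms)
  ms)

;; One slow job at the front, many quick jobs queued behind it.
(def durations (cons 500 (repeat 40 20)))

(defn timed
  "Calls f and returns its result together with the elapsed wall time."
  [f]
  (let [start  (System/nanoTime)
        result (f)]
    {:result     result
     :elapsed-ms (/ (- (System/nanoTime) start) 1e6)}))

;; doall forces the lazy seq returned by pmap so the work is measured.
(def run (timed #(doall (pmap fake-job durations))))

(println "pmap elapsed ms:" (:elapsed-ms run))
;; In a standalone script, call (shutdown-agents) at the very end,
;; otherwise the JVM lingers while the future thread pool times out.
```

Watching htop while this runs with longer sleeps (or real work) should show the same pattern of idle cores described in the question.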

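You could build something better yourself out of future, delay and deref, which is a useful exercise. In practice, the claypoole library already provides a set of abstractions on top of future that cover this.

For this case, use its unordered versions of pmap and pfor (upmap and upfor). They do the same thing as pmap, but without preserving order; a new item is picked up as soon as any running item finishes.

When IO is the bottleneck, or when processing time varies a lot between items, this is a good way to parallelize map or for.

Of course, you must then not rely on the order of the returned values.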

  (require '[com.climate.claypoole :as cp])

  (cp/upmap (cp/ncpus)
    #(try
       (update-a-row %)
       (catch Exception e (t/error (.getNextException e))))
    (fetch-all-batch))
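To illustrate what an unordered parallel map does underneath, here is a minimal sketch using only `java.util.concurrent`. The name `unordered-pmap` is made up for this example and is not claypoole's implementation; it assumes `f` is safe to call from several threads at once.

```clojure
(import '(java.util.concurrent Callable Executors ExecutorCompletionService))

(defn unordered-pmap
  "Illustrative helper (hypothetical, not from claypoole): runs (f x) for
  every x in coll on a fixed pool of n threads and returns a vector of
  results in completion order, not input order."
  [n f coll]
  (let [pool (Executors/newFixedThreadPool n)
        ecs  (ExecutorCompletionService. pool)]
    (try
      ;; Submit every item up front; the pool runs at most n at a time,
      ;; and a free thread takes the next item as soon as it is done.
      (let [submitted (count (mapv (fn [x]
                                     (.submit ecs ^Callable (fn [] (f x))))
                                   coll))]
        ;; .take blocks until some task has finished, whichever it is.
        ;; .get rethrows a task's exception wrapped in ExecutionException.
        (vec (repeatedly submitted #(.get (.take ecs)))))
      (finally
        (.shutdown pool)))))

;; Usage: results arrive in whatever order the jobs complete.
(println (unordered-pmap 4 inc (range 10)))
```

Because a slow item only occupies one thread, the remaining threads keep draining the queue instead of waiting for it.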

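I ran into a similar problem. My guess is that you are making the same assumptions I did:

  • pmap calls f in parallel, but that does not mean the work is shared evenly. As you said, some items take 3 seconds while others take up to 90. A thread that finishes in 3 seconds does not take over any of the work the other threads still have left, so it just sits idle until the slowest one finishes.

  • You did not describe your data exactly, but if it is a lazy sequence, that works against parallel processing. If the job is embarrassingly parallel, as it seems to be, it is worth looking at clojure.core.reducers ('map', 'filter' and especially 'fold'), which runs on the fork/join framework.

In my case, this took the processing time from 34 down to 8.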
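A minimal sketch of the reducers approach, assuming the inputs fit in a vector (fold only parallelizes vectors and maps; on a lazy seq it falls back to a sequential reduce). `fake-job` and `inputs` are hypothetical stand-ins for the OCR call and the database rows.

```clojure
(require '[clojure.core.reducers :as r])

;; Hypothetical stand-in for an OCR job: sleeps, then returns its input.
(defn fake-job [^long ms]
  (Thread/sleep ms)
  ms)

;; fold needs a vector here to be able to split the work.
(def inputs (vec (cons 300 (repeat 20 10))))

;; The leading 1 is the partition size: the vector is split down to
;; single items, so each item becomes its own fork/join task instead of
;; the default groups of 512. r/cat and r/append! collect the results
;; (this is what r/foldcat uses, with the default partition size).
(def results
  (into [] (r/fold 1 r/cat r/append! (r/map fake-job inputs))))

(println results)
```

A small partition size only makes sense when each item is expensive, as with OCR; for cheap per-item work the default is usually the better choice.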
