Strange behavior of a Clojure ref

I have 100 workers (agents) that share one ref, which contains a collection of tasks. While this collection has tasks, each worker takes one task from it (in a dosync block), prints it, and sometimes puts it back into the collection (also in a dosync block):

    (defn have-tasks? [tasks]
      (not (empty? @tasks)))

    (defn get-task [tasks]
      (dosync
        (let [task (first @tasks)]
          (alter tasks rest)
          task)))

    (defn put-task [tasks task]
      (dosync (alter tasks conj task))
      nil)

    (defn worker [& {:keys [tasks]}]
      (agent {:tasks tasks}))

    (defn worker-loop [{:keys [tasks] :as state}]
      (while (have-tasks? tasks)
        (let [task (get-task tasks)]
          (println "Task: " task)
          (when (< (rand) 0.1)
            (put-task tasks task))))
      state)

    (defn create-workers [count & options]
      (->> (range 0 count)
           (map (fn [_] (apply worker options)))
           (into [])))

    (defn start-workers [workers]
      (doseq [worker workers]
        (send-off worker worker-loop)))

    (def tasks (ref (range 1 10000000)))

    (def workers (create-workers 100 :tasks tasks))

    (start-workers workers)

    (apply await workers)

When I run this code, the last value printed by the agents is, over several attempts, 435445, 4556294, 1322061, or 3950017, but never 9999999, which is what I expect. And every time, the collection really is empty at the end. What am I doing wrong?

Edit:

I rewrote the worker loop to be as simple as possible:

    (defn worker-loop [{:keys [tasks] :as state}]
      (loop []
        (when-let [task (get-task tasks)]
          (println "Task: " task)
          (recur)))
      state)

But the problem still exists. The code behaves as expected only when I create exactly one worker.

+7
concurrency clojure ref stm
3 answers

The problem here has nothing to do with agents and barely anything to do with laziness. Here's a somewhat reduced version of the original code that still exhibits the problem:

    (defn f [init]
      (let [state (ref init)
            task (fn []
                   (loop [last-n nil]
                     (if-let [n (dosync
                                  (let [n (first @state)]
                                    (alter state rest)
                                    n))]
                       (recur n)
                       (locking :out
                         (println "Last seen:" last-n)))))
            workers (->> (range 0 5)
                         (mapv (fn [_] (Thread. task))))]
        (doseq [w workers] (.start w))
        (doseq [w workers] (.join w))))

    (defn r []
      (f (range 1 100000)))

    (defn i []
      (f (->> (iterate inc 1)
              (take 100000))))

    (defn t []
      (f (->> (range 1 100000)
              (take Integer/MAX_VALUE))))

Running this code shows that i and t, both lazy, work reliably, while r does not. The problem is actually a concurrency bug in the class returned by the range call. Indeed, this bug is documented in this Clojure ticket and was fixed as of Clojure version 1.9.0-alpha11.

A summary of the bug, in case the ticket is unavailable for some reason: inside the rest call on the result of range there was a small window for a race condition. The "flag" that says the next value has already been computed was set before the actual value itself, which meant that a second thread could see the flag as true while the "next value" was still nil. The call to alter would then commit that nil value to the ref. The bug was fixed by swapping the two assignment lines.

In cases where the result of range was either forcibly realized in a single thread or wrapped in another lazy seq, this bug would not appear.
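The two workarounds described above might look like the following minimal sketch. The names realized-tasks and wrapped-tasks are illustrative only and do not come from the original code; whether either form is sufficient on a given Clojure version older than 1.9.0-alpha11 is an assumption based on the description above.

```clojure
;; Sketch only: `realized-tasks` and `wrapped-tasks` are illustrative names.

;; Option 1: walk the whole range in the current thread before sharing it,
;; so no other thread is the first to force any part of it.
(def realized-tasks (ref (doall (range 1 100000))))

;; Option 2: wrap the range in another lazy seq, as `t` does above.
(def wrapped-tasks (ref (take Integer/MAX_VALUE (range 1 100000))))
```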

+4

I asked this question in the Clojure Google group, and it helped me find the answer.

The problem is that I used a lazy sequence in the STM transaction.

When I replaced this code:

 (def tasks (ref (range 1 10000000))) 

with:

 (def tasks (ref (into [] (range 1 10000000)))) 

It worked as expected!

In my production code, where the problem occurred, I used the Korma framework, which also returns a lazy collection of tuples, as in my example.

Conclusion: Avoid using lazy data structures in an STM transaction.

+3

When the last number in the range is reached, the workers still hold older numbers. Some of those will be returned to the queue and processed again.

To better see what happens, you can modify the worker-loop to print the last task processed by each worker:

    (defn worker-loop [{:keys [tasks] :as state}]
      (loop [last-task nil]
        (if (have-tasks? tasks)
          (let [task (get-task tasks)]
            ;; (when (< (rand) 0.1)
            ;;   (put-task tasks task))
            (recur task))
          (when last-task
            (println "Last task:" last-task))))
      state)

It also shows the race condition in the code: near the end of processing, the tasks that have-tasks? sees are often taken by other workers before get-task is called.

The race condition can be resolved by removing have-tasks? and instead using a nil return value from get-task as the signal that no more tasks are available (for now).
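A minimal sketch of that idea follows. The names take-task! and drain-all are hypothetical, and the sketch assumes the task collection is a fully realized vector whose tasks are never nil or false, since when-let treats those values as "no task".

```clojure
;; Sketch only: `take-task!` and `drain-all` are hypothetical names.

(defn take-task!
  "Atomically removes and returns the first task, or nil when none remain."
  [tasks]
  (dosync
    (when-let [task (first @tasks)]
      (alter tasks rest)
      task)))

(defn drain-all
  "Runs n-workers futures that take tasks until take-task! returns nil.
  Returns a vector with the number of tasks each worker processed."
  [tasks n-workers]
  (let [workers (mapv (fn [_]
                        (future
                          (loop [processed 0]
                            (if (take-task! tasks)
                              (recur (inc processed))
                              processed))))
                      (range n-workers))]
    (mapv deref workers)))

;; A fully realized vector, so no lazy seq is shared between threads.
(def demo-tasks (ref (vec (range 1 10001))))
(def counts (drain-all demo-tasks 4))
```

Because the emptiness check and the removal happen inside the same transaction, there is no gap between "is there a task?" and "take it" for another worker to slip into. Summing counts should give the number of tasks originally in the ref.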

Updated:

As already noted, this race condition does not explain the problem.

Also, the problem is not solved by removing the possible race condition in get-task as follows:

 (defn get-task [tasks] (dosync (first (alter tasks rest)))) 

However, changing get-task to use an explicit lock seems to solve the problem:

    (defn get-task [tasks]
      (locking :lock
        (dosync
          (let [task (first @tasks)]
            (alter tasks rest)
            task))))
+1