Asynchronous job queue for web service in Clojure

I'm currently trying to create a web service with a RESTful API that handles some lengthy tasks (tasks).

The idea is that the user submits the task by performing a POST that returns some URL to check the status of the job, which also contains the URL for the results. After the task is completed (i.e., some value was written to the database), the results URL will return the corresponding information (instead of any results), and the task URL will indicate the completed status.

Unfortunately, the calculations are quite intense, so you can run it only once, so jobs must be queued.

In pseudo something like this is needed

(def job-queue (atom queue)) ;; some queue (def jobs (atom {})) (defn schedule-job [params] ;; schedules the job into the queue and ;; adds the job to a jobs map for checking status via GET ;; note that the job should not be evaluated until popped from the queue ) (POST "/analyze" [{params :params}] (schedulde-job params)) (GET "job/:id" [:d] (get @jobs id)) ;; Some function that pops the next item from the queue ;; and evaluates it when the previous item is complete ;; Note: should not terminate when queue is empty! 

I looked at Lamina , which allows asynchronous processing, but this does not seem to fit my needs.

My question is how to delete the queue of job queues and complete the task after the previous one is completed, without interruption when the queue is empty, i.e. constantly process incoming tasks.

+7
source share
3 answers

A java.util.concurrent.ExecutorService might be what you want. This allows you to submit a task for later execution and returns a Future that you can request to see if it is complete.

 (import '[java.util.concurrent Callable Executors]) (def job-executor (Executors/newSingleThreadExecutor)) (def jobs (atom {})) (defn submit-job [func] (let [job-id (str (java.util.UUID/randomUUID)) callable (reify Callable (call [_] (func))] (swap! jobs assoc job-id (.submit job-executor callable)) job-id)) (use 'compojure.core) (defroutes app (POST "/jobs" [& params] (let [id (submit-job #(analyze params))] {:status 201 :headers {"Location" (str "/jobs/" id)}})) (GET "/jobs/:id" [id] (let [job-future (@jobs id)] (if (.isDone job-future) (.get job-future) {:status 404})))) 
+9
source

This is similar to what I expected, but it seems rather non-idiomatic. Anyone have thoughts on how to improve this?

 ;; Create a unique identifier (defn uuid [] (str (java.util.UUID/randomUUID))) ;; Create a job-queue and a map for keeping track of the status (def job-queue (ref clojure.lang.PersistentQueue/EMPTY)) (def jobs (atom {})) (defn dequeue! [queue-ref] ;; Pops the first element off the queue-ref (dosync (let [item (peek @queue-ref)] (alter queue-ref pop) item))) (defn schedule-job! [task] ;; Schedule a task to be executed, expects a function (task) to be evaluated (let [uuid (uuid) job (delay task)] (dosync (swap! jobs assoc uuid job) (alter job-queue conj job)))) (defn run-jobs [] ;; Runs the jobs (while true (Thread/sleep 10) (let [curr (dequeue! job-queue)] (if-not (nil? curr) (@curr))))) (.start (Thread. run-jobs)) 
+2
source

Your description is similar to the scenario of several manufacturers and one consumer. Below is an example of code (which you can connect to a REST file and, possibly, some exception handling so that the agent is not lost)

 (def worker (agent {})) (defn do-task [name func] (send worker (fn [results] (let [r (func)] (assoc results name r))))) ;submit tasks (do-task "uuid1" #(print 10)) (do-task "uuid2" #(+ 1 1)) ;get all results (print @worker) 
0
source

All Articles