What is the best way to shut down a clojure core.async process pipeline

I have a Clojure processing application that is a pipeline (conveyor) of steps. Each step performs its computation asynchronously (e.g. makes an HTTP request using http-kit or something similar) and puts the result on its output channel. The next step then reads from that channel and performs its own computation.
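A single step in such a conveyor might look like the following sketch (the names `pipeline-step` and `f` are illustrative, not from the original code):

```clojure
(require '[clojure.core.async :as async :refer [go-loop <! >! chan close!]])

;; A generic pipeline step: reads items from in-ch, applies a function f,
;; and puts results on a fresh output channel. When in-ch is closed and
;; drained, the loop closes out-ch, so shutdown propagates downstream.
(defn pipeline-step [f in-ch]
  (let [out-ch (chan)]
    (go-loop []
      (if-let [item (<! in-ch)]
        (do (>! out-ch (f item))
            (recur))
        (close! out-ch)))
    out-ch))
```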

My main function looks like this:

```clojure
(defn -main [args]
  (-> file/tmp-dir
      (schedule/scheduler)
      (search/searcher)
      (process/resultprocessor)
      (buy/buyer)
      (report/reporter)))
```

Currently, the scheduler step drives the pipeline (it has no input channel) and feeds the chain with work.

When I run this in the REPL:

```clojure
(-main "some args")
```

it basically runs forever because the scheduler never terminates. What is the best way to change this architecture so that I can shut the whole system down from the REPL? Does closing each channel mean the system completes?

Would some kind of broadcast channel help?

3 answers

You can have your alts! / alts!! select on both a kill channel and the output channel of your conveyor:

```clojure
(def kill-channel (async/chan))

(defn scheduler [input output-ch kill-ch]
  (loop []
    (let [[v p] (async/alts!! [kill-ch [output-ch (preprocess input)]]
                              :priority true)]
      (when-not (= p kill-ch)
        (recur)))))
```

Putting a value on kill-channel (or closing it) will end the loop.
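For example, assuming the `scheduler` loop above and running it on its own thread so the REPL stays free:

```clojure
;; Start the scheduler on a background thread:
(def worker (async/thread (scheduler input output-ch kill-channel)))

;; Later, from the REPL, stop the whole pipeline. A closed channel
;; yields nil immediately, so alts!! (with :priority true) selects
;; kill-channel and the loop exits:
(async/close! kill-channel)
```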

Technically, you could also use output-ch to control the process (puts return false once a channel is closed), but I usually find explicit kill channels cleaner, at least for top-level pipelines.

To make things more elegant and easier to use (both in the REPL and in production), you can use Stuart Sierra's Component library: start the scheduler loop (on a separate thread) and assoc the kill channel onto your component in the start method, then close! the kill channel (and thus terminate the loop) in the stop method.
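A minimal sketch of such a component, assuming the `scheduler` loop function from above:

```clojure
(require '[com.stuartsierra.component :as component]
         '[clojure.core.async :as async])

(defrecord Scheduler [input out-ch kill-ch]
  component/Lifecycle
  (start [this]
    (let [kill-ch (async/chan)]
      ;; Run the scheduling loop on its own thread so start returns.
      (async/thread (scheduler input out-ch kill-ch))
      (assoc this :kill-ch kill-ch)))
  (stop [this]
    ;; Closing the kill channel terminates the loop.
    (some-> (:kill-ch this) async/close!)
    (assoc this :kill-ch nil)))
```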


I would suggest using something like https://github.com/stuartsierra/component to manage your system's lifecycle. It ensures that you can easily start and stop your system in the REPL. Using this library, you would set things up so that each processing step is a component, and each component handles channel creation and teardown in its start and stop methods. You could additionally create an IStream protocol for each component to implement, and have each component depend on components implementing that protocol. This buys you very cheap modularity.

As a result, you get a system that looks like this:

```clojure
(component/system-map
  :scheduler (schedule/new-scheduler file/tmp-dir)
  :searcher  (component/using (search/searcher)         {:in :scheduler})
  :processor (component/using (process/resultprocessor) {:in :searcher})
  :buyer     (component/using (buy/buyer)               {:in :processor})
  :report    (component/using (report/reporter)         {:in :buyer}))
```

One nice thing about this approach is that you can easily add components that also rely on a channel. For example, if each component creates its output channels by tapping an internal mult, you could add logging for the processor simply by adding a logging component that takes the processor as a dependency:

```clojure
:processor        (component/using (process/resultprocessor) {:in :searcher})
:processor-logger (component/using (log/logger)              {:in :processor})
```
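A sketch of the mult/tap idea this relies on (channel names are illustrative): the processor exposes its output through a mult, and each downstream component taps an independent copy.

```clojure
(require '[clojure.core.async :as async])

;; The processor keeps a mult over its output channel...
(def processor-out  (async/chan))
(def processor-mult (async/mult processor-out))

;; ...and each dependent component taps it to receive its own copy
;; of every item the processor produces:
(def buyer-in  (async/tap processor-mult (async/chan)))
(def logger-in (async/tap processor-mult (async/chan)))
```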

I would recommend watching Stuart Sierra's talk on Component to see how it works.


You should consider using Stuart Sierra's reloaded workflow. It depends on how you model your pipeline elements: the idea is to model your logical singletons as "components" so that you can control the start/stop logic for each of them individually.

