Lazy partition-by

I have a source of elements, and I want to separately process runs of them that have the same key-function value. In Python this would look like:

    import itertools

    for key_val, part in itertools.groupby(src, key_fn):
        process(key_val, part)

This solution is completely lazy, i.e. if process does not try to store the contents of the entire part, the code runs in O(1) memory.

The Clojure solution

    (doseq [part (partition-by key-fn src)]
      (process part))

is less lazy: it fully realizes each part. The problem is that src can contain very long runs of elements with the same key-fn value, and realizing them can lead to an OOM.

I found this discussion where it is claimed that the following function (slightly modified for naming consistency within this post) is lazy enough:

    (defn lazy-partition-by [key-fn coll]
      (lazy-seq
        (when-let [s (seq coll)]
          (let [fst  (first s)
                fv   (key-fn fst)
                part (lazy-seq (cons fst (take-while #(= fv (key-fn %)) (next s))))]
            (cons part
                  (lazy-partition-by key-fn (drop-while #(= fv (key-fn %)) s)))))))

However, I do not understand why it does not suffer from the same OOM problem: both halves of the cons cell keep a reference to s, so while process consumes part, s is being realized but cannot be garbage collected. It only becomes eligible for GC once drop-while has traversed past part.

So my questions are:

  • Am I right that lazy-partition-by is not lazy enough?
  • Is there a partition-by implementation with guaranteed memory requirements, provided that I no longer hold a reference to the previous part by the time I start realizing the next one?

EDIT: Here's a sufficiently lazy implementation in Haskell:

    lazyPartitionBy :: Eq b => (a -> b) -> [a] -> [[a]]
    lazyPartitionBy _ [] = []
    lazyPartitionBy keyFn xl@(x:_) =
      let fv = keyFn x
          (part, rest) = span ((== fv) . keyFn) xl
      in  part : lazyPartitionBy keyFn rest

As can be seen from the implementation of span, part and rest implicitly share state. I wonder whether this method can be translated into Clojure.

clojure lazy-evaluation lazy-sequences

3 answers

Although this question prompts a very interesting reflection on language design, the practical problem is that you want to process partitions in constant memory. And the practical problem is solvable with a slight inversion.

Instead of processing the result of a function that returns a sequence of partitions, pass the processing function into the function that produces the partitions. Then you can manage state internally.

First, we provide a way to fuse the consumption of a sequence with the state of its tail.

    (defn fuse [coll wick]
      (lazy-seq
        (when-let [s (seq coll)]
          ;; each realized element advances the `wick` atom past itself
          (swap! wick rest)
          (cons (first s) (fuse (rest s) wick)))))
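
To see what fuse does, here is a quick REPL sketch (my illustration, not part of the original answer): as elements of the fused sequence are realized, the atom is advanced past them, so it always holds the most recently consumed element followed by the unconsumed remainder.

    (def wick (atom (cons nil [1 2 3 4]))) ; dummy head, as process-partition-by below sets it up
    (def s (fuse [1 2 3 4] wick))

    (doall (take 2 s)) ;=> (1 2)
    @wick              ;=> (2 3 4), i.e. the last consumed element and everything after it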

Then, a modified version of partition-by:

    (defn process-partition-by [processfn keyfn coll]
      (lazy-seq
        (when (seq coll)
          (let [tail (atom (cons nil coll))
                s    (fuse coll tail)
                fst  (first s)
                fv   (keyfn fst)
                pred #(= fv (keyfn %))
                part (take-while pred s)
                more (lazy-seq (drop-while pred @tail))]
            (cons (processfn part)
                  (process-partition-by processfn keyfn more))))))

Note: for O(1) memory consumption, processfn must be an eager consumer! So while (process-partition-by identity key-fn coll) is equivalent to (partition-by key-fn coll), since identity does not consume the partition, memory consumption is not constant.
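
For example (my own illustration, not from the original answer), if the per-partition processing is an eager reduction, each partition is consumed in full before the next one is produced, and memory stays bounded:

    (process-partition-by #(reduce + %) #(quot % 3) (range 10))
    ;;=> (3 12 21 9), one partition realized and consumed at a time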


Test

    (defn heavy-seq []
      ;; adjust payload for your JVM so that only a few fit in memory
      (let [payload (fn [] (long-array 20000000))]
        (map #(vector % (payload)) (iterate inc 0))))

    (defn my-process [s] (reduce + (map first s)))

    (defn test1 []
      (doseq [part (partition-by #(quot (first %) 10) (take 50 (heavy-seq)))]
        (my-process part)))

    (defn test2 []
      (process-partition-by my-process #(quot (first %) 20) (take 200 (heavy-seq))))

    so.core=> (test1)
    OutOfMemoryError Java heap space  [trace missing]
    so.core=> (test2)
    (190 590 990 1390 1790 2190 2590 2990 3390 3790)

The rule of thumb I use in these scenarios (i.e., those in which you want a single input sequence to produce multiple output sequences) is that of the following three desirable properties, you can usually have only two:

  • Efficiency (traverse the input sequence only once, hence not holding its head)
  • Laziness (produce items only on demand)
  • No shared mutable state

The version in clojure.core chooses (1,3) but gives up on (2), producing each entire partition at once. Python and Haskell both choose (1,2), although this is not immediately obvious: doesn't Haskell have no mutable state at all? Well, its lazy evaluation of everything (not just of sequences) means that all expressions are thunks, which start out as blank slates and are written to only when their value is needed; the implementation of span, as you say, shares the same thunk of span p xs' in both of its output sequences, so whichever sequence needs it first "sends" it to the result of the other, effecting the spooky action at a distance needed to preserve the other nice properties.
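
For contrast (my aside, not part of the original answer), the closest Clojure analogue of span is clojure.core/split-with, and its definition makes the lack of sharing explicit: the two halves come from two independent traversals, so consuming one half while holding the other retains the head of coll.

    ;; clojure.core's definition of split-with, for comparison with span:
    (defn split-with [pred coll]
      [(take-while pred coll) (drop-while pred coll)])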

The alternative Clojure implementation you linked to chooses (2,3), as you noted.

The problem is that for partition-by, dropping either (1) or (2) means that you are holding the head of some sequence: either the input or one of the outputs. So if you want a solution in which you can process arbitrarily large partitions of an arbitrarily large input, you need to choose (1,2). There are a few ways you could do this in Clojure:

  • Take the Python approach: return something more like iterators than seqs; seqs make stronger guarantees about non-mutation, promising that you can safely traverse them multiple times, and so on. If instead of a seq of seqs you return an iterator of iterators, then consuming items from any one iterator can freely mutate or invalidate the other(s). This guarantees that consumption happens in order and that memory can be freed (a rough sketch of this approach follows the list).
  • Take the Haskell approach: manually thunk everything with lots of calls to delay, and require the client to call force as often as necessary to get data out. This will be a lot uglier in Clojure and will greatly increase your stack depth (using this on a non-trivial input will probably blow the stack), but it is theoretically possible.
  • Write something in a more Clojure-flavored style (but still quite unusual) by having a few mutable data objects coordinated between the output seqs, each updated as needed when something is requested from any of them.
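
As an illustration of the first approach, here is a rough sketch (mine, not from any library; partition-iter and its nil-terminated item functions are made-up names, and it assumes the elements themselves are never nil). A single mutable cursor is shared by all the iterators; asking for the next partition drains whatever is left of the current one, thereby invalidating it:

    (defn partition-iter
      "Returns a zero-argument function; each call skips the remainder of
       the current partition and returns an item-function for the next
       partition, or nil when the input is exhausted. Each item-function
       returns the next element of its partition, or nil when it is over."
      [key-fn coll]
      (let [cursor    (atom (seq coll))
            prev-pred (atom nil)]
        (fn next-partition []
          ;; drain whatever remains of the previous partition
          (when-let [p @prev-pred]
            (swap! cursor #(seq (drop-while p %))))
          (when-let [s @cursor]
            (let [fv   (key-fn (first s))
                  pred #(= fv (key-fn %))]
              (reset! prev-pred pred)
              (fn next-item []
                (when-let [s @cursor]
                  (when (pred (first s))
                    (reset! cursor (seq (rest s)))
                    (first s)))))))))

    ;; usage sketch: rebuilding the partitions (which defeats the purpose
    ;; memory-wise, but shows the mechanics)
    (let [next-part (partition-iter even? [2 4 6 8 1 3 5 10 12 7])]
      (loop [part (next-part), out []]
        (if part
          (let [items (vec (take-while some? (repeatedly part)))]
            (recur (next-part) (conj out items)))
          out)))
    ;;=> [[2 4 6 8] [1 3 5] [10 12] [7]]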

I am pretty sure any of these three approaches is possible, but frankly they are all quite hard and not at all natural. Clojure's sequence abstraction simply does not make it convenient to produce the data structure you want. My advice: if you need something like this and the partitions may be too large to fit comfortably in memory, just accept a slightly different format and do a little more bookkeeping yourself: dodge the (1,2,3) dilemma by not producing multiple output sequences at all!

So, instead of ((2 4 6 8) (1 3 5) (10 12) (7)) being your output format for something like (partition-by even? [2 4 6 8 1 3 5 10 12 7]), you could accept a slightly uglier format: ([::key true] 2 4 6 8 [::key false] 1 3 5 [::key true] 10 12 [::key false] 7). This is neither hard to produce nor hard to consume, although it is a bit lengthy and tedious to write out.

Here is one sensible implementation of such a producing function:

    (defn lazy-partition-by [f coll]
      (lazy-seq
        (when (seq coll)
          (let [x (first coll)
                k (f x)]
            (list* [::key k] x
                   ((fn part [k xs]
                      (lazy-seq
                        (when (seq xs)
                          (let [x  (first xs)
                                k' (f x)]
                            (if (= k k')
                              (cons x (part k (rest xs)))
                              (list* [::key k'] x (part k' (rest xs))))))))
                    k (rest coll)))))))

And here's how to use it: first we define a generic reduce-grouped that hides the details of the grouping format, and then an example count-partition-sizes function that reports the key and size of each partition without retaining any sequences in memory:

    (defn reduce-grouped [f init groups]
      (loop [k nil, acc init, coll groups]
        (if (empty? coll)
          acc
          (if (and (coll? (first coll)) (= ::key (ffirst coll)))
            (recur (second (first coll)) acc (rest coll))
            (recur k (f k acc (first coll)) (rest coll))))))

    (defn count-partition-sizes [f coll]
      (reduce-grouped (fn [k acc _]
                        (if (and (seq acc) (= k (first (peek acc))))
                          (conj (pop acc) (update-in (peek acc) [1] inc))
                          (conj acc [k 1])))
                      [] (lazy-partition-by f coll)))

    user> (lazy-partition-by even? [2 4 6 8 1 3 5 10 12 7])
    ([:user/key true] 2 4 6 8 [:user/key false] 1 3 5 [:user/key true] 10 12 [:user/key false] 7)
    user> (count-partition-sizes even? [2 4 6 8 1 3 5 10 12 7])
    [[true 4] [false 3] [true 2] [false 1]]

Edit: Looking at this again, I'm not convinced my reduce-grouped is much more useful than (reduce f init (map g xs)), since it doesn't really give you a clear indication of when the key changes. So if you need to know when a group changes, you'll want a smarter abstraction, or to use my original lazy-partition-by with nothing "smart" wrapping it.


Am I right that lazy-partition-by is not lazy enough?

Well, there is a difference between laziness and memory usage. A sequence can be lazy and still require lots of memory; see, for instance, the implementation of clojure.core/distinct, which uses a set to remember all the previously observed values in the sequence. But yes, your analysis of the memory requirements of lazy-partition-by is correct: the call that computes the head of the second partition retains the head of the first partition, which means that realizing the first partition causes it to be held in memory. This can be verified with the following code:

    user> (doseq [part (lazy-partition-by :a (repeatedly
                                               (fn [] {:a 1 :b (long-array 10000000)})))]
            (dorun part))
    ;; => OutOfMemoryError Java heap space

Since neither doseq nor dorun holds onto the head, this would simply run forever if lazy-partition-by were O(1) in memory.

Is there a partition-by implementation with guaranteed memory requirements, provided I don't hold a reference to the previous part by the time I start realizing the next?

It would be very difficult, if not impossible, to write such an implementation in a purely functional way that works in the general case. Consider that a general lazy-partition-by implementation cannot make any assumptions about when (or whether) a partition is realized. The only guaranteed-correct way to find the start of the second partition, short of introducing some nasty bit of mutable state to keep track of how much of the first partition has been realized, is to remember where the first partition started and scan forward from there on demand.

In the particular case where you process records one at a time for side effects and want them grouped by key (as implied by the use of doseq above), you might consider something along the lines of a loop/recur that maintains some accumulated state and resets it when the key changes.
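
A minimal sketch of that idea (the names are my own; it assumes the key function never returns ::none): walk the input once, fire a callback at each key change, and process each record immediately, so nothing is accumulated no matter how long each run is.

    (defn process-grouped! [key-fn on-new-group! on-item! coll]
      (loop [s (seq coll), k ::none]
        (when s
          (let [x  (first s)
                k' (key-fn x)]
            (when (not= k k')
              (on-new-group! k'))  ; the key just changed: start a new group
            (on-item! x)           ; process the record immediately, then forget it
            (recur (next s) k')))))

    ;; usage sketch:
    (process-grouped! even?
                      #(println "new group, key:" %)
                      #(println "  item:" %)
                      [2 4 6 8 1 3 5 10 12 7])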

