Java Streams - efficiently grouping items on sorted streams

I am looking for a way to implement a non-terminal grouping operation, such that the memory overhead is minimal.

For example, consider distinct(). In the general case, it has no choice but to collect all distinct elements and only then stream them forward. However, if we know that the input stream is already sorted, the operation can be performed "on the fly" using minimal memory.

I know that I can achieve this for iterators by using an iterator wrapper and implementing the grouping logic myself. Is there a simpler way to implement this using the Stream API instead?
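For reference, the iterator-wrapper approach mentioned above might look roughly like the following. This is only a minimal sketch: the class name SortedDistinctIterator and the helper toList are made up for illustration, and it assumes that equal elements are adjacent in the source, which holds for sorted input.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

// Hypothetical wrapper: skips elements equal to the previously returned one.
final class SortedDistinctIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private T previous;          // last element handed out
    private boolean hasPrevious; // false until the first element was seen
    private T next;              // look-ahead element
    private boolean hasNext;

    SortedDistinctIterator(Iterator<T> source) {
        this.source = source;
        advance();
    }

    // Pulls from the source until an element differing from 'previous' shows up.
    private void advance() {
        hasNext = false;
        while (source.hasNext()) {
            T candidate = source.next();
            if (!hasPrevious || !Objects.equals(candidate, previous)) {
                previous = candidate;
                hasPrevious = true;
                next = candidate;
                hasNext = true;
                return;
            }
        }
    }

    @Override
    public boolean hasNext() {
        return hasNext;
    }

    @Override
    public T next() {
        if (!hasNext) throw new NoSuchElementException();
        T result = next;
        advance();
        return result;
    }

    // Small helper for demonstration only: drains an iterator into a list.
    static <T> List<T> toList(Iterator<T> it) {
        List<T> out = new ArrayList<>();
        while (it.hasNext()) out.add(it.next());
        return out;
    }
}
```

Only the previous element and one look-ahead element are held at any time, so the memory use does not depend on the input size.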

--EDIT--

I found a way to abuse Stream.flatMap(..) to achieve this:

    private static class DedupSeq implements IntFunction<IntStream> {
        private Integer prev;

        @Override
        public IntStream apply(int value) {
            IntStream res = (prev != null && value == prev) ? IntStream.empty() : IntStream.of(value);
            prev = value;
            return res;
        }
    }

And then:

 IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println); 

Which prints:

    1
    3
    4
    5

With some changes, the same technique can be used for any kind of memory-efficient sequence grouping of streams. In any case, I do not really like this solution, and I was looking for something more natural (for example, the way mapping or filtering works). Furthermore, I am breaking the contract here, because the function supplied to flatMap(..) is stateful.

+8
java java-8 java-stream
2 answers

If you want a solution that does not add mutable state to a function that is not supposed to have it, you can resort to collect:

    static void distinctForSorted(IntStream s, IntConsumer action) {
        s.collect(
            () -> new long[]{Long.MIN_VALUE},  // holds the last value seen
            (a, i) -> {
                if (a[0] != i) {
                    action.accept(i);
                    assert i > a[0];
                    a[0] = i;
                }
            },
            (a, b) -> { throw new UnsupportedOperationException(); });  // no parallel support
    }

This works because it is the intended way of using mutable containers. However, it cannot work in parallel, since splitting at arbitrary stream positions implies the possibility of encountering the same value in two (or even more) threads.

If you want a general-purpose IntStream rather than a forEach action, a low-level Spliterator solution is preferable, despite the added complexity.

    static IntStream distinctForSorted(IntStream s) {
        Spliterator.OfInt sp = s.spliterator();
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
                Spliterator.DISTINCT | Spliterator.SORTED | Spliterator.NONNULL | Spliterator.ORDERED) {
                // a long, so that the initial value cannot match any int
                long last = Long.MIN_VALUE;

                @Override
                public boolean tryAdvance(IntConsumer action) {
                    long prev = last;
                    // pull from the source until a new value has been passed on
                    do {
                        if (!sp.tryAdvance(distinct(action))) return false;
                    } while (prev == last);
                    return true;
                }

                @Override
                public void forEachRemaining(IntConsumer action) {
                    sp.forEachRemaining(distinct(action));
                }

                @Override
                public Comparator<? super Integer> getComparator() {
                    return null;  // natural order
                }

                // wraps the consumer so that repeated values are dropped
                private IntConsumer distinct(IntConsumer c) {
                    return i -> {
                        if (i == last) return;
                        assert i > last;
                        last = i;
                        c.accept(i);
                    };
                }
            }, false);
    }

It even inherits parallel support, although that works by prefetching some values before processing them in another thread. So it will not speed up the distinct operation itself, but it may speed up subsequent operations if they are computationally intensive.
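The same spliterator technique is not limited to deduplication. As a rough sketch of the more general grouping case raised in the question, the following wraps a Stream<T> and emits each run of consecutive equal elements as a list. The names RunGrouping and groupRuns are invented for this illustration, and the code assumes equal elements are adjacent (for example, because the source is sorted); it keeps only the current run in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class RunGrouping {
    // Hypothetical helper: groups consecutive equal elements into lists.
    static <T> Stream<List<T>> groupRuns(Stream<T> s) {
        Spliterator<T> sp = s.spliterator();
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<List<T>>(
                    sp.estimateSize(), Spliterator.ORDERED | Spliterator.NONNULL) {
                private List<T> run = new ArrayList<>(); // run currently being built
                private List<T> completed;               // set once a run has ended

                @Override
                public boolean tryAdvance(Consumer<? super List<T>> action) {
                    completed = null;
                    while (completed == null) {
                        boolean advanced = sp.tryAdvance(t -> {
                            // a different element closes the current run
                            if (!run.isEmpty() && !Objects.equals(run.get(0), t)) {
                                completed = run;
                                run = new ArrayList<>();
                            }
                            run.add(t);
                        });
                        if (!advanced) {
                            // source exhausted: flush the last run, if any
                            if (run.isEmpty()) return false;
                            completed = run;
                            run = new ArrayList<>();
                        }
                    }
                    action.accept(completed);
                    return true;
                }
            }, false);
    }
}
```

For example, groupRuns(Stream.of(1, 1, 3, 3, 3, 4)) yields the lists [1, 1], [3, 3, 3] and [4], in that order. The resulting stream is sequential; splitting a run across threads is not handled here.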


For completeness, here is a distinct operation for arbitrary, i.e. unsorted, IntStreams, which does not rely on “boxing plus HashMap” and thus may have a much better memory footprint:

    static IntStream distinct(IntStream s) {
        boolean parallel = s.isParallel();
        s = s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
        if (parallel) s = s.parallel();
        return s;
    }

It works only for non-negative int values; extending it to the full 32-bit range would require two BitSets, so it would not look as concise, but often the use case allows limiting the storage to the 31-bit range or even less...
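The two-BitSet extension mentioned above could look something like this. It is only a sketch under assumptions: the class name FullRangeDistinct is made up, negative values are stored at index ~i (which maps -1 to 0 and Integer.MIN_VALUE to Integer.MAX_VALUE), and no attempt is made to keep the result sorted.

```java
import java.util.BitSet;
import java.util.stream.IntStream;

final class FullRangeDistinct {
    // Hypothetical variant covering the full int range with two BitSets.
    static IntStream distinct(IntStream s) {
        boolean parallel = s.isParallel();
        BitSet[] sets = s.collect(
            () -> new BitSet[] { new BitSet(), new BitSet() },
            (a, i) -> {
                if (i < 0) a[0].set(~i);  // negative values, stored at index ~i
                else a[1].set(i);         // non-negative values
            },
            (a, b) -> { a[0].or(b[0]); a[1].or(b[1]); });
        // negatives come out as -1, -2, ... followed by the non-negatives ascending
        IntStream result = IntStream.concat(
            sets[0].stream().map(i -> ~i),
            sets[1].stream());
        return parallel ? result.parallel() : result;
    }
}
```

Note that a BitSet grows with the largest index set, so values close to Integer.MAX_VALUE or Integer.MIN_VALUE make it allocate a very large backing array; the memory advantage holds mainly when the values stay in a limited range.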

+4

The proper way to do this would be to turn the stream into a spliterator and then wrap it in a spliterator that, depending on the characteristics of the source spliterator,

  • performs naive deduplication using a concurrent set if the source is neither sorted nor distinct
  • performs optimized deduplication if the source spliterator is sorted. Supporting trySplit operations will be tricky, as it may have to advance the sub-spliterator a few steps until it can be sure that it is not seeing the tail of a run of non-distinct elements.
  • just returns the spliterator as is if the source is already distinct
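The three cases above can be sketched roughly as follows. The names Dedup, dedup and SortedDedup are hypothetical, and the sketch simplifies in two places: the unsorted case simply falls back to the JDK's own distinct() instead of a hand-written concurrent set, and the sorted case is sequential only, so the tricky trySplit handling is left out.

```java
import java.util.Comparator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

final class Dedup {

    // Hypothetical helper: picks a strategy from the source's characteristics.
    static IntStream dedup(IntStream s) {
        Spliterator.OfInt sp = s.spliterator();
        if (sp.hasCharacteristics(Spliterator.DISTINCT)) {
            return StreamSupport.intStream(sp, false);                  // nothing to do
        }
        if (sp.hasCharacteristics(Spliterator.SORTED)) {
            return StreamSupport.intStream(new SortedDedup(sp), false); // on the fly
        }
        return StreamSupport.intStream(sp, false).distinct();           // fallback
    }

    // Drops values equal to the previously emitted one; assumes natural ordering.
    private static final class SortedDedup extends Spliterators.AbstractIntSpliterator {
        private final Spliterator.OfInt source;
        private boolean seenAny;  // false until the first value was emitted
        private int last;         // last emitted value
        private boolean emitted;  // set when the current tryAdvance emitted a value

        SortedDedup(Spliterator.OfInt source) {
            super(source.estimateSize(),
                  Spliterator.DISTINCT | Spliterator.SORTED
                      | Spliterator.ORDERED | Spliterator.NONNULL);
            this.source = source;
        }

        @Override
        public boolean tryAdvance(IntConsumer action) {
            emitted = false;
            IntConsumer filter = v -> {
                if (!seenAny || v != last) {
                    seenAny = true;
                    last = v;
                    emitted = true;
                    action.accept(v);
                }
            };
            // skip duplicates until one new value was passed on
            while (!emitted) {
                if (!source.tryAdvance(filter)) return false;
            }
            return true;
        }

        @Override
        public Comparator<? super Integer> getComparator() {
            return null; // natural order
        }
    }
}
```

A call such as dedup(IntStream.of(1, 1, 3, 3, 3, 4, 4, 5).sorted()) then yields 1, 3, 4, 5, and further stream operations can be chained onto the result.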

Once you have that spliterator, you can turn it back into a stream with the same properties and continue to perform stream operations on it.

Since we cannot modify existing JDK interfaces, the helper API would have to look more like this: dedup(IntStream.of(...).map(...)).collect(...).


If you inspect the source of java.util.stream.DistinctOps.makeRef(AbstractPipeline<?, T, ?>), you will notice that the JDK does more or less this for reference-based streams.

It is only the IntStream implementation (java.util.stream.IntPipeline.distinct()) that takes an inefficient approach, which does not take advantage of DISTINCT or SORTED.

It just blindly converts the IntStream to a boxed Integer stream and uses the reference-based deduplication without passing along the appropriate flags that would make it memory-efficient.

If this has not already been fixed in JDK 9, it might merit a bug report, since it amounts to unnecessary memory consumption and wasted optimization potential for stream operations if they needlessly discard stream flags.

+1