If you need a solution that does not add mutable state to a function that is not supposed to have any, you can resort to collect:
    static void distinctForSorted(IntStream s, IntConsumer action) {
        s.collect(()->new long[]{Long.MIN_VALUE},
            (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }},
            (a, b)->{ throw new UnsupportedOperationException(); });
    }
This works because it is the intended way of using mutable containers. However, it cannot work in parallel, since splitting at arbitrary stream positions implies the possibility of encountering the same value in two (or even more) threads.
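As a rough illustration of how the collect-based variant might be called, the sketch below feeds a sorted stream with duplicates into a list. The class name `SortedDistinctCollectDemo` and the helper `distinctToList` are hypothetical names chosen for this example; the method itself is repeated (without the assert) only so that the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class SortedDistinctCollectDemo {
    // Same technique as above: a one-element long[] remembers the last value seen.
    static void distinctForSorted(IntStream s, IntConsumer action) {
        s.collect(() -> new long[]{Long.MIN_VALUE},
            (a, i) -> { if (a[0] != i) { action.accept(i); a[0] = i; } },
            (a, b) -> { throw new UnsupportedOperationException(); });
    }

    // Hypothetical helper: gathers the distinct values of a sorted input into a list.
    static List<Integer> distinctToList(int... sortedValues) {
        List<Integer> result = new ArrayList<>();
        distinctForSorted(IntStream.of(sortedValues), result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(distinctToList(1, 1, 2, 3, 3, 3, 7)); // [1, 2, 3, 7]
    }
}
```

The stream must stay sequential here; the combiner exists only to fail loudly if the framework ever tries to merge partial results.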
If you want a general-purpose IntStream rather than a forEach action, a low-level Spliterator solution is preferable, despite the added complexity.
    static IntStream distinctForSorted(IntStream s) {
        Spliterator.OfInt sp=s.spliterator();
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
                Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) {
                long last=Long.MIN_VALUE;
                @Override
                public boolean tryAdvance(IntConsumer action) {
                    long prev=last;
                    do if(!sp.tryAdvance(distinct(action))) return false;
                    while(prev==last);
                    return true;
                }
                @Override
                public void forEachRemaining(IntConsumer action) {
                    sp.forEachRemaining(distinct(action));
                }
                @Override
                public Comparator<? super Integer> getComparator() {
                    return null;
                }
                private IntConsumer distinct(IntConsumer c) {
                    return i-> {
                        if(i==last) return;
                        assert i>last;
                        last=i;
                        c.accept(i);
                    };
                }
            }, false);
    }
It even inherits parallel support, although that works by prefetching some values before processing them in another thread. So it will not speed up the distinct operation itself, but it may speed up follow-up operations if they are computation-intensive.
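To make the point about follow-up operations more concrete, here is a minimal sketch that runs a map step in parallel on top of the sorted-distinct stream. The method is reproduced (with braces added and the assert dropped) so the example is self-contained; `SortedDistinctStreamDemo`, `expensiveSquare` and `distinctSquares` are hypothetical names, and the squaring merely stands in for a genuinely expensive computation.

```java
import java.util.Comparator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class SortedDistinctStreamDemo {
    // Reproduced from the answer above so the example compiles on its own.
    static IntStream distinctForSorted(IntStream s) {
        Spliterator.OfInt sp = s.spliterator();
        return StreamSupport.intStream(
            new Spliterators.AbstractIntSpliterator(sp.estimateSize(),
                    Spliterator.DISTINCT | Spliterator.SORTED
                    | Spliterator.NONNULL | Spliterator.ORDERED) {
                long last = Long.MIN_VALUE; // outside the int range, so never equal to an element
                @Override public boolean tryAdvance(IntConsumer action) {
                    long prev = last;
                    do { // keep pulling until a new value was passed on or the source is exhausted
                        if (!sp.tryAdvance(distinct(action))) return false;
                    } while (prev == last);
                    return true;
                }
                @Override public void forEachRemaining(IntConsumer action) {
                    sp.forEachRemaining(distinct(action));
                }
                @Override public Comparator<? super Integer> getComparator() {
                    return null; // natural order
                }
                private IntConsumer distinct(IntConsumer c) {
                    return i -> {
                        if (i == last) return; // duplicate of the previous element
                        last = i;
                        c.accept(i);
                    };
                }
            }, false);
    }

    // Hypothetical stand-in for a computation-intensive follow-up step.
    static int expensiveSquare(int x) {
        return x * x;
    }

    // The distinct step still reads the source sequentially; only the map step can run in parallel.
    static int[] distinctSquares(int... sortedValues) {
        return distinctForSorted(IntStream.of(sortedValues))
            .parallel()
            .map(SortedDistinctStreamDemo::expensiveSquare)
            .toArray();
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(distinctSquares(1, 1, 2, 2, 3, 5, 5)));
    }
}
```

Because the spliterator reports ORDERED, toArray keeps the encounter order even when the pipeline runs in parallel.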
For completeness, here is a distinct operation for arbitrary, i.e. unsorted, IntStreams that does not rely on “boxing plus HashMap” and may therefore have a much smaller memory footprint:
    static IntStream distinct(IntStream s) {
        boolean parallel=s.isParallel();
        s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream();
        if(parallel) s=s.parallel();
        return s;
    }
It only works for non-negative int values. Expanding it to the full 32-bit range would require two BitSets and would therefore not look as concise, but often the use case allows limiting the storage to the 31-bit range or even less…
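For readers curious about the two-BitSet variant, the following is only a sketch under some assumptions, not a tuned implementation. It stores a negative value i at index ~i of a second BitSet, so memory grows with the magnitude of the values rather than with their sign. The names `FullRangeDistinct`, `distinctFullRange` and `distinctValues` are made up for this example. Note that the result is not fully sorted: negative values come out from -1 downward, followed by the non-negative values in ascending order. Values near Integer.MIN_VALUE or Integer.MAX_VALUE would need a BitSet of roughly 256 MB and have not been exercised here.

```java
import java.util.BitSet;
import java.util.stream.IntStream;

public class FullRangeDistinct {
    // Sketch: sets[0] records negative values (at index ~i), sets[1] records non-negative values.
    static IntStream distinctFullRange(IntStream s) {
        boolean parallel = s.isParallel();
        BitSet[] sets = s.collect(
            () -> new BitSet[]{new BitSet(), new BitSet()},
            (a, i) -> { if (i < 0) a[0].set(~i); else a[1].set(i); },
            (a, b) -> { a[0].or(b[0]); a[1].or(b[1]); });
        // ~k maps the stored index back to the original negative value.
        IntStream result = IntStream.concat(sets[0].stream().map(k -> ~k), sets[1].stream());
        return parallel ? result.parallel() : result;
    }

    // Hypothetical convenience wrapper used for the demonstration.
    static int[] distinctValues(int... values) {
        return distinctFullRange(IntStream.of(values)).toArray();
    }

    public static void main(String[] args) {
        // Negatives first (from -1 downward), then non-negatives ascending.
        System.out.println(java.util.Arrays.toString(distinctValues(3, -1, 3, -5, 0, -1, 7)));
        // prints [-1, -5, 0, 3, 7]
    }
}
```

If a fully sorted result is needed, appending sorted() to the returned stream is the simplest option, at the cost of an extra buffering step.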