Is it possible to make a lazy groupBy that returns a stream in Java 8?

I have several large text files that I want to process by grouping their lines.

I tried using the new streaming features, like:

    return FileUtils.readLines(...)
            .parallelStream()
            .map(...)
            .collect(groupingBy(pair -> pair[0]));

The problem is that, AFAIK, this generates a Map.
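To illustrate the eagerness problem, here is a minimal, self-contained sketch (the sample data and class name are made up for illustration): `groupingBy` must consume the entire input and build the whole `Map` before anything downstream can run, so no group is available lazily.

```java
import java.util.*;
import java.util.stream.*;
import static java.util.stream.Collectors.groupingBy;

public class EagerGrouping {
    public static void main(String[] args) {
        // Hypothetical stand-in for the file's lines, as "key value" pairs.
        List<String[]> pairs = Stream.of("a 1", "a 2", "b 3")
                .map(line -> line.split(" "))
                .collect(Collectors.toList());

        // groupingBy materializes the entire Map before any downstream code runs,
        // so the pipeline is not lazy with respect to the groups.
        Map<String, List<String[]>> byKey = pairs.stream()
                .collect(groupingBy(pair -> pair[0]));

        System.out.println(byKey.size()); // 2 - the whole input was consumed
    }
}
```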

Is there a way to have a high level code like the one above that generates, for example, a stream of records?

UPDATE: I am looking for something like Python's itertools.groupby. My files are already sorted (on pair[0]); I just want to load the groups one by one.

I already have an iterative solution. I'm just wondering if there is a more declarative way to do this. Btw, using Guava or another third-party library would not be a big problem.
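For reference, an iterative baseline for already-sorted input might look roughly like this (a sketch of my own, not the asker's actual code; the tab-separated format and all names are assumptions):

```java
import java.io.*;
import java.util.*;

public class IterativeGroups {
    // Reads sorted "key<TAB>value" lines and returns one List per run of equal keys.
    static List<List<String[]>> groups(BufferedReader reader) throws IOException {
        List<List<String[]>> result = new ArrayList<>();
        String currentKey = null;
        List<String[]> group = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            String[] pair = line.split("\t");
            if (currentKey != null && !currentKey.equals(pair[0])) {
                result.add(group);            // key changed: previous group is complete
                group = new ArrayList<>();
            }
            currentKey = pair[0];
            group.add(pair);
        }
        if (!group.isEmpty()) result.add(group);  // don't forget the last group
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Simulated sorted file; in practice this would wrap a FileReader.
        BufferedReader r = new BufferedReader(new StringReader("a\t1\na\t2\nb\t3\n"));
        for (List<String[]> g : groups(r))
            System.out.println(g.get(0)[0] + " -> " + g.size() + " lines");
        // prints "a -> 2 lines" then "b -> 1 lines"
    }
}
```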

+5
java java-8 java-stream lazy-loading
Sep 03 '14 at 20:21
3 answers

The task you want to achieve is quite different from what groupingBy does. groupingBy does not depend on the order of the Stream's elements, but on the Map algorithm applied to the result of the classifier Function.

You want to fold adjacent items that share a property value into one List item. It is not even necessary to have the Stream sorted by that property, as long as you can guarantee that all items sharing the same property value are clustered together.

It might be possible to formulate this task as a reduction, but to me the resulting structure looks too complicated.

So, as long as direct support for this feature is not added to the Stream API, a Spliterator-based approach looks the most pragmatic to me:

    class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> {
        static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
                Stream<? extends T> s, Function<? super T, ? extends G> f) {
            return StreamSupport.stream(new Folding<>(s.spliterator(), f), false);
        }
        private final Spliterator<? extends T> source;
        private final Function<? super T, ? extends G> pf;
        private final Consumer<T> c = this::addItem;
        private List<T> pending, result;
        private G pendingGroup, resultGroup;

        Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) {
            source = s;
            pf = f;
        }
        private void addItem(T item) {
            G group = pf.apply(item);
            if(pending == null) pending = new ArrayList<>();
            else if(!pending.isEmpty()) {
                if(!Objects.equals(group, pendingGroup)) {
                    if(pending.size() == 1)
                        result = Collections.singletonList(pending.remove(0));
                    else {
                        result = pending;
                        pending = new ArrayList<>();
                    }
                    resultGroup = pendingGroup;
                }
            }
            pendingGroup = group;
            pending.add(item);
        }
        public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) {
            while(source.tryAdvance(c)) {
                if(result != null) {
                    action.accept(entry(resultGroup, result));
                    result = null;
                    return true;
                }
            }
            if(pending != null) {
                action.accept(entry(pendingGroup, pending));
                pending = null;
                return true;
            }
            return false;
        }
        private Map.Entry<G,List<T>> entry(G g, List<T> l) {
            return new AbstractMap.SimpleImmutableEntry<>(g, l);
        }
        public int characteristics() { return 0; }
        public long estimateSize() { return Long.MAX_VALUE; }
        public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; }
    }

The lazy nature of the resulting folded Stream can be demonstrated best by applying it to an infinite stream:

    Folding.foldBy(Stream.iterate(0, i -> i + 1), i -> i >> 4)
           .filter(e -> e.getKey() > 5)
           .findFirst()
           .ifPresent(e -> System.out.println(e.getValue()));
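For readers who find the Spliterator machinery heavy, the same adjacent-grouping idea can be sketched with a plain Iterator. This is a simplified sketch of mine, not part of the answer above, and all names in it are made up; like the Spliterator version, it pulls from the source only until the current group ends:

```java
import java.util.*;
import java.util.function.Function;

public class AdjacentGrouping {
    // Groups consecutive items mapping to the same key; the source is consumed
    // lazily, one group at a time.
    static <T, G> Iterator<Map.Entry<G, List<T>>> foldBy(
            Iterator<? extends T> source, Function<? super T, ? extends G> classifier) {
        return new Iterator<Map.Entry<G, List<T>>>() {
            T pendingItem;        // first item of the next group, read ahead
            boolean hasPending;

            public boolean hasNext() {
                return hasPending || source.hasNext();
            }

            public Map.Entry<G, List<T>> next() {
                T first = hasPending ? pendingItem : source.next();
                hasPending = false;
                G key = classifier.apply(first);
                List<T> group = new ArrayList<>();
                group.add(first);
                while (source.hasNext()) {
                    T item = source.next();
                    if (!Objects.equals(classifier.apply(item), key)) {
                        pendingItem = item;   // belongs to the next group
                        hasPending = true;
                        break;
                    }
                    group.add(item);
                }
                return new AbstractMap.SimpleImmutableEntry<>(key, group);
            }
        };
    }

    public static void main(String[] args) {
        Iterator<Map.Entry<Integer, List<Integer>>> groups =
                foldBy(Arrays.asList(1, 1, 2, 2, 2, 3).iterator(), i -> i);
        while (groups.hasNext())
            System.out.println(groups.next());
        // prints 1=[1, 1] then 2=[2, 2, 2] then 3=[3]
    }
}
```

It trades the Stream integration of the Spliterator approach for less boilerplate; wrapping the Iterator back into a Stream via Spliterators.spliteratorUnknownSize is straightforward if needed.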
+3
Sep 04 '14 at 10:40

cyclops-react, a library I contribute to, offers sharding and grouping functionality that might do what you want.

    ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...))
            .groupedStatefullyWhile((batch, next) ->
                    batch.size() == 0 ? true : next.equals(batch.get(0)));

The groupedStatefullyWhile operator lets you group items based on the current state of the batch. ReactiveSeq is a single-threaded, sequential stream.

    Map<Key, Stream<Value>> sharded = new LazyReact()
            .fromCollection(FileUtils.readLines(...))
            .map(...)
            .shard(shards, pair -> pair[0]);

This will create a LazyFutureStream (which implements java.util.stream.Stream) that processes the data in the file asynchronously and in parallel. It is lazy and will not start processing until data flows through it.

The only caveat is that you need to define the shards beforehand. That is, the "shards" parameter above is a Map of async.Queue's keyed by the shard key (possibly whatever pair[0] is?).

eg.

 Map<Integer,Queue<String>> shards; 

There is an example of this with a video, and there is test code as well.

+1
Mar 29 '15 at 21:17

This can be done with collapse from StreamEx:

    final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } };
    StreamEx.of(aa)
            .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0]))
            .forEach(System.out::println);

We can add peek and limit to verify that the computation is lazy:

    StreamEx.of(aa)
            .peek(System.out::println)
            .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0]))
            .limit(1)
            .forEach(System.out::println);
0
Jun 12 '17 at 19:52


