Split stream by discriminator function

One of the missing features of the Streams API is the "partition-by" transformation, for example as defined in Clojure. Say I want to reproduce Hibernate's fetch join: I want to issue a single SQL SELECT statement to receive objects like this from the result:

    class Family {
        String surname;
        List<String> members;
    }

I issue:

    SELECT f.name, m.name FROM Family f JOIN Member m on m.family_id = f.id ORDER BY f.name

and I get a flat stream of (f.name, m.name) records. Now I need to transform it into a stream of Family objects, each holding a list of its members. Assume I already have a Stream<ResultRow>; I now need to transform it into a Stream<List<ResultRow>> and then act on that with a mapping transformation that turns it into a Stream<Family>.

The semantics of the transformation are as follows: keep collecting stream elements into a List for as long as the provided discriminator function keeps returning the same value; as soon as the value changes, emit the List as an element of the output stream and start collecting a new List.
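To pin down these semantics, here is a small eager reference sketch. The names `PartitionBySemantics` and `partitionByEager` are illustrative only, not from the question. It works on an in-memory `List`, so it does not meet the laziness requirement; it only shows that partitions are runs of *adjacent* elements with equal keys, which is why the query needs `ORDER BY f.name`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

final class PartitionBySemantics {

    // Eager reference version of "partition-by": consecutive elements with an
    // equal key go into the same list; a key change starts a new list.
    static <E> List<List<E>> partitionByEager(List<E> input, Function<? super E, ?> key) {
        List<List<E>> result = new ArrayList<>();
        List<E> current = null;
        Object currentKey = null;
        for (E e : input) {
            Object k = key.apply(e);
            if (current == null || !Objects.equals(k, currentKey)) {
                current = new ArrayList<>();
                result.add(current);
                currentKey = k;
            }
            current.add(e);
        }
        return result;
    }

    public static void main(String[] args) {
        // "Kray" occurs twice but not adjacently, so it yields two partitions.
        List<String> rows = Arrays.asList("Kray:Ronald", "Kray:Reginald", "Dors:Diana", "Kray:Charlie");
        List<List<String>> parts = partitionByEager(rows, r -> r.split(":")[0]);
        System.out.println(parts);
        // prints [[Kray:Ronald, Kray:Reginald], [Dors:Diana], [Kray:Charlie]]
    }
}
```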

I would like to be able to write code like this (I already have the resultStream method):

    Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
            "SELECT f.name, m.name"
          + " FROM Family f JOIN Member m on m.family_id = f.id"
          + " ORDER BY f.name"));
    Stream<List<ResultRow>> partitioned = partitionBy(r -> r.string(0), dbStream);
    Stream<Family> families = partitioned.map(rs -> {
        Family f = new Family(rs.get(0).string(0));
        f.members = rs.stream().map(r -> r.string(1)).collect(toList());
        return f;
    });

Of course, I expect the resulting stream to stay lazy (not materialized), because I want to be able to process a result set of any size without running into O(n) memory requirements. Without this crucial requirement, I would be happy with the provided groupingBy collector.

java java-8 java-stream
Feb 06 '15 at 10:12
3 answers

The solution requires us to define a custom Spliterator that can be used to construct the partitioned stream. We need to access the input stream through its own spliterator and wrap that in ours. The output stream is then constructed from our custom spliterator.

The following Spliterator turns any Stream<E> into a Stream<List<E>>, given a Function<E, ?> as the discriminator function. Note that the input stream must be ordered for this operation to make sense.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Objects;
    import java.util.Optional;
    import java.util.Spliterator;
    import java.util.Spliterators.AbstractSpliterator;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    import static java.util.Comparator.naturalOrder;

    public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
        private final Spliterator<E> spliterator;
        private final Function<? super E, ?> partitionBy;
        // Holds the look-ahead element: the first element of the next partition.
        private HoldingConsumer<E> holder;
        private Comparator<List<E>> comparator;

        public PartitionBySpliterator(Spliterator<E> toWrap, Function<? super E, ?> partitionBy) {
            // The number of partitions is unknown (clear SIZED and SUBSIZED); partitions are never null.
            super(Long.MAX_VALUE, toWrap.characteristics() & ~(SIZED | SUBSIZED) | NONNULL);
            this.spliterator = toWrap;
            this.partitionBy = partitionBy;
        }

        public static <E> Stream<List<E>> partitionBy(Function<E, ?> partitionBy, Stream<E> in) {
            return StreamSupport.stream(new PartitionBySpliterator<>(in.spliterator(), partitionBy), false);
        }

        @Override
        public boolean tryAdvance(Consumer<? super List<E>> action) {
            final HoldingConsumer<E> h;
            if (holder == null) {
                h = new HoldingConsumer<>();
                if (!spliterator.tryAdvance(h)) return false;
                holder = h;
            } else {
                h = holder;
            }
            final ArrayList<E> partition = new ArrayList<>();
            final Object partitionKey = partitionBy.apply(h.value);
            boolean didAdvance;
            // Add elements while the key stays the same; the first element with
            // a different key remains in the holder for the next call.
            do {
                partition.add(h.value);
            } while ((didAdvance = spliterator.tryAdvance(h))
                    && Objects.equals(partitionBy.apply(h.value), partitionKey));
            if (!didAdvance) holder = null;
            action.accept(partition);
            return true;
        }

        static final class HoldingConsumer<T> implements Consumer<T> {
            T value;

            @Override
            public void accept(T value) {
                this.value = value;
            }
        }

        @Override
        public Comparator<? super List<E>> getComparator() {
            final Comparator<List<E>> c = this.comparator;
            return c != null ? c : (this.comparator = comparator());
        }

        // Orders partitions by their first element, then by their last element,
        // using the wrapped spliterator's comparator (or natural order).
        private Comparator<List<E>> comparator() {
            @SuppressWarnings({"unchecked", "rawtypes"})
            final Comparator<? super E> innerComparator =
                    Optional.ofNullable(spliterator.getComparator())
                            .orElse((Comparator) naturalOrder());
            return (left, right) -> {
                final int c = innerComparator.compare(left.get(0), right.get(0));
                return c != 0 ? c : innerComparator.compare(
                        left.get(left.size() - 1), right.get(right.size() - 1));
            };
        }
    }
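As an alternative to the custom Spliterator (a different technique, not the answer author's code), the same lazy behaviour can be sketched with a plain `Iterator` wrapped by `Spliterators.spliteratorUnknownSize`. The class name `LazyPartitionBy` is hypothetical. It keeps only the current partition plus one look-ahead element in memory, so it even works on an infinite stream when followed by a short-circuiting operation such as `limit`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class LazyPartitionBy {

    // Lazily groups runs of adjacent elements that share a key.
    static <E> Stream<List<E>> partitionBy(Function<? super E, ?> key, Stream<E> in) {
        Iterator<E> source = in.iterator();
        Iterator<List<E>> partitions = new Iterator<List<E>>() {
            private E lookahead;          // first element of the next partition
            private boolean hasLookahead; // separate flag so null elements also work

            @Override
            public boolean hasNext() {
                return hasLookahead || source.hasNext();
            }

            @Override
            public List<E> next() {
                if (!hasNext()) throw new NoSuchElementException();
                if (!hasLookahead) {
                    lookahead = source.next();
                }
                List<E> partition = new ArrayList<>();
                partition.add(lookahead);
                Object partitionKey = key.apply(lookahead);
                hasLookahead = false;
                while (source.hasNext()) {
                    E e = source.next();
                    if (Objects.equals(key.apply(e), partitionKey)) {
                        partition.add(e);
                    } else {
                        lookahead = e; // belongs to the next partition
                        hasLookahead = true;
                        break;
                    }
                }
                return partition;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(partitions, Spliterator.ORDERED), false)
            .onClose(in::close);
    }

    public static void main(String[] args) {
        // Infinite input 0,0,0,1,1,1,2,2,2,... keyed by its own value.
        Stream<Integer> infinite = Stream.iterate(0, i -> i + 1).map(i -> i / 3);
        List<List<Integer>> firstThree = partitionBy(i -> i, infinite)
            .limit(3)
            .collect(Collectors.toList());
        System.out.println(firstThree); // [[0, 0, 0], [1, 1, 1], [2, 2, 2]]
    }
}
```

Compared with the custom Spliterator, this version reports only ORDERED, so it does not propagate SORTED or provide a comparator for the partitions.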
Feb 06 '15 at 10:12

For those of you who just want to group a stream (without the laziness requirement), there are mappers and collectors for that:

    class Person {
        String surname;
        String forename;

        public Person(String surname, String forename) {
            this.surname = surname;
            this.forename = forename;
        }

        @Override
        public String toString() {
            return forename;
        }
    }

    class Family {
        String surname;
        List<Person> members;

        public Family(String surname, List<Person> members) {
            this.surname = surname;
            this.members = members;
        }

        @Override
        public String toString() {
            return "Family{" + "surname=" + surname + ", members=" + members + '}';
        }
    }

    private void test() {
        String[][] data = {
            {"Kray", "Ronald"},
            {"Kray", "Reginald"},
            {"Dors", "Diana"},
        };
        // Their families.
        Stream<Family> families = Arrays.stream(data)
            // Build people
            .map(a -> new Person(a[0], a[1]))
            // Collect into a Map<String,List<Person>> as families
            .collect(Collectors.groupingBy(p -> p.surname))
            // Convert them to families.
            .entrySet().stream()
            .map(p -> new Family(p.getKey(), p.getValue()));
        families.forEach(f -> System.out.println(f));
    }
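If the order of the groups matters, a variant of the same idea can use the three-argument `groupingBy` with a `LinkedHashMap` supplier (the one-argument form gives no ordering guarantee) and a downstream `mapping` collector. This is a sketch over the same sample data; `GroupingSketch` and `membersBySurname` are illustrative names:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupingSketch {

    // Groups {surname, forename} rows by surname, keeping first-seen surname
    // order (LinkedHashMap) and mapping each row straight to its forename.
    static Map<String, List<String>> membersBySurname(String[][] data) {
        return Arrays.stream(data)
            .collect(Collectors.groupingBy(
                row -> row[0],
                LinkedHashMap::new,
                Collectors.mapping(row -> row[1], Collectors.toList())));
    }

    public static void main(String[] args) {
        String[][] data = {{"Kray", "Ronald"}, {"Kray", "Reginald"}, {"Dors", "Diana"}};
        System.out.println(membersBySurname(data)); // {Kray=[Ronald, Reginald], Dors=[Diana]}
    }
}
```

Unlike a partition-by, this still consumes the whole stream into a map before producing anything, and it merges equal surnames even when they are not adjacent.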
Aug 27 '15 at 15:11

This can be done with the collapse operation of StreamEx, which merges runs of adjacent elements for which the given predicate holds:

    StreamEx.of(resultStream(queryBuilder.createQuery(
            "SELECT f.name, m.name"
          + " FROM Family f JOIN Member m on m.family_id = f.id"
          + " ORDER BY f.name")))
        .collapse((a, b) -> a.string(0).equals(b.string(0)), Collectors.toList())
        .map(l -> new Family(l.get(0).string(0), StreamEx.of(l).map(r -> r.string(1)).toList()))
        .forEach(System.out::println);
Jun 12 '17 at 20:05