Java 8 Stream: merging two adjacent elements

I have many objects of type Slot in an ArrayList.

The Slot class is shown below:

    class Slot {
        int start;
        int end;
    }

Let a list of type List<Slot> be called slots. The slots are sorted by start time. The end time of one slot may be equal to the start time of the next slot, but they will never overlap.

Is there a way to iterate over this list using Java 8 streams, combine two slots whenever the end time of one matches the start time of the next, and collect the result into an ArrayList?

6 answers

This scenario is well supported by my free StreamEx library, which extends the standard Stream API. It provides the intervalMap intermediate operation, which can collapse several adjacent stream elements into a single element. Here is a complete example:

    // Slot class and sample data are taken from @Andreas answer
    List<Slot> slots = Arrays.asList(new Slot(3, 5), new Slot(5, 7),
            new Slot(8, 10), new Slot(10, 11), new Slot(11, 13));

    List<Slot> result = StreamEx.of(slots)
            .intervalMap((s1, s2) -> s1.end == s2.start,
                         (s1, s2) -> new Slot(s1.start, s2.end))
            .toList();
    System.out.println(result);
    // Output: [3-7, 8-13]

The intervalMap method accepts two parameters. The first is a BiPredicate that accepts two adjacent elements from the input stream and returns true if they should be merged (here the condition is s1.end == s2.start). The second parameter is a BiFunction that takes the first and last elements of a merged series and produces the resulting element.

Please note that if you have, for example, 100 adjacent slots that should be combined into one, this solution does not create 100 intermediate objects (as happens in @Misha's answer, which is nevertheless very interesting). It tracks only the first and last slots in the series and immediately forgets the intermediate ones. Of course, this solution is parallel-friendly: if you have many thousands of input slots, using .parallel() may improve performance.
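To make the first/last tracking idea concrete, here is a minimal plain-Java sketch. It is not StreamEx's actual implementation; the class IntervalSketch and the helper collapse are hypothetical names introduced only for illustration, and the sketch works on a List rather than on a stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

class IntervalSketch {
    static class Slot {
        final int start, end;
        Slot(int start, int end) { this.start = start; this.end = end; }
        @Override public String toString() { return start + "-" + end; }
    }

    // Hypothetical helper: collapses runs of adjacent elements while keeping
    // only the first and the most recent element of the current run.
    static <T, R> List<R> collapse(List<T> input,
                                   BiPredicate<? super T, ? super T> sameRun,
                                   BiFunction<? super T, ? super T, ? extends R> mapper) {
        List<R> result = new ArrayList<>();
        if (input.isEmpty()) return result;
        T first = input.get(0);
        T last = first;
        for (T next : input.subList(1, input.size())) {
            if (!sameRun.test(last, next)) {
                result.add(mapper.apply(first, last)); // run ended: emit it
                first = next;                          // start a new run
            }
            last = next; // intermediate elements are simply forgotten
        }
        result.add(mapper.apply(first, last)); // emit the final run
        return result;
    }

    public static void main(String[] args) {
        List<Slot> slots = Arrays.asList(new Slot(3, 5), new Slot(5, 7),
                new Slot(8, 10), new Slot(10, 11), new Slot(11, 13));
        System.out.println(collapse(slots,
                (s1, s2) -> s1.end == s2.start,
                (s1, s2) -> new Slot(s1.start, s2.end)));
        // prints [3-7, 8-13]
    }
}
```

Only one result object is created per run, no matter how many slots the run contains.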

Note that the current intervalMap implementation recreates the Slot even if it is not merged with anything. In that case the BiFunction receives the same Slot as both arguments. If you want to optimize this case, you can add an extra check such as s1 == s2 ? s1 : ...:

    List<Slot> result = StreamEx.of(slots)
            .intervalMap((s1, s2) -> s1.end == s2.start,
                         (s1, s2) -> s1 == s2 ? s1 : new Slot(s1.start, s2.end))
            .toList();

Since these kinds of questions come up a lot, I thought it might be an interesting exercise to write a collector that groups adjacent elements by a predicate.

Assuming we can add combining logic to the Slot class:

    boolean canCombine(Slot other) {
        return this.end == other.start;
    }

    Slot combine(Slot other) {
        if (!canCombine(other)) {
            throw new IllegalArgumentException();
        }
        return new Slot(this.start, other.end);
    }

The groupingAdjacent collector can be used as follows:

    List<Slot> combined = slots.stream()
        .collect(groupingAdjacent(
            Slot::canCombine,               // test to determine if two adjacent elements go together
            reducing(Slot::combine),        // collector to use for combining the adjacent elements
            mapping(Optional::get, toList()) // collector to group up combined elements
        ));

Alternatively, the second argument could be collectingAndThen(reducing(Slot::combine), Optional::get), and the third argument would then be toList().

Here is the source of groupingAdjacent. It can handle null elements and is parallel-friendly. With a little more hassle, a similar thing can be done using a Spliterator.

    public static <T, AI, I, AO, R> Collector<T, ?, R> groupingAdjacent(
            BiPredicate<? super T, ? super T> keepTogether,
            Collector<? super T, AI, ? extends I> inner,
            Collector<I, AO, R> outer
    ) {
        // Sentinel meaning "no group yet"; it is only ever compared by identity.
        AI EMPTY = (AI) new Object();

        // Container to accumulate adjacent possibly null elements. Adj can be in one of 3 states:
        // - Before first element: curGrp == EMPTY
        // - After first element but before first group boundary: firstGrp == EMPTY, curGrp != EMPTY
        // - After at least one group boundary: firstGrp != EMPTY, curGrp != EMPTY
        class Adj {

            T first, last;                       // first and last elements added to this container
            AI firstGrp = EMPTY, curGrp = EMPTY;
            AO acc = outer.supplier().get();     // accumulator for completed groups

            void add(T t) {
                if (curGrp == EMPTY) /* first element */ {
                    first = t;
                    curGrp = inner.supplier().get();
                } else if (!keepTogether.test(last, t)) /* group boundary */ {
                    addGroup(curGrp);
                    curGrp = inner.supplier().get();
                }
                inner.accumulator().accept(curGrp, last = t);
            }

            void addGroup(AI group) /* group can be EMPTY, in which case this should do nothing */ {
                if (firstGrp == EMPTY) {
                    firstGrp = group;
                } else if (group != EMPTY) {
                    outer.accumulator().accept(acc, inner.finisher().apply(group));
                }
            }

            Adj merge(Adj other) {
                if (other.curGrp == EMPTY) /* other is empty */ {
                    return this;
                } else if (this.curGrp == EMPTY) /* this is empty */ {
                    return other;
                } else if (!keepTogether.test(last, other.first)) /* boundary between this and other */ {
                    addGroup(this.curGrp);
                    addGroup(other.firstGrp);
                } else if (other.firstGrp == EMPTY) /* other container is single-group. prepend this.curGrp to other.curGrp */ {
                    other.curGrp = inner.combiner().apply(this.curGrp, other.curGrp);
                } else /* other Adj contains a boundary. this.curGrp + other.firstGrp form a complete group. */ {
                    addGroup(inner.combiner().apply(this.curGrp, other.firstGrp));
                }
                this.acc = outer.combiner().apply(this.acc, other.acc);
                this.curGrp = other.curGrp;
                this.last = other.last;
                return this;
            }

            R finish() {
                AO combined = outer.supplier().get();
                if (curGrp != EMPTY) {
                    addGroup(curGrp);
                    assert firstGrp != EMPTY;
                    outer.accumulator().accept(combined, inner.finisher().apply(firstGrp));
                }
                return outer.finisher().apply(outer.combiner().apply(combined, acc));
            }
        }
        return Collector.of(Adj::new, Adj::add, Adj::merge, Adj::finish);
    }
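The Spliterator route mentioned above might look roughly like the following. This is only a sketch under stated assumptions, not the author's code: the class AdjacentMerging and the helper mergeAdjacent are hypothetical names, the result stream is sequential only, and unlike the collector it merges pairwise, so it creates one intermediate object per merge.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class AdjacentMerging {
    static class Slot {
        final int start, end;
        Slot(int start, int end) { this.start = start; this.end = end; }
        @Override public String toString() { return start + "-" + end; }
    }

    // Hypothetical helper: lazily merges runs of adjacent elements (sequential only).
    static <T> Stream<T> mergeAdjacent(Stream<T> source,
                                       BiPredicate<? super T, ? super T> canMerge,
                                       BinaryOperator<T> merge) {
        Spliterator<T> in = source.spliterator();
        Spliterator<T> out = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private T pending;          // merged run that has not been emitted yet
            private boolean hasPending; // separate flag, so null elements still work
            private T next;             // element most recently pulled from the source

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!hasPending) {
                    if (!in.tryAdvance(t -> pending = t)) return false; // source exhausted
                    hasPending = true;
                }
                while (in.tryAdvance(t -> next = t)) {
                    if (canMerge.test(pending, next)) {
                        pending = merge.apply(pending, next); // extend the current run
                    } else {
                        T done = pending;
                        pending = next;      // next starts a new run
                        action.accept(done); // emit the finished run
                        return true;
                    }
                }
                hasPending = false;
                action.accept(pending); // emit the final run
                return true;
            }
        };
        return StreamSupport.stream(out, false).onClose(source::close);
    }

    public static void main(String[] args) {
        List<Slot> slots = Arrays.asList(new Slot(3, 5), new Slot(5, 7),
                new Slot(8, 10), new Slot(10, 11), new Slot(11, 13));
        System.out.println(mergeAdjacent(slots.stream(),
                (a, b) -> a.end == b.start,
                (a, b) -> new Slot(a.start, b.end)).collect(Collectors.toList()));
        // prints [3-7, 8-13]
    }
}
```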

You can do this using the three-argument reduce() method, with U being List<Slot>, but it is much more convoluted than simply doing it in a for loop, unless parallel processing is required.

See end of answer for test setup.

Here is the implementation of the for loop:

    List<Slot> mixed = new ArrayList<>();
    Slot last = null;
    for (Slot slot : slots)
        if (last == null || last.end != slot.start)
            mixed.add(last = slot);
        else
            mixed.set(mixed.size() - 1, last = new Slot(last.start, slot.end));

Output

    [3-5, 5-7, 8-10, 10-11, 11-13]
    [3-7, 8-13]

Here is the stream reduce implementation:

    List<Slot> mixed = slots.stream().reduce((List<Slot>)null, (list, slot) -> {
        System.out.println("accumulator.apply(" + list + ", " + slot + ")");
        if (list == null) {
            List<Slot> newList = new ArrayList<>();
            newList.add(slot);
            return newList;
        }
        Slot last = list.get(list.size() - 1);
        if (last.end != slot.start)
            list.add(slot);
        else
            list.set(list.size() - 1, new Slot(last.start, slot.end));
        return list;
    }, (list1, list2) -> {
        System.out.println("combiner.apply(" + list1 + ", " + list2 + ")");
        if (list1 == null)
            return list2;
        if (list2 == null)
            return list1;
        Slot lastOf1 = list1.get(list1.size() - 1);
        Slot firstOf2 = list2.get(0);
        if (lastOf1.end != firstOf2.start)
            list1.addAll(list2);
        else {
            list1.set(list1.size() - 1, new Slot(lastOf1.start, firstOf2.end));
            list1.addAll(list2.subList(1, list2.size()));
        }
        return list1;
    });

Output

    accumulator.apply(null, 3-5)
    accumulator.apply([3-5], 5-7)
    accumulator.apply([3-7], 8-10)
    accumulator.apply([3-7, 8-10], 10-11)
    accumulator.apply([3-7, 8-11], 11-13)
    [3-5, 5-7, 8-10, 10-11, 11-13]
    [3-7, 8-13]

Changing it for parallel (multi-threaded) processing:

 List<Slot> mixed = slots.stream().parallel().reduce(... 

Output

    accumulator.apply(null, 8-10)
    accumulator.apply(null, 3-5)
    accumulator.apply(null, 10-11)
    accumulator.apply(null, 11-13)
    combiner.apply([10-11], [11-13])
    accumulator.apply(null, 5-7)
    combiner.apply([3-5], [5-7])
    combiner.apply([8-10], [10-13])
    combiner.apply([3-7], [8-13])
    [3-5, 5-7, 8-10, 10-11, 11-13]
    [3-7, 8-13]

Caveat

If slots is an empty list, the for loop version produces an empty list, whereas the stream version returns null.
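One way to deal with this caveat is to guard the result. The following is a minimal sketch of the same reduction without the logging; the class ReduceCaveat and the helpers mergeWithReduce and mergeOrEmpty are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReduceCaveat {
    static class Slot {
        int start, end;
        Slot(int start, int end) { this.start = start; this.end = end; }
        @Override public String toString() { return start + "-" + end; }
    }

    // Same reduction idea as above: null serves as the "no list yet" identity.
    static List<Slot> mergeWithReduce(List<Slot> slots) {
        return slots.stream().reduce((List<Slot>) null, (list, slot) -> {
            if (list == null) {
                List<Slot> fresh = new ArrayList<>();
                fresh.add(slot);
                return fresh;
            }
            Slot last = list.get(list.size() - 1);
            if (last.end != slot.start) list.add(slot);
            else list.set(list.size() - 1, new Slot(last.start, slot.end));
            return list;
        }, (list1, list2) -> {
            if (list1 == null) return list2;
            if (list2 == null) return list1;
            Slot lastOf1 = list1.get(list1.size() - 1);
            Slot firstOf2 = list2.get(0);
            if (lastOf1.end != firstOf2.start) list1.addAll(list2);
            else {
                list1.set(list1.size() - 1, new Slot(lastOf1.start, firstOf2.end));
                list1.addAll(list2.subList(1, list2.size()));
            }
            return list1;
        });
    }

    // Guard: turn the null result for empty input into an empty list.
    static List<Slot> mergeOrEmpty(List<Slot> slots) {
        List<Slot> merged = mergeWithReduce(slots);
        return merged == null ? new ArrayList<>() : merged;
    }

    public static void main(String[] args) {
        System.out.println(mergeWithReduce(Collections.emptyList())); // prints null
        System.out.println(mergeOrEmpty(Collections.emptyList()));    // prints []
        System.out.println(mergeOrEmpty(Arrays.asList(new Slot(3, 5), new Slot(5, 7),
                new Slot(8, 10), new Slot(10, 11), new Slot(11, 13)))); // prints [3-7, 8-13]
    }
}
```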


Test setup

The above code uses the following Slot class:

    class Slot {
        int start;
        int end;
        Slot(int start, int end) {
            this.start = start;
            this.end = end;
        }
        @Override
        public String toString() {
            return this.start + "-" + this.end;
        }
    }

The variable slots is defined as:

    List<Slot> slots = Arrays.asList(new Slot(3, 5), new Slot(5, 7),
            new Slot(8, 10), new Slot(10, 11), new Slot(11, 13));

Both slots and mixed result are printed using:

    System.out.println(slots);
    System.out.println(mixed);

This is a two-liner:

    List<Slot> condensed = new LinkedList<>();
    slots.stream().reduce((a,b) -> {if (a.end == b.start) return new Slot(a.start, b.end); condensed.add(a); return b;}).ifPresent(condensed::add);

If the Slot fields are not visible, you will need to change a.end to a.getEnd(), and so on.
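For illustration, here is a sketch of the same reduction written against accessor methods. It assumes a Slot with private fields and getStart()/getEnd() getters; those getters and the names GetterVariant and condense are assumptions made for this example, not part of the question. Because the lambda adds to an outer list, the sketch relies on sequential, in-order evaluation.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

class GetterVariant {
    static class Slot {
        private final int start, end;
        Slot(int start, int end) { this.start = start; this.end = end; }
        int getStart() { return start; }
        int getEnd() { return end; }
        @Override public String toString() { return start + "-" + end; }
    }

    // Hypothetical wrapper around the two-liner, using getters instead of fields.
    static List<Slot> condense(List<Slot> slots) {
        List<Slot> condensed = new LinkedList<>();
        slots.stream().reduce((a, b) -> {
            if (a.getEnd() == b.getStart()) {
                return new Slot(a.getStart(), b.getEnd()); // merge and keep going
            }
            condensed.add(a); // a is complete, b starts a new run
            return b;
        }).ifPresent(condensed::add); // the last run, if the input was not empty
        return condensed;
    }

    public static void main(String[] args) {
        System.out.println(condense(Arrays.asList(new Slot(3, 5), new Slot(5, 7),
                new Slot(8, 10), new Slot(10, 11), new Slot(11, 13))));
        // prints [3-7, 8-13]
    }
}
```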


Some test code with a few edge cases:

    List<List<Slot>> tests = Arrays.asList(
            Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(11, 13)),
            Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(12, 13)),
            Arrays.asList(new Slot(3, 5), new Slot(5, 7)),
            Collections.emptyList());
    for (List<Slot> slots : tests) {
        List<Slot> condensed = new LinkedList<>();
        slots.stream().reduce((a, b) -> {if (a.end == b.start) return new Slot(a.start, b.end); condensed.add(a); return b;}).ifPresent(condensed::add);
        System.out.println(condensed);
    }

Output:

    [3-7, 8-13]
    [3-7, 8-11, 12-13]
    [3-7]
    []

If you add the following method to the Slot class

    public boolean join(Slot s) {
        if(s.start != end)
            return false;
        end = s.end;
        return true;
    }

you can complete the whole operation using the standard API as follows

    List<Slot> result = slots.stream().collect(ArrayList::new,
        (l, s) -> { if(l.isEmpty() || !l.get(l.size()-1).join(s)) l.add(s); },
        (l1, l2) -> l1.addAll(
            l1.isEmpty() || l2.isEmpty() || !l1.get(l1.size()-1).join(l2.get(0)) ?
            l2 : l2.subList(1, l2.size()))
    );

This abides by the API contract (as opposed to abusing reduce) and therefore works without problems with parallel streams (though you will need really large source lists to benefit from parallel execution).


However, the above solution joins Slot instances in place, which is only acceptable if you no longer need the original list/items afterwards. Otherwise, or if you use only immutable Slot instances, you need to create new Slot instances representing the joined slots.

One possible solution looks like

    BiConsumer<List<Slot>,Slot> joinWithList = (l, s) -> {
        if(!l.isEmpty()) {
            Slot old = l.get(l.size()-1);
            if(old.end == s.start) {
                l.set(l.size()-1, new Slot(old.start, s.end));
                return;
            }
        }
        l.add(s);
    };
    List<Slot> result = slots.stream().collect(ArrayList::new, joinWithList,
        (l1, l2) -> {
            if(!l2.isEmpty()) {
                joinWithList.accept(l1, l2.get(0));
                l1.addAll(l2.subList(1, l2.size()));
            }
        }
    );
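As a quick sanity check of the claims about parallel streams and untouched source items, the non-mutating variant can be run both sequentially and in parallel on a larger input. This is only a sketch: the class ParallelCollectCheck, the helpers merge and sample, and the generated test data are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ParallelCollectCheck {
    static final class Slot {
        final int start, end; // immutable, so merging must create new instances
        Slot(int start, int end) { this.start = start; this.end = end; }
        @Override public String toString() { return start + "-" + end; }
    }

    // The collect-based merge from above, wrapped in a hypothetical helper.
    static List<Slot> merge(Stream<Slot> stream) {
        BiConsumer<List<Slot>, Slot> joinWithList = (l, s) -> {
            if (!l.isEmpty()) {
                Slot old = l.get(l.size() - 1);
                if (old.end == s.start) {
                    l.set(l.size() - 1, new Slot(old.start, s.end));
                    return;
                }
            }
            l.add(s);
        };
        return stream.collect(ArrayList::new, joinWithList, (l1, l2) -> {
            if (!l2.isEmpty()) {
                joinWithList.accept(l1, l2.get(0));
                l1.addAll(l2.subList(1, l2.size()));
            }
        });
    }

    // Made-up data: n unit-length slots with a gap after every 100th slot.
    static List<Slot> sample(int n) {
        return IntStream.range(0, n)
                .mapToObj(i -> {
                    int start = i + i / 100;
                    return new Slot(start, start + 1);
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Slot> slots = sample(10_000);
        List<Slot> sequential = merge(slots.stream());
        List<Slot> parallel = merge(slots.parallelStream());
        System.out.println(sequential.size());                                // merged slot count
        System.out.println(sequential.toString().equals(parallel.toString())); // same result?
        System.out.println(slots.get(0));                                      // source slot unchanged
    }
}
```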

A clean (parallel safe) solution that does not require any new methods:

    List<Slot> condensed = slots.stream().collect(LinkedList::new,
        (l, s) -> l.add(l.isEmpty() || l.getLast().end != s.start ?
            s : new Slot(l.removeLast().start, s.end)),
        (l, l2) -> {
            if (!l.isEmpty() && !l2.isEmpty() && l.getLast().end == l2.getFirst().start) {
                l.add(new Slot(l.removeLast().start, l2.removeFirst().end));
            }
            l.addAll(l2);
        });

This uses LinkedList, a more appropriate list implementation here, because it makes it easy to access and remove the last element of the list when merging slots.


    List<List<Slot>> tests = Arrays.asList(
            Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(11, 13)),
            Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(12, 13)),
            Arrays.asList(new Slot(3, 5), new Slot(5, 7)),
            Collections.emptyList());
    for (List<Slot> slots : tests) {
        List<Slot> condensed = slots.stream().collect(LinkedList::new,
            (l, s) -> l.add(l.isEmpty() || l.getLast().end != s.start ?
                s : new Slot(l.removeLast().start, s.end)),
            (l, l2) -> {
                if (!l.isEmpty() && !l2.isEmpty() && l.getLast().end == l2.getFirst().start) {
                    l.add(new Slot(l.removeLast().start, l2.removeFirst().end));
                }
                l.addAll(l2);
            });
        System.out.println(condensed);
    }

Output:

    [[3, 7], [8, 13]]
    [[3, 7], [8, 11], [12, 13]]
    [[3, 7]]
    []