Merging two streams

I am trying to implement a method that merges the values of two Streams based on a Comparator for the values.

I had a way to do this, where I iterate over the streams and insert the values into a Stream.Builder, but I was not able to figure out how to make it lazily evaluated (the way many stream operations are), so that it can also deal with infinite streams.

All I want is a single merging pass over the input, not a sort of the streams (in fact, the streams will probably be unordered, and that disorder needs to be preserved).

 static <E> Stream<E> merge(Stream<E> first, Stream<E> second, Comparator<E> c)

How can I lazily merge two streams like this?

If I did this with two Queue as input and some Consumer as output, it would be pretty simple:

 <E> void merge(Queue<E> first, Queue<E> second, Consumer<E> out, Comparator<E> c) {
     // Emit the smaller head until one queue is empty.
     while (!first.isEmpty() && !second.isEmpty())
         if (c.compare(first.peek(), second.peek()) <= 0)
             out.accept(first.remove());
         else
             out.accept(second.remove());
     // Pass along whatever remains.
     for (E e : first) out.accept(e);
     for (E e : second) out.accept(e);
 }

But I need to do this with lazy evaluation and streams.

To address the comments, here are a few examples of inputs and results:

Example 1:

 merge( Stream.of(1, 2, 3, 1, 2, 3), Stream.of(2, 2, 3, 2, 2, 2), Comparator.naturalOrder() ); 

returns the stream that will generate this sequence:

 1, 2, 2, 2, 3, 3, 1, 2, 2, 2, 2, 3 

Example 2:

 merge( Stream.iterate(5, i->i-1), Stream.iterate(1, i->i+1), Comparator.naturalOrder() ); 

will return an infinite stream (well, INT_MAX + 5 elements), which will generate the sequence:

 1, 2, 3, 4, 5, 5, 4, 3, 2, 1, 0, -1 ... 

As you can see, this is not just concat(first,second).sort() , since (a) you cannot sort infinite streams, and (b) even when you can sort streams, this does not give the desired result.
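To make point (b) concrete, here is a minimal sketch of the concat-and-sort approach applied to the inputs of Example 1. The class and method names (ConcatSortedDemo, concatSorted) are made up for illustration only.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, only meant to show what concat + sorted produces.
final class ConcatSortedDemo {

    // Concatenates both streams and sorts the result in natural order.
    // sorted() must see every element first, so this cannot work on infinite streams.
    static List<Integer> concatSorted(Stream<Integer> first, Stream<Integer> second) {
        return Stream.concat(first, second).sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(concatSorted(
                Stream.of(1, 2, 3, 1, 2, 3),
                Stream.of(2, 2, 3, 2, 2, 2)));
        // prints [1, 1, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3]
    }
}
```

The fully sorted output loses the original relative order within each input, which is exactly what the merge is supposed to keep.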

java merge java-8 java-stream
1 answer

You need to implement a Spliterator rather than go through Stream.Builder. For this, you can simply go through an Iterator, since it is a fairly sequential operation. Using Guava, this is easy:

 return StreamSupport.stream(
     Spliterators.spliteratorUnknownSize(
         Iterators.mergeSorted(
             Arrays.asList(stream1.iterator(), stream2.iterator()),
             comparator),
         Spliterator.ORDERED),
     false /* not parallel */);
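If Guava is not available, the same idea can be sketched with the JDK alone. This is only an illustration, not a drop-in replacement: the class LazyMerge and the helper Peeking are hypothetical names, and the choice to prefer the first stream on ties is an assumption taken from the Queue version in the question. Guava's tie-breaking may differ.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class; the names here are not from any library.
final class LazyMerge {

    // Wraps an iterator so its next element can be inspected without consuming it.
    private static final class Peeking<E> {
        private final Iterator<E> it;
        private E head;
        private boolean hasHead;

        Peeking(Iterator<E> it) { this.it = it; }

        boolean hasNext() { return hasHead || it.hasNext(); }

        E peek() {
            if (!hasHead) {
                head = it.next();
                hasHead = true;
            }
            return head;
        }

        E next() {
            E e = peek();
            hasHead = false;
            head = null;
            return e;
        }
    }

    // Single merging pass: each step emits the smaller of the two current heads.
    // Assumption: on ties the element from the first stream is emitted first.
    static <E> Stream<E> merge(Stream<E> first, Stream<E> second, Comparator<? super E> c) {
        Peeking<E> a = new Peeking<>(first.iterator());
        Peeking<E> b = new Peeking<>(second.iterator());
        Iterator<E> merged = new Iterator<E>() {
            @Override public boolean hasNext() { return a.hasNext() || b.hasNext(); }

            @Override public E next() {
                if (!a.hasNext()) {
                    if (!b.hasNext()) throw new NoSuchElementException();
                    return b.next();
                }
                if (!b.hasNext()) return a.next();
                return c.compare(a.peek(), b.peek()) <= 0 ? a.next() : b.next();
            }
        };
        // Elements are pulled from the inputs only as the resulting stream is consumed.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(merged, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // Hypothetical inputs: two infinite streams, so the output has to be limited.
        System.out.println(
                merge(Stream.iterate(0, i -> i + 2), Stream.iterate(1, i -> i + 2),
                      Comparator.<Integer>naturalOrder())
                    .limit(6)
                    .collect(Collectors.toList()));
        // prints [0, 1, 2, 3, 4, 5]
    }
}
```

The resulting stream is sequential and reports no size, which matches the spliteratorUnknownSize call used with Guava above.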