Can you split a stream into two streams?

I have a data set represented by a Java 8 stream:

Stream<T> stream = ...; 

I see how to filter it to get a random subset - for example

    Random r = new Random();
    PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
    Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

I can also see how I could reduce this stream to get, for example, two lists representing two random halves of the data set, and then turn them back into streams. But is there a direct way to generate two streams from the original one? Something like

 (heads, tails) = stream.[some kind of split based on filter] 

Thank you for understanding.

+98
java java-8 java-stream
Nov 12
9 answers

Not exactly. You cannot get two Streams out of one; it doesn't make sense: how would you iterate over one without having to generate the other at the same time? A stream can only be operated on once.

However, if you want to drop them into a list or something else, you can do

 stream.forEach((x) -> ((x == 0) ? heads : tails).add(x)); 
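
A minimal, self-contained sketch of this forEach approach, assuming a stream of integers where 0 counts as heads. The class name `ForEachSplit` and the helper `split` are illustrative, not part of the answer. It is only suitable for sequential streams, because ArrayList is not safe for concurrent adds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class ForEachSplit {
    // Hypothetical helper: returns [heads, tails], where heads holds the zeros.
    static List<List<Integer>> split(Stream<Integer> stream) {
        List<Integer> heads = new ArrayList<>();
        List<Integer> tails = new ArrayList<>();
        // Sequential streams only: ArrayList is not safe for concurrent adds.
        stream.forEach(x -> (x == 0 ? heads : tails).add(x));
        List<List<Integer>> result = new ArrayList<>();
        result.add(heads);
        result.add(tails);
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = split(Stream.of(0, 1, 0, 1, 1));
        System.out.println(parts.get(0)); // elements equal to 0
        System.out.println(parts.get(1)); // everything else
    }
}
```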
+9
Nov 12 '13 at 21:38

A collector may be used for this.

  • For two categories, use Collectors.partitioningBy() factory.

This will create a Map from Boolean to List and put the items in one list or the other based on a Predicate.

Note: Since the stream needs to be consumed in its entirety, this cannot work on infinite streams. And because the stream is consumed anyway, this method simply puts the elements in Lists instead of creating a new stream with memory.

Also, there is no need for the iterator, not even in the heads-only example you provided.

    Random r = new Random();
    Map<Boolean, List<String>> groups = stream
        .collect(Collectors.partitioningBy(x -> r.nextBoolean()));
    System.out.println(groups.get(false).size());
    System.out.println(groups.get(true).size());

  • For more categories, use Collectors.groupingBy() factory.

    Map<Object, List<String>> groups = stream
        .collect(Collectors.groupingBy(x -> r.nextInt(3)));
    System.out.println(groups.get(0).size());
    System.out.println(groups.get(1).size());
    System.out.println(groups.get(2).size());
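
If streams are needed as output, the collected lists can each be streamed again. The following is a rough sketch of that idea, using a deterministic predicate (word length) instead of a random one so the result is reproducible. The class name `PartitionThenStream` and the helper `describe` are assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PartitionThenStream {
    // Hypothetical helper: partitions words by length, then streams each half again.
    static String describe(Stream<String> words) {
        Map<Boolean, List<String>> groups =
                words.collect(Collectors.partitioningBy(w -> w.length() > 3));
        // Each list is an ordinary collection, so it can back a fresh stream.
        String longWords = groups.get(true).stream()
                .map(String::toUpperCase)
                .collect(Collectors.joining(","));
        long shortCount = groups.get(false).stream().count();
        return longWords + " / " + shortCount;
    }

    public static void main(String[] args) {
        // prints "STREAM,WORDS / 2"
        System.out.println(describe(Stream.of("a", "stream", "of", "words")));
    }
}
```

The whole source is still consumed before either of the new streams starts, so this does not help with infinite streams.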

If the stream is not a Stream but one of the primitive streams, such as IntStream, then this .collect(Collectors) method is not available. You will have to do it manually, without a collector factory. An implementation looks like this:

    IntStream intStream = IntStream.iterate(0, i -> i + 1).limit(1000000);
    Predicate<Integer> p = x -> r.nextBoolean();
    Map<Boolean, List<Integer>> groups = intStream.collect(
        () -> {
            Map<Boolean, List<Integer>> map = new HashMap<>();
            map.put(false, new ArrayList<>());
            map.put(true, new ArrayList<>());
            return map;
        },
        (map, x) -> {
            boolean partition = p.test(x);
            List<Integer> list = map.get(partition);
            list.add(x);
        },
        (map1, map2) -> {
            map1.get(false).addAll(map2.get(false));
            map1.get(true).addAll(map2.get(true));
        });
    System.out.println(groups.get(false).size());
    System.out.println(groups.get(true).size());

Edit

As pointed out, the above "workaround" is not thread safe. Converting to a regular Stream before collecting is the way to go:

 Stream<Integer> stream = intStream.boxed(); 
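
A short sketch of the boxed variant, assuming an even/odd predicate in place of the random one so that the output is predictable. The class name `BoxedPartition` and the helper `evensAndOdds` are illustrative only.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BoxedPartition {
    // Hypothetical helper: splits 0..n-1 into even (true) and odd (false) values.
    static Map<Boolean, List<Integer>> evensAndOdds(int n) {
        return IntStream.range(0, n)
                .boxed() // IntStream -> Stream<Integer>, so Collectors can be used
                .collect(Collectors.partitioningBy(x -> x % 2 == 0));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> groups = evensAndOdds(6);
        System.out.println(groups.get(true));  // [0, 2, 4]
        System.out.println(groups.get(false)); // [1, 3, 5]
    }
}
```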
+201
May 07, '15 at 20:17

Unfortunately, what you ask for is directly frowned upon in the JavaDoc of Stream:

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream.

You can work around this using peek or other methods if you really want this type of behavior. In this case, instead of trying to back two streams from the same original Stream source with a forking filter, you should duplicate the stream and filter each of the duplicates appropriately.

However, you may wish to reconsider whether a Stream is the appropriate structure for your use case.
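
One way to "duplicate the stream and filter each duplicate" is to keep something that can produce a fresh stream on demand, such as a Supplier backed by a collection. This is a sketch under that assumption; the class name `DuplicateAndFilter` and the helper `split` are made up for illustration. The predicate has to be deterministic, otherwise the two results would not be complementary, and the source is traversed twice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DuplicateAndFilter {
    // Hypothetical helper: returns [evens, odds] by traversing the source twice.
    static List<List<Integer>> split(List<Integer> source) {
        // Every call to get() yields a new, independent stream over the same data.
        Supplier<Stream<Integer>> streams = source::stream;
        List<Integer> evens = streams.get()
                .filter(x -> x % 2 == 0)
                .collect(Collectors.toList());
        List<Integer> odds = streams.get()
                .filter(x -> x % 2 != 0)
                .collect(Collectors.toList());
        return Arrays.asList(evens, odds);
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5))); // [[2, 4], [1, 3, 5]]
    }
}
```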

+16
Nov 12 '13 at 22:27

I stumbled upon this question myself, and I feel that a forked stream has some use cases that could prove valid. I wrote the code below as a Consumer, so it does not return anything, but you could adapt it to Functions and anything else you might come across.

    class PredicateSplitterConsumer<T> implements Consumer<T> {
        private Predicate<T> predicate;
        private Consumer<T> positiveConsumer;
        private Consumer<T> negativeConsumer;

        public PredicateSplitterConsumer(Predicate<T> predicate,
                                         Consumer<T> positive,
                                         Consumer<T> negative) {
            this.predicate = predicate;
            this.positiveConsumer = positive;
            this.negativeConsumer = negative;
        }

        @Override
        public void accept(T t) {
            if (predicate.test(t)) {
                positiveConsumer.accept(t);
            } else {
                negativeConsumer.accept(t);
            }
        }
    }

Using it might then look something like this:

    personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));
+9
Jul 24 '15 at

This goes against the general mechanism of Stream. Say you could split Stream S0 into Sa and Sb as you wanted. Performing any terminal operation on Sa, say count(), would necessarily "consume" all elements of S0. Sb would therefore lose its data source.

Stream used to have a tee() method, I think, which duplicated a stream into two. It has since been removed.

Stream does have a peek() method, though, and you might be able to use it to achieve what you need.
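
As a rough illustration of the peek() idea: the rejected elements can be diverted into a side list while the main pipeline keeps the rest. This is only a sketch; the class name `PeekSplit`, the helper `evensOnly`, and the even/odd predicate are assumptions. The Stream JavaDoc describes peek() as existing mainly to support debugging, so treat this as a workaround for sequential streams rather than a recommended pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PeekSplit {
    // Hypothetical helper: returns the even elements and adds the odd ones to oddSink.
    static List<Integer> evensOnly(Stream<Integer> stream, List<Integer> oddSink) {
        return stream
                // Side effect: remember the elements the filter below will drop.
                .peek(x -> { if (x % 2 != 0) oddSink.add(x); })
                .filter(x -> x % 2 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> odds = new ArrayList<>();
        List<Integer> evens = evensOnly(Stream.of(1, 2, 3, 4), odds);
        System.out.println(evens); // [2, 4]
        System.out.println(odds);  // [1, 3]
    }
}
```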

+7
Nov 12 '13 at 21:40

Not exactly, but you may be able to accomplish what you need by invoking Collectors.groupingBy(). You create a new collection, and you can then create streams from that new collection.

+5
Nov 13 '13 at 18:33

This was the least bad answer I could come up with.

    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import org.apache.commons.lang3.tuple.ImmutablePair;
    import org.apache.commons.lang3.tuple.Pair;

    public class Test {

        public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream,
                Predicate<T> predicate,
                Function<Stream<T>, L> trueStreamProcessor,
                Function<Stream<T>, R> falseStreamProcessor) {

            // Collect everything first, then hand each half to its own processor.
            Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
            L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
            R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

            return new ImmutablePair<L, R>(trueResult, falseResult);
        }

        public static void main(String[] args) {
            Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

            Pair<List<Integer>, String> results = splitStream(stream,
                    n -> n > 5,
                    s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                    s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

            System.out.println(results);
        }
    }

This takes a stream of integers and splits it at 5. For the values greater than 5, it keeps only the even numbers and puts them in a list. The remaining values are joined with |.

outputs:

  ([6, 8],0|1|2|3|4|5) 

It's not ideal, as it collects everything into intermediate collections, which breaks the stream (and it has too many arguments!).

+1
Mar 17 '16 at 11:02

I stumbled upon this question while looking for a way to filter certain elements out of a stream and log them as errors. So I did not really need to split the stream so much as attach a premature terminating action to a predicate, with unobtrusive syntax. Here is what I came up with:

    public class MyProcess {
        /* Return a Predicate that performs a bail-out action on non-matching items. */
        private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
            return x -> {
                if (pred.test(x)) {
                    return true;
                }
                altAction.accept(x);
                return false;
            };
        }

        /* Example usage in non-trivial pipeline */
        public void processItems(Stream<Item> stream) {
            stream.filter(Objects::nonNull)
                  .peek(this::logItem)
                  .map(Item::getSubItems)
                  .filter(withAltAction(SubItem::isValid,
                                        i -> logError(i, "Invalid")))
                  .peek(this::logSubItem)
                  .filter(withAltAction(i -> i.size() > 10,
                                        i -> logError(i, "Too large")))
                  .map(SubItem::toDisplayItem)
                  .forEach(this::display);
        }
    }
+1
Jun 01 '17 at 7:50

What about:

    Supplier<Stream<Integer>> randomIntsStreamSupplier =
        () -> (new Random()).ints(0, 2).boxed();

    Stream<Integer> tails = randomIntsStreamSupplier.get().filter(x -> x.equals(0));
    Stream<Integer> heads = randomIntsStreamSupplier.get().filter(x -> x.equals(1));
-2
Feb 16 '17 at 6:06