Cartesian product of streams in Java 8 as a stream (using streams only)

I would like to create a method that produces a stream of elements that are the Cartesian product of several given streams (aggregated into the same type at the end by a binary operator). Note that both the arguments and the result are streams, not collections.

For example, for two streams {A, B} and {X, Y} I would like it to generate a stream of values {AX, AY, BX, BY} (simple concatenation is used to combine strings). So far I have come up with this code:

    private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, Stream<T>... streams) {
        Stream<T> result = null;
        for (Stream<T> stream : streams) {
            if (result == null) {
                result = stream;
            } else {
                result = result.flatMap(m -> stream.map(n -> aggregator.apply(m, n)));
            }
        }
        return result;
    }

This is my desired use case:

    Stream<String> result = cartesian(
        (a, b) -> a + b,
        Stream.of("A", "B"),
        Stream.of("X", "Y")
    );
    System.out.println(result.collect(Collectors.toList()));

Expected result: AX, AY, BX, BY.

Another example:

    Stream<String> result = cartesian(
        (a, b) -> a + b,
        Stream.of("A", "B"),
        Stream.of("K", "L"),
        Stream.of("X", "Y")
    );

Expected result: AKX, AKY, ALX, ALY, BKX, BKY, BLX, BLY.

However, if I run the code, I get this error:

IllegalStateException: stream has already been operated upon or closed

Where is the stream consumed? In flatMap? Can it be easily fixed?

2 answers

Passing streams in your example is never better than passing lists:

 private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, List<T>... lists) { ... } 

And use it as follows:

    Stream<String> result = cartesian(
        (a, b) -> a + b,
        Arrays.asList("A", "B"),
        Arrays.asList("K", "L"),
        Arrays.asList("X", "Y")
    );

In both cases, you create an implicit array from varargs and use it as a data source, so laziness is imaginary. Your data is actually stored in arrays.

In most cases, the resulting Cartesian product stream is much longer than the input data, so there is practically no reason for lazy inputs. For example, having five lists of five elements (25 in total), you will get a resulting stream of 3125 elements. Thus, storing 25 items in memory is not a very big problem. In fact, in most practical cases they are already stored in memory.
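As a rough illustration of the list-based signature, here is a minimal sketch. The body is my own assumption about how it could be filled in, not necessarily what the elided code above contains, and the class name `CartesianLists` is made up for the example. Because each `List` can hand out a fresh stream on demand, nothing is consumed twice. The `main` method also checks the 5 lists of 5 elements figure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CartesianLists {

    // Hypothetical list-based variant: every list can be streamed repeatedly,
    // so a new inner stream is created for each element of the outer stream.
    @SafeVarargs
    public static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, List<T>... lists) {
        Stream<T> result = null;
        for (List<T> list : lists) {
            if (result == null) {
                result = list.stream();
            } else {
                result = result.flatMap(m -> list.stream().map(n -> aggregator.apply(m, n)));
            }
        }
        // Assumption: no input lists means an empty product.
        return result == null ? Stream.<T>empty() : result;
    }

    public static void main(String[] args) {
        List<String> product = cartesian(
                (a, b) -> a + b,
                Arrays.asList("A", "B"),
                Arrays.asList("K", "L"),
                Arrays.asList("X", "Y")
        ).collect(Collectors.toList());
        System.out.println(product); // [AKX, AKY, ALX, ALY, BKX, BKY, BLX, BLY]

        // Five lists of five elements each: 25 stored items, 5^5 = 3125 results.
        List<String> five = Arrays.asList("1", "2", "3", "4", "5");
        System.out.println(cartesian((a, b) -> a + b, five, five, five, five, five).count()); // 3125
    }
}
```

The result stream is still lazy; only the inputs are held in memory.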

To generate a stream of Cartesian products, you need to constantly "rewind" all the streams (except the first). To rewind, the streams must be able to retrieve the original data again and again, either by buffering it somehow (which you don't like) or by grabbing it again from the source (collection, array, file, network, random numbers, etc.) and performing all the intermediate operations again and again. If your source and intermediate operations are slow, the lazy solution may be much slower than the buffering solution. If your source cannot produce the data again (for example, a random number generator that cannot reproduce the numbers it produced earlier), your solution will be incorrect.

Nevertheless, a completely lazy solution is possible. Just use suppliers of streams instead of streams:

    private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator,
                                           Supplier<Stream<T>>... streams) {
        return Arrays.stream(streams)
            .reduce((s1, s2) ->
                () -> s1.get().flatMap(t1 -> s2.get().map(t2 -> aggregator.apply(t1, t2))))
            .orElse(Stream::empty).get();
    }

The solution is interesting because we create and reduce a stream of suppliers to get the resulting supplier and finally call it. Usage:

    Stream<String> result = cartesian(
        (a, b) -> a + b,
        () -> Stream.of("A", "B"),
        () -> Stream.of("K", "L"),
        () -> Stream.of("X", "Y")
    );
    result.forEach(System.out::println);
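To make the "rewinding" cost visible, the sketch below wraps each supplier in a counter and reports how often each source is asked for a new stream. The `counting` helper and the class name `SupplierCallCount` are hypothetical names introduced only for this illustration; the `cartesian` method is the supplier-based one shown above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SupplierCallCount {

    // Supplier-based reduction, as in the answer above.
    @SafeVarargs
    public static <T> Stream<T> cartesian(BinaryOperator<T> aggregator,
                                          Supplier<Stream<T>>... streams) {
        return Arrays.stream(streams)
                .reduce((s1, s2) ->
                        () -> s1.get().flatMap(t1 -> s2.get().map(t2 -> aggregator.apply(t1, t2))))
                .orElse(Stream::empty)
                .get();
    }

    // Hypothetical helper: counts every request for a fresh stream from the source.
    public static <T> Supplier<Stream<T>> counting(AtomicInteger counter, Supplier<Stream<T>> source) {
        return () -> {
            counter.incrementAndGet();
            return source.get();
        };
    }

    // Runs the three-stream example and returns the number of get() calls per supplier.
    public static List<Integer> callCounts() {
        AtomicInteger first = new AtomicInteger();
        AtomicInteger second = new AtomicInteger();
        AtomicInteger third = new AtomicInteger();

        Supplier<Stream<String>> s1 = counting(first, () -> Stream.of("A", "B"));
        Supplier<Stream<String>> s2 = counting(second, () -> Stream.of("K", "L"));
        Supplier<Stream<String>> s3 = counting(third, () -> Stream.of("X", "Y"));

        // A terminal operation is needed, otherwise the inner suppliers are never called.
        cartesian((a, b) -> a + b, s1, s2, s3).collect(Collectors.toList());

        return Arrays.asList(first.get(), second.get(), third.get());
    }

    public static void main(String[] args) {
        System.out.println(callCounts()); // [1, 2, 4]
    }
}
```

The first source is read once, the second once per element of the first, and the third once per pair already built. If a source is expensive to re-create, this is where the lazy variant pays for its laziness.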

The stream is consumed in the flatMap operation during the second iteration, so you have to create a new stream every time you map your result. Therefore, you have to collect the stream in advance to get a new stream in every iteration.

    private static <T> Stream<T> cartesian(BiFunction<T, T, T> aggregator, Stream<T>... streams) {
        Stream<T> result = null;
        for (Stream<T> stream : streams) {
            if (result == null) {
                result = stream;
            } else {
                Collection<T> s = stream.collect(Collectors.toList());
                result = result.flatMap(m -> s.stream().map(n -> aggregator.apply(m, n)));
            }
        }
        return result;
    }

Or even shorter:

    private static <T> Stream<T> cartesian(BiFunction<T, T, T> aggregator, Stream<T>... streams) {
        return Arrays.stream(streams).reduce((r, s) -> {
            List<T> collect = s.collect(Collectors.toList());
            return r.flatMap(m -> collect.stream().map(n -> aggregator.apply(m, n)));
        }).orElse(Stream.empty());
    }
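The single-use rule behind the exception can be reproduced in isolation. The following is only a small sketch with made-up names (`StreamReuseDemo`, `secondUseFails`, `bufferedReuse`): the first method attaches two operations to the same stream instance, which is what the flatMap lambda in the question does for its second outer element, and the second method shows the buffered alternative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamReuseDemo {

    // Returns true if attaching a second operation to an already-used stream fails.
    public static boolean secondUseFails() {
        Stream<String> inner = Stream.of("X", "Y");
        inner.map(n -> "A" + n); // first use: the stream is now linked to a downstream stage
        try {
            inner.map(n -> "B" + n); // second use of the same instance
            return false;
        } catch (IllegalStateException e) {
            // Message: "stream has already been operated upon or closed"
            return true;
        }
    }

    // Buffering the inner elements first allows a new stream per outer element.
    public static List<String> bufferedReuse() {
        List<String> buffered = Stream.of("X", "Y").collect(Collectors.toList());
        return Stream.of("A", "B")
                .flatMap(m -> buffered.stream().map(n -> m + n))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // true
        System.out.println(bufferedReuse());  // [AX, AY, BX, BY]
    }
}
```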