Using scalaz-stream, is it possible to split (fork) a stream and then join it again?
As an example, suppose I have the following code:
    val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)

    // Two independent pipelines over the same source
    val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
    val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)

    // Pair the two sums and send the result to a sink
    sumOfEvenNumbers.zip(sumOfOddNumbers).to(someEffectfulFunction)
With scalaz-stream, the result in this example is what you would expect: a tuple holding the sums of the even and the odd numbers from 1 to 10 is passed to the sink.
However, if we replace streamOfNumbers with something that requires IO, the IO action will actually be executed twice, once for each pipeline.
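To make the double execution concrete, here is a minimal sketch that uses only the Scala standard library rather than scalaz-stream. It is an analogy, not the library's behaviour verbatim: `DoubleRead`, the `reads` counter, and the `streamOfNumbers()` method are hypothetical stand-ins for an IO-backed source that is re-evaluated by every consumer.

```scala
// Stdlib-only analogy (not scalaz-stream): a source that is re-opened
// by every consumer, much like a stream description backed by an IO action.
object DoubleRead {
  // Counts how many times the hypothetical IO source is opened.
  var reads = 0

  // Hypothetical stand-in for an IO-backed source: each call opens it again.
  def streamOfNumbers(): Iterator[Int] = {
    reads += 1
    (1 to 10).iterator
  }

  def isEven(n: Int): Boolean = n % 2 == 0

  // Two independent traversals, mirroring the two filter/fold pipelines.
  val sumOfEvenNumbers: Int =
    streamOfNumbers().filter(isEven).foldLeft(0)(_ + _)
  val sumOfOddNumbers: Int =
    streamOfNumbers().filter(n => !isEven(n)).foldLeft(0)(_ + _)
}
```

After both sums have been computed, `DoubleRead.reads` is 2: the source was opened once per pipeline, which is exactly what I want to avoid.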
Using Topic, I can create a pub/sub process that correctly duplicates the elements of the stream, but it does not apply any back-pressure: it simply consumes the entire source as fast as possible, regardless of how quickly the subscribers consume it.
I can wrap this in a bounded queue, but the end result seems a lot more complicated than it should be.
Is there a simpler way to split a stream in scalaz-stream without duplicating the source IO?
scala stream scalaz scalaz-stream
James davies