Splitting a scalaz-stream Process into two child streams

Using scalaz-stream, is it possible to split (fork) a stream and then join it again?

As an example, suppose I have the following code:

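```scala
import scalaz.concurrent.Task
import scalaz.stream._

def isEven(n: Int) = n % 2 == 0
def isOdd(n: Int) = !isEven(n)
def add(a: Int, b: Int) = a + b
// placeholder sink: prints each (evenSum, oddSum) pair
val someEffectfulFunction: Sink[Task, (Int, Int)] = Process.constant((p: (Int, Int)) => Task.delay(println(p)))

val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)
val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)
sumOfEvenNumbers.zip(sumOfOddNumbers).to(someEffectfulFunction)
```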

With scalaz-stream, this example produces the expected result: a tuple containing the sum of the even numbers and the sum of the odd numbers from 1 to 10 is passed to the sink.

However, if we replace streamOfNumbers with a source that performs I/O, the I/O action is actually executed twice.
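
Why the effect runs twice can be seen without scalaz-stream at all. The sketch below is a plain-Scala analogy, not scalaz-stream code: the hypothetical `CountingSource` stands in for an effectful source (for example a file read) and counts how often it is run. Because each branch re-runs the source description independently, the "I/O" happens once per branch.

```scala
// Plain-Scala analogy: a source *description* that performs an effect every time it is run.
// CountingSource and its counter are hypothetical, for illustration only.
final class CountingSource(data: Seq[Int]) {
  var runs = 0 // how many times the underlying "I/O" has been performed
  def run(): Iterator[Int] = { runs += 1; data.iterator }
}

object TwoPasses {
  def isEven(n: Int): Boolean = n % 2 == 0
  def isOdd(n: Int): Boolean = !isEven(n)

  // Mirrors the question: each sum re-runs the source on its own.
  def sums(src: CountingSource): (Int, Int) = {
    val sumOfEvenNumbers = src.run().filter(isEven).foldLeft(0)(_ + _)
    val sumOfOddNumbers = src.run().filter(isOdd).foldLeft(0)(_ + _)
    (sumOfEvenNumbers, sumOfOddNumbers)
  }
}
```

Calling `TwoPasses.sums(new CountingSource(1 to 10))` yields `(30, 25)`, but the source's `runs` counter ends at 2.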

Using a Topic, I can create a pub/sub process that correctly duplicates the elements of the stream, but it is not bounded: it simply reads the entire source as fast as possible, regardless of how quickly the subscribers consume it.

I can wrap this in a bounded queue, but the end result seems much more complicated than it should be.

Is there a simpler way to split a stream in scalaz-stream without duplicating the source I/O?

scala stream scalaz scalaz-stream
3 answers

To complement the previous answer regarding the "splitting" requirement: your specific problem can be solved without splitting the stream at all:

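```scala
import scalaz.{-\/, \/, \/-}
import scalaz.concurrent.Task
import scalaz.stream._

val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)
// Odd numbers go to the left (W) side, even numbers to the right (O) side
val oddOrEven: Process[Task, Int \/ Int] = streamOfNumbers.map[Int \/ Int] {
  case even if even % 2 == 0 => \/-(even)
  case odd                   => -\/(odd)
}
// sump1 is assumed to be a summing Process1 such as process1.sum
val sump1 = process1.sum[Int]
// Sums both sides in a single pass over the source
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)
val evenSink: Sink[Task, Int] = ???
val oddSink: Sink[Task, Int] = ???
// drainW sends the left (odd) sum to oddSink; the right (even) sum flows on to evenSink
summed
  .drainW(oddSink)
  .to(evenSink)
```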
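
The idea behind this answer (tag each element as left/odd or right/even, then sum each side independently so the source is traversed only once) can be sketched in plain Scala. This is an analogy of the technique, not scalaz-stream's API: it uses the standard library's `Either` instead of scalaz `\/`, and a strict `foldLeft` instead of `pipeW`/`pipeO`. The `SinglePass` object and its methods are hypothetical.

```scala
object SinglePass {
  // Tag odds as Left and evens as Right, matching the Int \/ Int convention above.
  def tag(n: Int): Either[Int, Int] = if (n % 2 == 0) Right(n) else Left(n)

  // One traversal: the Left side and the Right side are summed separately.
  // Returns (oddSum, evenSum).
  def sumBothSides(source: Iterator[Int]): (Int, Int) =
    source.map(tag).foldLeft((0, 0)) {
      case ((oddSum, evenSum), Left(odd))   => (oddSum + odd, evenSum)
      case ((oddSum, evenSum), Right(even)) => (oddSum, evenSum + even)
    }
}
```

For `(1 to 10).iterator` this returns `(25, 30)`, and each element is pulled from the source exactly once.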

Perhaps you can still use a topic and simply make sure that the child processes are subscribed before you publish to the topic.

However, keep in mind that this solution is unbounded: if you publish too fast, you may run into an OutOfMemoryError.

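```scala
import scalaz.concurrent.Task
import scalaz.stream._

// Unbounded: the topic applies no back-pressure to the publisher.
def split[A](source: Process[Task, A]): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val topic = async.topic[A]() // scalaz-stream 0.7+ signature (default arguments)
  // Each subscription takes effect when its process is run
  val sub1 = topic.subscribe
  val sub2 = topic.subscribe
  merge.mergeN(Process(Process.emit(sub1 -> sub2), (source to topic.publish).drain))
}
```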
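
Why an unbounded topic risks running out of memory can be illustrated with a plain-Scala analogy (not scalaz-stream's `Topic`). The hypothetical `publishAll` below copies every element into one unbounded `java.util.concurrent.LinkedBlockingQueue` per subscriber. Since `put` never blocks on an unbounded queue, the publisher drains the whole source whether or not anyone is reading, and everything not yet consumed stays buffered in memory.

```scala
import java.util.concurrent.LinkedBlockingQueue

object UnboundedBroadcast {
  // Hypothetical stand-in for a topic: one unbounded queue per subscriber.
  def publishAll[A](source: Iterator[A], subscribers: Seq[LinkedBlockingQueue[A]]): Unit =
    source.foreach(a => subscribers.foreach(_.put(a))) // never blocks: no back-pressure

  // Publishes 1..n with no subscriber reading, then reports how much is buffered.
  def demo(n: Int): (Int, Int) = {
    val sub1 = new LinkedBlockingQueue[Int]()
    val sub2 = new LinkedBlockingQueue[Int]()
    publishAll((1 to n).iterator, Seq(sub1, sub2))
    // Nothing has been consumed, yet the whole source has been read and buffered twice.
    (sub1.size, sub2.size)
  }
}
```

`UnboundedBroadcast.demo(1000)` returns `(1000, 1000)`: the buffers grow with the size of the source rather than with consumer demand.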

I needed this functionality too. My situation was more complicated, so I could not work around it that way.

Thanks to Daniel Spiev's answer in this thread, I was able to get the following to work. I improved his solution by adding onHalt, so that my application terminates once the Process completes.

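```scala
import scalaz.concurrent.Task
import scalaz.stream._

def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val left = async.boundedQueue[A](limit)
  val right = async.boundedQueue[A](limit)
  // Feed every element into both bounded queues; when the source halts,
  // close both queues so the dequeue sides terminate as well.
  val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause =>
    Process.await(Task.gatherUnordered(Seq(left.close, right.close))) { _ => Process.Halt(cause) }
  }
  // Emits the pair of child processes once
  val dequeue = Process((left.dequeue, right.dequeue))
  enqueue merge dequeue
}
```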
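
The bounded-queue approach can be sketched in plain Scala with `java.util.concurrent.ArrayBlockingQueue` as an analogy (not scalaz-stream's `async.boundedQueue`). The producer reads the source once and puts every element into two bounded queues, blocking whenever either queue is full: that is the back-pressure a topic lacks. The `None` end-of-stream marker is an assumption of this sketch and plays the role of `left.close`/`right.close` in the `onHalt` handler, letting both consumers terminate. `BoundedSplit.splitSums` is hypothetical.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedSplit {
  // Takes from the queue until the end-of-stream marker (None),
  // summing the elements that satisfy `keep`.
  private def consume(q: ArrayBlockingQueue[Option[Int]], keep: Int => Boolean): Int = {
    var sum = 0
    var done = false
    while (!done) {
      q.take() match {
        case Some(n) => if (keep(n)) sum += n
        case None    => done = true
      }
    }
    sum
  }

  // Reads `source` once, fans each element out to two bounded queues,
  // and returns (sum of evens from one branch, sum of odds from the other).
  def splitSums(source: Iterator[Int], limit: Int = 10): (Int, Int) = {
    val left = new ArrayBlockingQueue[Option[Int]](limit)
    val right = new ArrayBlockingQueue[Option[Int]](limit)

    // Producer: put() blocks while a queue is full, so each branch
    // buffers at most `limit` unconsumed elements.
    val producer = new Thread(() => {
      source.foreach { a => left.put(Some(a)); right.put(Some(a)) }
      left.put(None) // end-of-stream marker, analogous to closing the queue
      right.put(None)
    })

    var evenSum, oddSum = 0
    val evenBranch = new Thread(() => { evenSum = consume(left, _ % 2 == 0) })
    val oddBranch = new Thread(() => { oddSum = consume(right, _ % 2 != 0) })

    val threads = Seq(producer, evenBranch, oddBranch)
    threads.foreach(_.start())
    threads.foreach(_.join()) // join makes the consumers' results visible here
    (evenSum, oddSum)
  }
}
```

`BoundedSplit.splitSums((1 to 10).iterator, limit = 2)` returns `(30, 25)` while never holding more than two pending elements per branch. The design choice mirrors the answer: bounded buffers trade throughput for a fixed memory ceiling, and an explicit close signal is what lets the consumers (and the application) finish.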
