How to make two streams side by side?

Is there an Akka thread combinator to do the following (or something like that)? (Name it and for now.)

 (flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat] 

The semantics will be that regardless of the source, its elements will be transferred to both Flow s, and their outputs will be combined into a new Flow as a tuple. (For those familiar with arrows from the theory of the theory of aromatic functional programming, I am looking for something like &&& .)

There are two combinators in the library that looked relevant, namely zip and alsoTo . But the former accepts a SourceShape , and the latter SinkShape a SinkShape . None of them will allow a GraphShape . Why is this so?

My usage example is as follows:

 someSource .via(someFlowThatReturnsUnit.and(Flow.apply)) .runWith(someSink) 

Not finding something like .and , I changed my original Flow as follows:

 someSource .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs) .runWith(someSink) 

This works, but I'm looking for a cleaner, more composite solution.

+7
scala functional-programming akka akka-stream
source share
1 answer

Note

As Viktor Klang noted in the comments, β€œzipping in a Tuple2[O,O2] viable only when it is known that both flows flow1 and flow2 are 1: 1 in terms of input element count and output element count,

Graphical solution

A tuple design can be created inside a Graph . In fact, your question almost perfectly matches the introductory example:

enter image description here

Extending the sample code in the link, you can use Broadcast and Zip

 val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => import GraphDSL.Implicits._ val in = Source(1 to 10) val out = Sink.ignore val bcast = builder.add(Broadcast[Int](2)) val merge = builder.add(Zip[Int, Int]()) //different than link val f1, f2, f4 = Flow[Int].map(_ + 10) val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out bcast ~> f4 ~> merge ClosedShape })//end RunnableGraph.fromGraph 

Some Hacky Stream Solution

If you are looking for a solution with a clean stream, you can use intermediate streams, but Mat will not be supported and includes the materialization of 2 streams for each input element:

 def andFlows[I, O, O2] (maxConcurrentSreams : Int) (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] = Flow[I].mapAsync(maxConcurrentStreams){ i => val o : Future[O] = Source .single(i) .via(flow1) .to(Sink.head[O]) .run() val o2 : Future[O2] = Source .single(i) .via(flow2) .to(Sink.head[O2]) .run() o zip o2 }//end Flow[I].mapAsync 

Generic Zipping

If you want to make this zipping generic, for most threads, then the type of output should be (Seq[O], Seq[O2]) . This type can be generated using Sink.seq instead of Sink.head in the andFlows function andFlows :

 def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int) (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] = Flow[I].mapAsync(maxConcurrentStreams){ i => val o : Future[Seq[O]] = Source .single(i) .via(flow1) .to(Sink.seq[O]) .run() val o2 : Future[Seq[O2]] = Source .single(i) .via(flow2) .to(Sink.seq[O2]) .run() o zip o2 }//end Flow[I].mapAsync 
+6
source share

All Articles