Akka Stream option output

I created an Akka stream that has simple Source , Flow and Sink . With this, I can easily send items through it. Now I want to change this thread so that Flow returns Option . Depending on the result of Option I want to change the output of Flow .

enter image description here

Is it possible to create such a design?

+7
scala akka akka-stream
source share
3 answers

Both answers cited at this time include Broadcast . Note that this may work in this particular example, but more complex Broadcast graphics may not be the smart choice. The reason is that Broadcast always counterpressed if at least one of the counterpressures is downstream. The best solution for measuring back pressure is Partition , which is able to selectively distribute back pressure from the branch selected by the Partitioner function.

Example below (clarification of one of T-Fowl's answers)

  def splittingSink[T, M1, M2, Mat](f: T β‡’ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) β‡’ Mat): Sink[T, Mat] = { val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder β‡’ (sink1, sink2) β‡’ { import GraphDSL.Implicits._ def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1) val partition = builder.add(Partition[Option[T]](2, partitioner)) partition.out(0) ~> Flow[Option[T]].collect { case Some(t) β‡’ t } ~> sink1.in partition.out(1) ~> Flow[Option[T]].collect { case None β‡’ None } ~> sink2.in val mapper = builder.add(Flow.fromFunction(f)) mapper.out ~> partition.in SinkShape(mapper.in) } } Sink.fromGraph(graph) } 
+7
source share

Suppose you have something like this

 val source = Source(1 to 100) val flow = Flow[Int].map { case x if x % 2 == 0 β‡’ Some(x.toString) case _ β‡’ None } val sink1 = Sink.foreach[String](println) val sink2 = Sink.foreach[None.type](x β‡’ println("dropped element")) 

You can make a runnable graph with the required structure as follows:

 val runnable = source .via(flow) .alsoTo(Flow[Option[String]].collect { case None β‡’ None }.to(sink2)) .to(Flow[Option[String]].collect { case Some(x) β‡’ x }.to(sink1)) 
+4
source share

You can view the stream using two sinks as a self-sorter. To build a more complex graph, we can use the functions presented in GraphDSL .

We consider in the general case

 def splittingSink[T, M1, M2, Mat](f: T β‡’ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) β‡’ Mat): Sink[T, Mat] = { val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder β‡’ (sink1, sink2) β‡’ { import GraphDSL.Implicits._ //Here we broadcast the Some[T] values to 2 flows, // each filtering to the correct type for each sink val bcast = builder.add(Broadcast[Option[T]](2)) bcast.out(0) ~> Flow[Option[T]].collect { case Some(t) β‡’ t } ~> sink1.in bcast.out(1) ~> Flow[Option[T]].collect { case None β‡’ None } ~> sink2.in //The flow that maps T => Some[T] val mapper = builder.add(Flow.fromFunction(f)) mapper.out ~> bcast.in //The whole thing is a Sink[T] SinkShape(mapper.in) } } Sink.fromGraph(graph) } 

This returns a Sink[T,Mat] , which, using the provided function, will map the incoming T elements to Option[T] , which then goes to one of the provided receivers.

Usage example:

 val sink = splittingSink( (s: String) β‡’ if (s.length % 2 == 0) Some(s) else None, Sink.foreach[String](s), Sink.foreach[None.type](_ β‡’ println("None")), (f1: Future[_], f2: Future[_]) β‡’ Future.sequence(Seq(f1, f2)).map(_ β‡’ Done) ) Source(List("One", "Two", "Three", "Four", "Five", "Six")) .runWith(sink) .onComplete(_ β‡’ println("----\nDone")) 

Output:

 None None None Four Five None ---- Done 

The use of GraphDSL is discussed further in the documentation section for Streaming Graphics .

+3
source share

All Articles