You can view a stream with two sinks as a Sink in its own right. To build more complex graphs, we can use the functions provided by GraphDSL.
Consider the generic case:
    import akka.stream.SinkShape
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}

    def splittingSink[T, M1, M2, Mat](
        f: T => Option[T],
        someSink: Sink[T, M1],
        noneSink: Sink[None.type, M2],
        combineMat: (M1, M2) => Mat): Sink[T, Mat] = {
      val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder =>
        (sink1, sink2) => {
          import GraphDSL.Implicits._

          // Here we broadcast the Option[T] values to 2 flows,
          // each filtering to the correct type for each sink
          val bcast = builder.add(Broadcast[Option[T]](2))
          bcast.out(0) ~> Flow[Option[T]].collect { case Some(t) => t } ~> sink1.in
          bcast.out(1) ~> Flow[Option[T]].collect { case None => None } ~> sink2.in

          // The flow that maps T => Option[T]
          val mapper = builder.add(Flow.fromFunction(f))
          mapper.out ~> bcast.in

          // The whole thing is a Sink[T]
          SinkShape(mapper.in)
        }
      }
      Sink.fromGraph(graph)
    }
This returns a Sink[T, Mat] which, using the provided function, maps each incoming T element to an Option[T] and then routes it to one of the two provided sinks: Some values go to someSink, None values go to noneSink.
Usage example:
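    import akka.Done
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.Future

    // Assumes an implicit materializer (or an implicit ActorSystem on Akka 2.6+)
    // and an implicit ExecutionContext are in scope.
    val sink = splittingSink(
      (s: String) => if (s.length % 2 == 0) Some(s) else None,
      Sink.foreach[String](println),
      Sink.foreach[None.type](_ => println("None")),
      (f1: Future[_], f2: Future[_]) => Future.sequence(Seq(f1, f2)).map(_ => Done)
    )

    Source(List("One", "Two", "Three", "Four", "Five", "Six"))
      .runWith(sink)
      .onComplete(_ => println("----\nDone"))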
Output:
    None
    None
    None
    Four
    Five
    None
    ----
    Done
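To see why the output looks like this, the routing can be modeled with plain Scala collections. This is only a rough sketch of the logic inside the graph, not how Akka executes it; `RoutingModel` and `route` are hypothetical names made up for this illustration.

```scala
// Plain-collections model of the routing inside splittingSink; no Akka needed.
// `RoutingModel` and `route` are hypothetical names used only for illustration.
object RoutingModel {
  // Returns the elements that would reach someSink, and the number of
  // None values that would reach noneSink.
  def route[T](elems: List[T])(f: T => Option[T]): (List[T], Int) = {
    val mapped = elems.map(f)                              // the "mapper" stage
    val toSomeSink = mapped.collect { case Some(t) => t }  // broadcast branch 0
    val toNoneSink = mapped.count(_.isEmpty)               // broadcast branch 1
    (toSomeSink, toNoneSink)
  }

  def main(args: Array[String]): Unit = {
    val words = List("One", "Two", "Three", "Four", "Five", "Six")
    val (evens, noneCount) =
      route(words)(s => if (s.length % 2 == 0) Some(s) else None)
    println(evens)      // List(Four, Five)
    println(noneCount)  // 4
  }
}
```

Only "Four" and "Five" have an even length, so they are the two elements that reach the first sink; the other four elements become None and reach the second one.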
The use of GraphDSL is discussed further in the stream graphs section of the documentation.
T-fowl