How can I pipe the output of an Akka Streams Merge into another Flow?

I am playing with Akka Streams and have figured out most of the basics, but I don't understand how to take the result of a Merge and perform further operations on it (map, filter, fold, etc.).

I would like to modify the following code so that, instead of piping the merge straight into a sink, I can manipulate the data further.

implicit val materializer = FlowMaterializer()

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)

val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> sink
}.run()

I assume my main problem is that I cannot figure out how to create a flow component that has no source, and I cannot figure out how to perform a merge without using the special Merge object and the ~> syntax.

EDIT: This question and its answer were written for, and worked with, Akka Streams 0.11.


If you do not need the semantics of Merge, you can use concat on Source instead:

items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)

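Note that concat emits all of items_a before items_b. If you want to keep Merge, you can route its output into a Flow that performs the further operations and ends in the sink: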

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))

val sink = ForeachSink[Double](println)
val transform = Flow[Int].map(_ * 2).map(_.toDouble).to(sink)


val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> transform
}.run

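The point here is that a Flow can be defined on its own, without a Source, which is the source-less component the question asks about.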
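For readers on a later Akka Streams release, the same idea (merge two sources, then keep transforming) can be sketched without the graph DSL. This is only a minimal sketch, assuming Akka 2.6 or later, where an implicit ActorSystem supplies the materializer; the object name MergeThenTransform, the method run, and the system name "merge-demo" are illustrative and not part of the original answer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeThenTransform {
  // A Flow is a processing stage with no Source or Sink attached yet.
  val transform: Flow[Int, Double, NotUsed] =
    Flow[Int].map(_ * 2).map(_.toDouble)

  // Merges the two sources, applies the flow, and collects the output.
  def run(): Seq[Double] = {
    implicit val system: ActorSystem = ActorSystem("merge-demo")
    try {
      val itemsA = Source(List(10, 20, 30, 40, 50))
      val itemsB = Source(List(60, 70, 80, 90, 100))
      // merge interleaves elements as they arrive, so order is not guaranteed
      val result = itemsA.merge(itemsB).via(transform).runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Because merge does not guarantee an ordering between the two inputs, any check on the output should compare the sorted elements rather than the raw sequence.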


Source.combine:

// items_c binds to the tail of the list, so it can be passed as varargs below
val items_a :: items_b :: items_c = List(
         Source(List(10,20,30,40,50)),
         Source(List(60,70,80,90,100)),
         Source(List(110,120,130,140,1500)))

Source.combine(items_a, items_b, items_c : _*)(Merge(_))
         .map(_+1)
         .runForeach(println)

Or, if you need to preserve the order of input sources (for example, items_a is needed before items_b and items_b should be before items_c), you can use Concat instead of Merge.

// items_c binds to the tail of the list, so it can be passed as varargs below
val items_a :: items_b :: items_c = List(
     Source(List(10,20,30,40,50)),
     Source(List(60,70,80,90,100)),
     Source(List(110,120,130,140,1500)))

Source.combine(items_a, items_b, items_c : _*)(Concat(_))
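
The snippet above builds the combined source but never runs it. As a rough, self-contained sketch of how it could be run, assuming Akka 2.6 or later (implicit ActorSystem as materializer provider), the following collects the concatenated elements into a sequence. The names ConcatInOrder, run, and "concat-demo" are illustrative, and the explicit type parameter on Concat is only there to help type inference.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Concat, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ConcatInOrder {
  // Concatenates three sources in order and returns all emitted elements.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("concat-demo")
    try {
      val itemsA = Source(List(10, 20, 30, 40, 50))
      val itemsB = Source(List(60, 70, 80, 90, 100))
      val itemsC = Source(List(110, 120, 130, 140, 1500))
      // Concat drains each input fully before moving on to the next one
      val combined = Source.combine(itemsA, itemsB, itemsC)(Concat[Int](_))
      Await.result(combined.runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Swapping Concat for Merge in the same call gives the interleaved behaviour from the previous answer, at the cost of a deterministic order.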