Custom Merge Akka Streams

I am new to akka-streams and do not know how to approach this problem.

I have 3 source streams sorted by sequence id. I want to group values ​​together that have the same identifier. Values ​​in each stream may be missing or duplicated. If one thread is a faster producer than the rest, it should get back pressure.

case class A(id: Int) case class B(id: Int) case class C(id: Int) case class Merged(as: List[A], bs: List[B], cs: List[C]) import akka.stream._ import akka.stream.scaladsl._ val as = Source(List(A(1), A(2), A(3), A(4), A(5))) val bs = Source(List(B(1), B(2), B(3), B(4), B(5))) val cs = Source(List(C(1), C(1), C(3), C(4))) val merged = ??? // value 1: Merged(List(A(1)), List(B(1)), List(C(1), C(1))) // value 2: Merged(List(A(2)), List(B(2)), Nil) // value 3: Merged(List(A(3)), List(B(3)), List(C(3))) // value 4: Merged(List(A(4)), List(B(4)), List(C(4))) // value 5: Merged(List(A(5)), List(B(5)), Nil) // (end of stream) 
+6
source share

All Articles