How to dynamically add a source to an existing graph?

What could be an alternative to dynamically changing graphs? Here is my situation. I have a graph that ingests articles into the database. The articles come from 3 plugins in different formats, so I have several flows:

val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]

// These are being created every time I poll plugins    
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]

val merged = Source.combine(
    sourceContentProvider.via(converterFlow1),
    sourceNews.via(converterFlow2),
    sourceCit)(Merge(_))

val res = merged
  .buffer(10, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()

The problem is that I receive data from the content provider once every 24 hours and from the news plugin once every 2 hours, while data for the last source can arrive at any time because it is submitted by people.

I understand that graphs are immutable, but how can I periodically attach new Source instances to my graph so that I have a single throttle point for the ingestion process?

UPDATE: , - Source -s, . , Source ( ). . , Source.

+4
3

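A stream of streams, i.e. Source[Source[T, _], Whatever], can be flattened into Source[T, Whatever] with flatMapConcat or flatMapMerge. So if you can obtain a Source[Source[Article, NotUsed], NotUsed], applying one of the flatMap* methods gives you a Source[Article, NotUsed].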
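As a rough illustration of the flatMapConcat and flatMapMerge operators named in this answer, the sketch below flattens a stream of article batches both ways. The batch contents, the object name FlattenDemo and the run helper are made up for the example. It assumes akka-stream is on the classpath and uses ActorMaterializer as elsewhere in this thread (on Akka 2.6+ the implicit ActorSystem alone is enough).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlattenDemo {
  final case class Article(text: String)

  // Hypothetical input: each inner Source stands for one poll of a plugin.
  val batches: Source[Source[Article, NotUsed], NotUsed] = Source(List(
    Source(List(Article("cp1"), Article("cp2"))),
    Source(List(Article("news1"), Article("news2")))
  ))

  // Returns (result of flatMapConcat, result of flatMapMerge).
  def run(): (Seq[Article], Seq[Article]) = {
    implicit val system: ActorSystem = ActorSystem("flatten-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // flatMapConcat drains one inner source completely before starting the next,
      // so the order of batches is preserved.
      val inOrder = Await.result(
        batches.flatMapConcat(identity).runWith(Sink.seq), 5.seconds)
      // flatMapMerge runs up to `breadth` inner sources concurrently,
      // so elements of different batches may interleave.
      val anyOrder = Await.result(
        batches.flatMapMerge(2, identity).runWith(Sink.seq), 5.seconds)
      (inOrder, anyOrder)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (inOrder, anyOrder) = run()
    println(s"concat: $inOrder")
    println(s"merge:  $anyOrder")
  }
}
```

flatMapConcat is the simpler choice when batches should be written one after another; flatMapMerge fits better when a slow batch should not hold up the others.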

+2

If you cannot model your input as a Source[Source[_,_],_], consider using Source.queue[Source[T,_]](queueSize, overflowStrategy).

, , - , , .

+1

I implemented the code below based on Vladimir Matveev's answer and want to share it with others, since this looks like a common use case.

I knew about Source.queue, which Viktor Klang mentioned, but I did not know about flatMapConcat. It is pure awesomeness.

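import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()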

case class ImpArticle(text: String)
case class NewsArticle(text: String)
case class Article(text: String)

val converterFlow1: Flow[ImpArticle, Article, NotUsed] = Flow[ImpArticle].map(a => Article("a:" + a.text))
val converterFlow2: Flow[NewsArticle, Article, NotUsed] = Flow[NewsArticle].map(a => Article("a:" + a.text))
val sinkDB: Sink[Article, Future[Done]] = Sink.foreach { a =>
  Thread.sleep(1000)
  println(a)
}

// These are being created every time I poll plugins
val sourceContentProvider: Source[ImpArticle, NotUsed] = Source(List(ImpArticle("cp1"), ImpArticle("cp2")))
val sourceNews: Source[NewsArticle, NotUsed] = Source(List(NewsArticle("news1"), NewsArticle("news2")))
val sourceCit: Source[Article, NotUsed] = Source(List(Article("a1"), Article("a2")))

val (queue, completionFut) = Source
  .queue[Source[Article, NotUsed]](10, OverflowStrategy.backpressure)
  .flatMapConcat(identity)
  .buffer(2, OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()

queue.offer(sourceContentProvider.via(converterFlow1))
queue.offer(sourceNews.via(converterFlow2))
queue.offer(sourceCit)
queue.complete()

completionFut.onComplete {
  case Success(res) =>
    println(res)
    system.terminate()
  case Failure(ex) =>
    ex.printStackTrace()
    system.terminate()
}

Await.result(system.whenTerminated, Duration.Inf)

I still need to check whether the Future returned by queue.offer succeeded, but in my case these calls will be quite rare.
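The check on the Future returned by queue.offer could look roughly like the sketch below. The describe helper, the object name OfferCheckDemo and the String payloads are invented for illustration; the QueueOfferResult cases are the ones defined in akka.stream. It assumes akka-stream is on the classpath and keeps the ActorMaterializer setup used in the code above.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object OfferCheckDemo {
  // Hypothetical helper: turns every possible offer outcome into a log message.
  def describe(result: QueueOfferResult): String = result match {
    case QueueOfferResult.Enqueued       => "enqueued"
    case QueueOfferResult.Dropped        => "dropped"
    case QueueOfferResult.Failure(cause) => s"failed: ${cause.getMessage}"
    case QueueOfferResult.QueueClosed    => "queue closed"
  }

  // Returns (status of the single offer, elements that reached the sink).
  def run(): (String, Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("offer-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      val (queue, done) = Source
        .queue[Source[String, NotUsed]](10, OverflowStrategy.backpressure)
        .flatMapConcat(identity)
        .toMat(Sink.seq)(Keep.both)
        .run()

      // offer returns a Future[QueueOfferResult]; inspect it instead of ignoring it.
      val status = Await.result(
        queue.offer(Source(List("a1", "a2"))).map(describe), 5.seconds)

      queue.complete()
      val items = Await.result(done, 5.seconds)
      (status, items)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In a long-running service the Await calls would normally be replaced by onComplete or recover callbacks; blocking is used here only to keep the sketch short.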

+1