A thread never completes because a merge never completes a termination.
After formatting the chart structure, it looks something like this:
fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
merge <~ toRetry <~ broadcastArray
The non-completion problem is rooted in the merge step:
// 2 inputs into merge
fileSource ~> merge
merge <~ toRetry
fileSource ( (0, Array.empty[String])), complete .
fileSource . :
akka.stream.scaladsl.MergePreferred
, (eagerClose = false) upstream (eagerClose = true)
complete , .
, merge toRetry. toRetry , merge.
, fileSource, eagerClose=True, fileSource. :.
//Add this true |
// V
val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge")
. Flow.map, tail recursive:
//Note: there is no use of akka in this implementation
type FileInputType = (Int, Array[String])
@scala.annotation.tailrec
def recursiveRetry(fileInput : FileInputType) : FileInputType =
fileInput match {
case (r,_) if r >= 3 => fileInput
case (r,a) => recursiveRetry((r+1, a))
}
val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry
fileSource ~> recursiveRetryFlow ~> toSink ~> sink
+, "-" akka. , . , , - "-". , , .
, , idiomatic Scala.