Refresh Oh, I did not see the accepted answer because I did not refresh the page> _ <. I will leave it here anyway, as I also added some notes about error handling.
I believe the following program does what you want:
import akka.NotUsed import akka.actor.ActorSystem import akka.stream.{ActorMaterializer, IOResult} import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source} import akka.util.ByteString import scala.concurrent.{Await, Future} import scala.util.{Failure, Success} import scala.util.control.NonFatal import java.nio.file.Paths import scala.concurrent.duration._ object TestMain extends App { implicit val actorSystem = ActorSystem("test") implicit val materializer = ActorMaterializer() implicit def ec = actorSystem.dispatcher val sources = Vector("build.sbt", ".gitignore") .map(Paths.get(_)) .map(p => FileIO.fromPath(p) .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left) .mapMaterializedValue { f => f.onComplete { case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p") case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}") case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e") } NotUsed } ) val finalSource = Source(sources).flatMapConcat(identity) val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) result.onComplete { case Success(n) => println(s"Read $n lines total") case Failure(e) => println(s"Reading failed: $e") } Await.ready(result, 10.seconds) actorSystem.terminate() }
The key point is the flatMapConcat() method: it converts each element of the stream to a source and returns the stream of elements provided by these sources if they are executed sequentially.
As for error handling, you can add a handler to the future in the mapMaterializedValue argument mapMaterializedValue or you can handle the final error of a working thread by placing the handler in the materialized future value of Sink.foreach . I did both in the above example, and if you test it in, say, a non-existent file, you will see that the same error message will be printed twice. Unfortunately, flatMapConcat() does not collect materialized values, and to be honest, I donβt see how it could do it safely, so you have to handle them separately if necessary.
source share