I am creating a library that will be used by third parties. One of my methods returns a Stream[Item], which is generated asynchronously from the results of a paginated REST API request.
I am using my own modification of BulkPullerAsync. My code is here.
I want the receiver of my stream to be able to handle errors. According to the documentation, I have to use a custom Supervision.Decider.
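    import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}

    // Log every exception the materializer hands to the decider, then stop the stream.
    val decider: Supervision.Decider = {
      case ex =>
        ex.printStackTrace()
        Supervision.Stop
    }

    implicit val mat = ActorMaterializer(
      ActorMaterializerSettings(system).withSupervisionStrategy(decider))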
Unfortunately, it does not catch exceptions thrown in my ActorPublisher. I can see that the error is being processed and that ActorPublisher.onError is called, but it never reaches the Supervision.Decider. The same decider does work with the simple stream from the documentation.
Errors also do not reach the actor if I use Sink.actorRef .
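To make the symptom easier to reproduce without my publisher, here is a minimal sketch that uses Source.failed as a stand-in for a publisher that signals onError. That substitution is an assumption on my part (it is not my ActorPublisher code), and the object name, the counter, and the 3 second timeout are made up for illustration. As far as I understand, the decider is only consulted when an operator's own function throws, so here it should not be called at all, while the failure still shows up in the Future materialized by the sink.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical reproduction: the source fails on its own, no operator function throws.
object UpstreamFailureSketch {

  // Returns (number of decider invocations, outcome of the stream).
  def run(): (Int, Try[Any]) = {
    implicit val system: ActorSystem = ActorSystem("upstream-failure-sketch")

    // Count how often the materializer-level decider is consulted.
    val deciderCalls = new AtomicInteger(0)
    val decider: Supervision.Decider = { _ =>
      deciderCalls.incrementAndGet()
      Supervision.Stop
    }

    implicit val mat: ActorMaterializer = ActorMaterializer(
      ActorMaterializerSettings(system).withSupervisionStrategy(decider))

    try {
      // Source.failed signals onError downstream as soon as the stream runs.
      val done = Source.failed[Int](new RuntimeException("boom")).runWith(Sink.ignore)

      // The failure is observable through the Future materialized by the sink.
      val outcome = Try(Await.result(done, 3.seconds))
      (deciderCalls.get(), outcome)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If this matches what happens with ActorPublisher, then a consumer can still observe the error through the materialized value of the sink, just not through the decider.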
What should I do? I expect that users of my Stream should not depend on the nature of its implementation.
UPD: As an experiment, I also tried the following sample:

    val stream = Source(0 to 5).map(100 / _)
    stream.runWith(Sink.actorSubscriber(props))

In this case, the exception was caught by the Decider.
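For anyone who wants to run this experiment without my subscriber, a self-contained variant might look like the sketch below. It swaps Sink.actorSubscriber(props) for Sink.ignore, since props is specific to my code; the object name and the 3 second timeout are likewise only illustrative. The division by zero is thrown inside map, which is the kind of failure the decider is consulted for.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical standalone version of the experiment above.
object DeciderExperiment {

  // Runs the stream and returns how it ended (success or failure).
  def run(): Try[Any] = {
    implicit val system: ActorSystem = ActorSystem("decider-experiment")

    // Print whatever reaches the decider, then stop the stream.
    val decider: Supervision.Decider = { ex =>
      println(s"Decider saw: $ex")
      Supervision.Stop
    }

    implicit val mat: ActorMaterializer = ActorMaterializer(
      ActorMaterializerSettings(system).withSupervisionStrategy(decider))

    try {
      // The first element is 0, so 100 / 0 throws ArithmeticException inside map.
      val done = Source(0 to 5).map(100 / _).runWith(Sink.ignore)

      // Supervision.Stop fails the stream, so the materialized Future fails too.
      Try(Await.result(done, 3.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```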
UPD2: I tried to trick it by creating a Publisher from various sources and then converting them back to a Source, expecting every error that occurs to arrive at Subscriber.onError :) So apparently something goes wrong when ActorPublisher is combined with a Decider...
In general, I consider this inconsistent behavior: I cannot use a single mechanism to handle all errors in a Stream.