Custom Supervision.Decider does not see the exception thrown by ActorPublisher

I am creating a library that will be used by third parties. One of my methods returns a Stream[Item], which is produced asynchronously from the results of a paginated REST API request.

I am using my own modification of BulkPullerAsync. My code is here.

I want the receiver of my stream to be able to handle errors. According to the documentation, I have to use a custom Supervision.Decider.

    val decider: Supervision.Decider = {
      case ex =>
        ex.printStackTrace()
        Supervision.Stop
    }

    implicit val mat = ActorMaterializer(
      ActorMaterializerSettings(system).withSupervisionStrategy(decider))

Unfortunately, it does not catch exceptions thrown in my ActorPublisher. I can see that the exception is being processed and that ActorPublisher.onError is called, but it never reaches the Supervision.Decider. The same decider does work with the simple stream given in the documentation.

Errors also do not reach the actor if I use Sink.actorRef.

What should I do? Users of my Stream should not have to depend on how it is implemented.

UPD: As an experiment, I tried the following sample:

    val stream = Source(0 to 5).map(100 / _)
    stream.runWith(Sink.actorSubscriber(props))

In this case, the exception was caught by the Decider.
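To make the contrast concrete, here is a minimal sketch of the two cases side by side. It assumes Akka Streams 2.4.x, and it uses Source.failed as a stand-in for my ActorPublisher calling onError, which is an assumption on my part and not my real code. The object name DeciderScopeSketch and the call counter are hypothetical. As far as I can tell, the decider is consulted only when an exception is thrown inside a stage such as map, not when the source itself signals a failure.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical name; only illustrates where the decider is (not) consulted.
object DeciderScopeSketch {

  /** Returns (decider calls after the failing map,
    *          decider calls after the failed source,
    *          whether the failed source failed the stream). */
  def run(): (Int, Int, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("decider-scope-sketch")

    // Count decider invocations instead of printing stack traces.
    val deciderCalls = new AtomicInteger(0)
    val decider: Supervision.Decider = {
      case _ =>
        deciderCalls.incrementAndGet()
        Supervision.Stop
    }

    implicit val mat: ActorMaterializer = ActorMaterializer(
      ActorMaterializerSettings(system).withSupervisionStrategy(decider))

    try {
      // Case 1: the exception is thrown inside a stage (100 / 0 in map).
      Try(Await.result(Source(0 to 5).map(100 / _).runWith(Sink.ignore), 5.seconds))
      val afterMap = deciderCalls.get()

      // Case 2: the source itself signals the failure
      // (assumed stand-in for ActorPublisher.onError).
      val failed = Try(Await.result(
        Source.failed[Int](new RuntimeException("boom")).runWith(Sink.ignore),
        5.seconds))
      val afterFailedSource = deciderCalls.get()

      (afterMap, afterFailedSource, failed.isFailure)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If the two counters come out equal while the second stream still fails, the failure travelled downstream as an ordinary stream failure and can be observed on the materialized Future, but the decider was never asked about it.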

UPD2: I tried to trick it by creating a Publisher from various sources and then converting it back to a Source, expecting all errors to arrive at Subscriber.onError, which does happen :) So apparently something goes wrong when ActorPublisher and Decider are combined...
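The round trip looks roughly like the sketch below. It assumes Akka Streams 2.4.2 or later (for Sink.asPublisher, Source.fromPublisher and Sink.fromSubscriber). The failing map source stands in for the "various sources" I tried, and the object name PublisherRoundTripSketch is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.{Publisher, Subscriber, Subscription}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical name; shows a Source -> Publisher -> Source round trip.
object PublisherRoundTripSketch {

  /** Returns the class name of the error seen in Subscriber.onError, if any. */
  def run(): Option[String] = {
    implicit val system: ActorSystem = ActorSystem("round-trip-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    try {
      // A source that fails on its first element (100 / 0).
      val original = Source(0 to 5).map(100 / _)

      // Source -> Publisher -> Source.
      val publisher: Publisher[Int] =
        original.runWith(Sink.asPublisher[Int](fanout = false))
      val roundTripped = Source.fromPublisher(publisher)

      // A plain Reactive Streams subscriber that records the error it receives.
      val seen = Promise[Throwable]()
      val subscriber = new Subscriber[Int] {
        override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
        override def onNext(i: Int): Unit = ()
        override def onError(t: Throwable): Unit = seen.trySuccess(t)
        override def onComplete(): Unit =
          seen.tryFailure(new NoSuchElementException("completed without error"))
      }

      roundTripped.runWith(Sink.fromSubscriber(subscriber))

      Try(Await.result(seen.future, 5.seconds)).toOption.map(_.getClass.getName)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this form the error reaches the subscriber through onError without a Decider being involved at all, which is the behavior I would like consumers of my stream to be able to rely on.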

In general, I consider this inconsistent behavior: I cannot rely on a single mechanism to handle errors in a Stream.
