Asynchronous API call inside an actor and exceptions

I know PipeTo , but some things, like synchronous waiting on a nested continuation, seem to go against an asynchronous and waiting path.

So, my first question [1] would be: is there any β€œmagic” here, so that we can just synchronously wait for the nested tasks to continue, and still async at the end?

While we are in async state and expect differences, how are errors handled?

Let me create a simple example:

 public static class AsyncOperations { public async static Task<int> CalculateAnswerAsync() { await Task.Delay(1000).ConfigureAwait(false); throw new InvalidOperationException("Testing!"); //return 42; } public async static Task<string> ConvertAsync(int number) { await Task.Delay(600).ConfigureAwait(false); return number + " :)"; } } 

In a "regular", asynchronous and pending way:

 var answer = await AsyncOperations.CalculateAnswerAsync(); var converted = await AsyncOperations.ConvertAsync(answer); 

the exception will exit the first operation, as expected.

Now create an actor that will work with these asynchronous operations. For the argument, suppose that CalculateAnswerAsync and ConvertAsync should be used one after the other as one complete operation (similarly, for example, StreamWriter.WriteLineAsync and StreamWriter.FlushAsync if you just want to write one line to the stream).

 public sealed class AsyncTestActor : ReceiveActor { public sealed class Start { } public sealed class OperationResult { private readonly string message; public OperationResult(string message) { this.message = message; } public string Message { get { return message; } } } public AsyncTestActor() { Receive<Start>(msg => { AsyncOperations.CalculateAnswerAsync() .ContinueWith(result => { var number = result.Result; var conversionTask = AsyncOperations.ConvertAsync(number); conversionTask.Wait(1500); return new OperationResult(conversionTask.Result); }) .PipeTo(Self); }); Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message)); } } 

If there are no exceptions, I still get Got 42 :) without any problems, which brings me back to the "magic" point above [1]. In addition, do the AttachedToParent and ExecuteSynchronously flags ExecuteSynchronously , which are optional in the example, or are they largely necessary for everything to work as intended? They don't seem to affect exception handling ...

Now, if CalculateAnswerAsync throws an exception, which means result.Result throws AggregateException , it almost swallowed without a trace.

What should I do, if at all possible, to make an exception inside an asynchronous operation, failing the actor as a "regular" exception?

+8
source share
2 answers

The joys of error handling in TPL :)

As soon as the task starts in its thread, everything that happens inside it is already asynchronous from the caller, including error handling

  • When you start your first Task inside an actor, this task is performed independently of ThreadPool from your actor. This means that everything you do inside the Task will already be asynchronous from your actor, because it works in a different thread. This is why I made a call to Task.Wait inside the PipeTo sample associated with at the top of the post. It does not matter to the actor - he just looks like a long-term task.
  • Exceptions - if your internal task failed, the conversionTask.Result property will throw an exception that was caught during its launch, so you will want to add some error handling to your Task to make sure that your actor receives a notification that something went wrong not this way. Notice that I did just that: https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117 - if you turn your Exceptions into Messages that your actor can process: birds start to sing, happy shine, and TPL errors cease to be a source of pain and agony.
  • What happens when an exception occurs ...

Now, if CalculateAnswerAsync throws an exception, it means that result.Result throws an AggregateException, it swallows quite a lot without a trace.

AggregateException will contain a list of internal exceptions wrapped inside it - the reason TPL has this concept of cumulative errors is because (a) you have one task, which is a continuation of many tasks in the aggregate, i.e. Task.WhenAll or (b) you have errors propagated by the ContinueWith chain back to the parent. You can also call AggregateException.Flatten() to simplify the management of nested exceptions.

Best recommendations for TPL + Akka.NET

Working with exceptions from TPL is a nuisance, it’s true, but the best way to deal with it is to try..catch.. exceptions inside your Task and turn them into message classes that your actor can handle.

Also, are the AttachedToParent and ExecuteSynchronously flags provided in the example optional, or are they pretty much necessary for everything to work as intended?

This is basically a problem when you have continuations in continuations - PipeTo automatically uses these flags on its own. It has zero impact on error handling, but ensures that your continuations are executed immediately in the same thread as the original Task .

I recommend using these flags only when you make a lot of nested continuations - TPL starts to accept some restrictions on how it schedules your tasks when you go deeper than 1 continuation (and in fact flags like OnlyOnCompleted cease to be accepted after more than one continuation.)

+9
source share

Just add what Aaron said. As of yesterday, we support secure asynchronous browsing inside participants when using the task manager.

 public class AsyncAwaitActor : ReceiveActor { public AsyncAwaitActor() { Receive<string>(async m => { await Task.Delay(TimeSpan.FromSeconds(1)); Sender.Tell("done"); }); } } public class AskerActor : ReceiveActor { public AskerActor(ActorRef other) { Receive<string>(async m => { var res = await other.Ask(m); Sender.Tell(res); }); } } public class ActorAsyncAwaitSpec : AkkaSpec { [Fact] public async Task Actors_should_be_able_to_async_await_ask_message_loop() { var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>() .WithDispatcher("akka.actor.task-dispatcher"), "Worker"); //IMPORTANT: you must use the akka.actor.task-dispatcher //otherwise async await is not safe var asker = Sys.ActorOf(Props.Create(() => new AskerActor(actor)) .WithDispatcher("akka.actor.task-dispatcher"), "Asker"); var res = await asker.Ask("something"); Assert.Equal("done", res); } } 

This is not our default dispatcher, as it has a price in performance / bandwidth. There is also the risk of locks if you run tasks that block (for example, using task.Wait() or task.Result ) Thus, the PipeTo template is still the preferred approach, as it is more true for the actor model. But async support is waiting for support for you as an additional tool if you really need to do some TPL integration.

This feature actually uses PipeTo under the covers. He will perform all tasks and complete them in a special message and pass this message back to the actor and perform this task inside his own concurrency context.

+6
source share

All Articles