Asynchronous enqueueing in Azure Queues

I am trying to enqueue messages to an Azure Queue asynchronously as follows:

    private async Task EnqueueItemsAsync(IEnumerable<string> messages)
    {
        var tasks = messages.Select(msg =>
            _queue.AddMessageAsync(new CloudQueueMessage(msg), null, null, null, null));
        await Task.WhenAll(tasks);
    }

If I understood correctly, this says: "Start enqueuing each item without waiting for it to complete, keep a reference to each task, and then wait until all of them have been sent."

This code works fine in most cases, but for a large number of elements (5,000), it starts enqueuing and then throws a timeout exception (after roughly 3,500 elements have been enqueued).

I worked around this by awaiting each operation before starting the next:

    private async Task EnqueueItemsAsync(IEnumerable<string> messages)
    {
        foreach (var message in messages)
        {
            await _queue.AddMessageAsync(new CloudQueueMessage(message), null, null, null, null);
        }
    }
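A middle ground between firing off all 5,000 operations at once and awaiting each one sequentially is to cap the number of in-flight requests. The following is only a sketch, not code from the question; the concurrency limit of 50 is an arbitrary starting point to tune for your workload:

```csharp
private async Task EnqueueItemsAsync(IEnumerable<string> messages)
{
    // Allow at most 50 concurrent AddMessageAsync calls.
    using (var throttle = new SemaphoreSlim(50))
    {
        var tasks = messages.Select(async msg =>
        {
            await throttle.WaitAsync();
            try
            {
                await _queue.AddMessageAsync(new CloudQueueMessage(msg), null, null, null, null);
            }
            finally
            {
                throttle.Release();
            }
        }).ToList(); // materialize so all tasks start before awaiting

        await Task.WhenAll(tasks);
    }
}
```

This keeps the overall operation concurrent while preventing thousands of simultaneous requests from overwhelming the queue partition.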

Can anyone explain why this happened?

The exception:

    System.AggregateException containing many of these inner exceptions:
       at Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass4.<CreateCallbackVoid>b__3(IAsyncResult ar)
    Request Information
    RequestID:
    RequestDate:
    StatusMessage:
    ---> (Inner exception #1) Microsoft.WindowsAzure.Storage.StorageException: The client could not complete the operation within the specified timeout.
    ---> System.TimeoutException: The client could not complete the operation within the specified timeout.
       --- End of inner exception stack trace ---
       at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result)

2 answers

A single queue in Azure Storage is designed for a throughput target of up to 2,000 messages per second.

See: Azure Storage Scalability and Performance Targets

When your application reaches the limit of what a partition can handle for your workload, Azure Storage begins returning error code 503 (Server Busy) or error code 500 (Operation Timeout). When this happens, the application should use an exponential backoff policy for retries. Exponential backoff reduces the load on the partition and eases spikes of traffic to that partition.
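With the classic Microsoft.WindowsAzure.Storage client (the one used in the question), an exponential backoff retry policy can be supplied per call through `QueueRequestOptions`. A minimal sketch, assuming a `_queue` field of type `CloudQueue`; the 2-second delta and 5 attempts are illustrative values:

```csharp
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.RetryPolicies;

var options = new QueueRequestOptions
{
    // Back off exponentially from a 2-second delta, up to 5 attempts.
    RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(2), 5)
};

await _queue.AddMessageAsync(
    new CloudQueueMessage(message),
    timeToLive: null,
    initialVisibilityDelay: null,
    options: options,
    operationContext: null);
```

The retry policy only helps with transient 500/503 responses; if the sustained load exceeds the 2,000 messages/second target, throttling the send rate is still required.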


It seems that you can build a more robust mechanism by passing a QueueRequestOptions instance to AddMessageAsync .

Before the request is sent, these options are applied to the underlying command.

I would try passing a QueueRequestOptions instance with MaximumExecutionTime and ServerTimeout set to larger values.
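A sketch of what that call might look like; the specific timeout values here are illustrative, not recommendations:

```csharp
using Microsoft.WindowsAzure.Storage.Queue;

var options = new QueueRequestOptions
{
    // Per-attempt timeout enforced by the storage service.
    ServerTimeout = TimeSpan.FromSeconds(30),
    // Overall client-side limit across all retries of this operation.
    MaximumExecutionTime = TimeSpan.FromMinutes(5)
};

await _queue.AddMessageAsync(new CloudQueueMessage(message), null, null, options, null);
```

Note that raising the timeouts only buys time; it does not remove the underlying throughput limit of the queue.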

Here is how the options are applied to the command before it is sent (decompiled from the storage client library):

    // Microsoft.WindowsAzure.Storage.Queue.QueueRequestOptions
    internal void ApplyToStorageCommand<T>(RESTCommand<T> cmd)
    {
        if (this.LocationMode.HasValue)
        {
            cmd.LocationMode = this.LocationMode.Value;
        }
        if (this.ServerTimeout.HasValue)
        {
            cmd.ServerTimeoutInSeconds = new int?((int)this.ServerTimeout.Value.TotalSeconds);
        }
        if (this.OperationExpiryTime.HasValue)
        {
            cmd.OperationExpiryTime = this.OperationExpiryTime;
            return;
        }
        if (this.MaximumExecutionTime.HasValue)
        {
            cmd.OperationExpiryTime = new DateTime?(DateTime.Now + this.MaximumExecutionTime.Value);
        }
    }

And this is how the command validates the response when it is executed:

    rESTCommand.PreProcessResponse = delegate(RESTCommand<NullType> cmd, HttpWebResponse resp, Exception ex, OperationContext ctx)
    {
        HttpResponseParsers.ProcessExpectedStatusCodeNoException<NullType>(HttpStatusCode.Created, resp, NullType.Value, cmd, ex);
        return NullType.Value;
    };

Source: https://habr.com/ru/post/1213723/