Azure Service Bus / Service Fabric message not removed from queue

I am migrating a calculation engine from an Azure worker role to Azure Service Fabric. It works by listening for messages on a Service Bus queue, which are then processed based on their contents.

The calculation currently works correctly, but if it takes more than a minute or so, the message is not removed from the queue after completion. In the worker role, we solved this by increasing the value of "AutoRenewTimeout".

    var options = new OnMessageOptions
    {
        AutoComplete = true,
        AutoRenewTimeout = TimeSpan.FromMinutes(3)
    };
    _queueClient.OnMessage(OnMessage, options);

However, using the "ServiceFabric.ServiceBus" NuGet package, I cannot work out where to set this. I used the demo project as a reference for setting up a stateless service that actually runs the calculation. Below is an excerpt from CalculateService.cs, where the stateless service is initialized.

    internal sealed class CalculateService : StatelessService
    {
        public CalculateService(StatelessServiceContext context)
            : base(context)
        { }

        /// <summary>
        /// Optional override to create listeners (eg, TCP, HTTP) for this service replica to handle client or user requests.
        /// </summary>
        /// <returns>A collection of listeners.</returns>
        protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
        {
            string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
            yield return new ServiceInstanceListener(
                context => new ServiceBusQueueCommunicationListener(new Handler(this), context, serviceBusQueueName),
                "StatelessService-ServiceBusQueueListener");
        }
    }

    internal sealed class Handler : AutoCompleteServiceBusMessageReceiver
    {
        protected override Task ReceiveMessageImplAsync(BrokeredMessage message, CancellationToken cancellationToken)
        {
            ServiceEventSource.Current.ServiceMessage(_service, $"Handling queue message {message.MessageId}");

            var computeRole = new ExcelCompute();
            var rMessage = new RangeMessage();
            rMessage = message.GetBody<RangeMessage>();

            var result = computeRole.OnMessage(rMessage, message.MessageId); //returns true if the compute was successful (which it currently, always is)
            return Task.FromResult(result);
        }
    }

I tried calling message.Complete() on the BrokeredMessage, but this caused a message lock error.

1 answer
  • Get the latest version of the package (>= v3.5.0). Set the MessageLockRenewTimeSpan property on your CommunicationListener to a value that is less than the lock duration (e.g. 50 s where the lock duration is 60 s). This allows for some clock skew. Note: by default this property is null, which means no automatic lock renewal is performed.

This option works well when processing a batch of messages takes longer than the lock duration allows.
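As a rough sketch of this first option, the listener from the question might be configured as shown below. It assumes that MessageLockRenewTimeSpan can be set through an object initializer on ServiceBusQueueCommunicationListener and that the queue's lock duration is the 60 s used in the example above. The 50 s value is only illustrative, and the exact property shape should be checked against the installed package version.

```csharp
// Fragment: goes inside the CalculateService class from the question.
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
    string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");

    yield return new ServiceInstanceListener(
        context => new ServiceBusQueueCommunicationListener(new Handler(this), context, serviceBusQueueName)
        {
            // Assumption: renew slightly before the 60 s lock expires to allow for clock skew.
            MessageLockRenewTimeSpan = TimeSpan.FromSeconds(50)
        },
        "StatelessService-ServiceBusQueueListener");
}
```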

or

  • You can use BrokeredMessage.RenewLock to periodically extend the message lock when processing takes longer than the lock duration. You can do this on a separate thread if necessary.

This option works well when processing a single message (batch size 1) takes longer than the lock duration allows.
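The manual renewal approach could be sketched roughly as follows. The LockRenewal class and its RunWithRenewalAsync method are hypothetical helpers written for illustration and are not part of any package. The renewLock callback stands in for a call to BrokeredMessage.RenewLock, and the renewal interval is assumed to be shorter than the queue's lock duration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of ServiceFabric.ServiceBus or the Service Bus SDK).
public static class LockRenewal
{
    // Runs `work` on a thread-pool thread and calls `renewLock` every
    // `renewInterval` until the work has finished or the token is cancelled.
    public static async Task<T> RunWithRenewalAsync<T>(
        Func<T> work,
        Action renewLock,
        TimeSpan renewInterval,
        CancellationToken cancellationToken)
    {
        using (var stopRenewing = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            Task renewLoop = RenewLoopAsync(renewLock, renewInterval, stopRenewing.Token);
            try
            {
                // The calculation is synchronous, so run it off the calling thread.
                return await Task.Run(work, cancellationToken);
            }
            finally
            {
                // Stop renewing once the work is done. If renewLock threw
                // (for example because the lock was lost), that exception
                // surfaces here, after the work has completed.
                stopRenewing.Cancel();
                await renewLoop;
            }
        }
    }

    private static async Task RenewLoopAsync(Action renewLock, TimeSpan interval, CancellationToken token)
    {
        try
        {
            while (true)
            {
                await Task.Delay(interval, token);
                renewLock();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the work completes or the service shuts down.
        }
    }
}

// Possible use inside Handler.ReceiveMessageImplAsync (names from the question,
// with the method marked async):
//
//   bool result = await LockRenewal.RunWithRenewalAsync(
//       () => computeRole.OnMessage(rMessage, message.MessageId),
//       () => message.RenewLock(),
//       TimeSpan.FromSeconds(50),
//       cancellationToken);
```

Keeping the renewal loop separate from the calculation means the compute code itself does not need to know about Service Bus locks. The trade-off is that a failed renewal is only reported once the calculation has finished.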
