How to limit the number of HttpWebRequest calls per second sent to a web server?

I need to implement a throttling mechanism (requests per second) when using HttpWebRequest to make concurrent requests to a single application server. My C# application should issue no more than 80 requests per second to a remote server. The restriction is imposed by the administrators of the remote service, not as a hard limit but as an SLA between my platform and theirs.

How can I control the number of requests per second when using HttpWebRequest?

3 answers

I had the same problem and could not find a ready-made solution, so I wrote one. The idea is to add the items that need processing to a BlockingCollection<T> and use Reactive Extensions to subscribe to a rate-limited consumer of that collection.

The Throttle class is a renamed version of this rate limiter.
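The Throttle class itself is not shown in this answer. Below is a minimal sketch of a compatible class, assuming only the members the answer's code calls: a Throttle(int, TimeSpan) constructor, WaitToProceed(), and Dispose(). The sliding-window implementation is an assumption for illustration, not the code of the limiter the answer refers to.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Hypothetical stand-in for the Throttle class used in this answer.
// It allows at most `occurrences` calls to pass in any window of `timePeriod`.
public sealed class Throttle : IDisposable
{
    private readonly int occurrences;
    private readonly TimeSpan period;
    // Times (on the stopwatch clock) of the most recent grants, oldest first.
    private readonly Queue<TimeSpan> grants = new Queue<TimeSpan>();
    private readonly Stopwatch clock = Stopwatch.StartNew();
    private readonly object gate = new object();
    private bool disposed;

    public Throttle(int occurrences, TimeSpan timePeriod)
    {
        if (occurrences <= 0) throw new ArgumentOutOfRangeException(nameof(occurrences));
        if (timePeriod <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(timePeriod));
        this.occurrences = occurrences;
        this.period = timePeriod;
    }

    // Blocks the caller until proceeding keeps the rate within the limit.
    // The lock is held while sleeping, so concurrent callers are served one at a time.
    public void WaitToProceed()
    {
        lock (gate)
        {
            if (disposed) throw new ObjectDisposedException(nameof(Throttle));

            if (grants.Count == occurrences)
            {
                // The oldest tracked grant must be at least one period old
                // before another call may pass.
                TimeSpan readyAt = grants.Dequeue() + period;
                TimeSpan wait;
                while ((wait = readyAt - clock.Elapsed) > TimeSpan.Zero)
                    Thread.Sleep(wait);
            }
            grants.Enqueue(clock.Elapsed);
        }
    }

    public void Dispose()
    {
        lock (gate) disposed = true;
    }
}
```

With new Throttle(80, TimeSpan.FromSeconds(1)), the 81st call to WaitToProceed() does not return until one second after the first call was let through, which matches the 80 requests per second limit from the question.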

    using System;
    using System.Collections.Concurrent;
    using System.Reactive.Subjects;
    using System.Threading;
    using System.Threading.Tasks;

    public static class BlockingCollectionExtensions
    {
        // TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed)
        public static IObservable<T> AsRateLimitedObservable<T>(
            this BlockingCollection<T> sequence, int items, TimeSpan timePeriod, CancellationToken producerToken)
        {
            Subject<T> subject = new Subject<T>();

            // Link a new token source to the producer's token so that the proxy
            // class can cancel the consuming task on disposal.
            CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken);

            var consumingTask = new Task(() =>
            {
                using (var throttle = new Throttle(items, timePeriod))
                {
                    while (!sequence.IsCompleted)
                    {
                        try
                        {
                            // Use the linked token so that Dispose() can interrupt a blocked Take.
                            T item = sequence.Take(tokenSource.Token);
                            throttle.WaitToProceed();
                            try { subject.OnNext(item); }
                            catch (Exception ex) { subject.OnError(ex); }
                        }
                        catch (OperationCanceledException) { break; }
                        // Thrown when adding completes while Take is waiting on an empty collection.
                        catch (InvalidOperationException) { break; }
                    }
                    subject.OnCompleted();
                }
            }, TaskCreationOptions.LongRunning);

            return new TaskAwareObservable<T>(subject, consumingTask, tokenSource);
        }

        private class TaskAwareObservable<T> : IObservable<T>, IDisposable
        {
            private readonly Task task;
            private readonly Subject<T> subject;
            private readonly CancellationTokenSource taskCancellationTokenSource;

            public TaskAwareObservable(Subject<T> subject, Task task, CancellationTokenSource tokenSource)
            {
                this.task = task;
                this.subject = subject;
                this.taskCancellationTokenSource = tokenSource;
            }

            public IDisposable Subscribe(IObserver<T> observer)
            {
                var disposable = subject.Subscribe(observer);
                // The consuming task starts on the first subscription.
                if (task.Status == TaskStatus.Created)
                    task.Start();
                return disposable;
            }

            public void Dispose()
            {
                // cancel consumption and wait for the task to finish
                taskCancellationTokenSource.Cancel();
                if (task.Status != TaskStatus.Created)
                {
                    // a task that was never started would never complete,
                    // so only wait for (and dispose) one that was started
                    task.Wait();
                    task.Dispose();
                }

                // dispose tokenSource and subject
                taskCancellationTokenSource.Dispose();
                subject.Dispose();
            }
        }
    }

Unit test:

    // xUnit only discovers public test classes
    public class BlockCollectionExtensionsTest
    {
        [Fact]
        public void AsRateLimitedObservable()
        {
            const int maxItems = 1; // fix this to 1 to ease testing
            TimeSpan during = TimeSpan.FromSeconds(1);

            // populate collection
            int[] items = new[] { 1, 2, 3, 4 };
            BlockingCollection<int> collection = new BlockingCollection<int>();
            foreach (var i in items) collection.Add(i);
            collection.CompleteAdding();

            IObservable<int> observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None);
            BlockingCollection<int> processedItems = new BlockingCollection<int>();
            ManualResetEvent completed = new ManualResetEvent(false);
            DateTime last = DateTime.UtcNow;

            observable
                // this is so we'll receive exceptions
                .ObserveOn(new SynchronizationContext())
                .Subscribe(item =>
                    {
                        if (item == 1)
                            last = DateTime.UtcNow;
                        else
                        {
                            // consecutive items should arrive one period apart, within 30 ms
                            TimeSpan diff = (DateTime.UtcNow - last);
                            last = DateTime.UtcNow;
                            Assert.InRange(diff.TotalMilliseconds, during.TotalMilliseconds - 30, during.TotalMilliseconds + 30);
                        }
                        processedItems.Add(item);
                    },
                    () => completed.Set()
                );

            completed.WaitOne();
            // CollectionEqualityComparer<T> is a helper that is not shown in this answer
            Assert.Equal(items, processedItems, new CollectionEqualityComparer<int>());
        }
    }

The Throttle() and Sample() extension methods (on Observable) let you turn a fast sequence of events into a slower one.

Here's a blog post with an example that uses Sample(TimeSpan) to enforce a maximum rate.
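As a rough illustration of what Sample(TimeSpan) does, here is a small sketch using System.Reactive. The SampleDemo class and the chosen intervals are made up for this example. Note that Sample emits at most the latest value in each interval and discards the rest, so it caps the rate by dropping events rather than by delaying them. That may not be what you want if every request must eventually be sent.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class, not part of any library.
public static class SampleDemo
{
    // Samples a fast source and returns the first three values that survive.
    public static IList<long> Run()
    {
        return Observable
            .Interval(TimeSpan.FromMilliseconds(50))   // fast source: roughly 20 values per second
            .Sample(TimeSpan.FromMilliseconds(500))    // at most one value per 500 ms; the others are dropped
            .Take(3)
            .ToList()
            .Wait();                                   // block until the three sampled values have arrived
    }
}
```

The returned values are increasing but not consecutive, because the values between two sampling ticks never reach the subscriber.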


My initial post discussed how to add a throttling mechanism to WCF through extensions to client behavior, but then it was pointed out that I misunderstood the question (doh!).

In general, the approach can be to consult a class that determines whether the next request would violate the rate limit. There has already been a lot of discussion about how to check for rate violations:

Throttling method calls to M requests in N seconds

If the next request would exceed the rate limit, sleep for a fixed interval and check again. If not, go ahead and issue the HttpWebRequest call.
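The check, sleep, and re-check loop described above could look roughly like the sketch below. RateLimitChecker, ThrottledClient, and the 10 ms polling interval are hypothetical names and values chosen for illustration. The checker counts requests in a sliding window, which is only one of several ways to detect a rate violation.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Threading;

// Hypothetical helper: answers "may I send one more request right now?"
public sealed class RateLimitChecker
{
    private readonly int maxRequests;
    private readonly TimeSpan window;
    private readonly Queue<TimeSpan> sent = new Queue<TimeSpan>(); // send times, oldest first
    private readonly Stopwatch clock = Stopwatch.StartNew();
    private readonly object sync = new object();

    public RateLimitChecker(int maxRequests, TimeSpan window)
    {
        if (maxRequests <= 0) throw new ArgumentOutOfRangeException(nameof(maxRequests));
        if (window <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(window));
        this.maxRequests = maxRequests;
        this.window = window;
    }

    // Returns true (and records the request) if sending now stays within the limit.
    public bool TryAcquire()
    {
        lock (sync)
        {
            TimeSpan now = clock.Elapsed;
            // Forget requests that have left the sliding window.
            while (sent.Count > 0 && now - sent.Peek() >= window)
                sent.Dequeue();

            if (sent.Count >= maxRequests)
                return false;

            sent.Enqueue(now);
            return true;
        }
    }
}

public static class ThrottledClient
{
    // Sleeps and re-checks until the limit allows the call, then issues the request.
    public static HttpWebResponse Get(string url, RateLimitChecker checker)
    {
        while (!checker.TryAcquire())
            Thread.Sleep(10); // assumed polling interval; tune to taste

        var request = (HttpWebRequest)WebRequest.Create(url);
        return (HttpWebResponse)request.GetResponse();
    }
}
```

For the limit in the question, a single shared new RateLimitChecker(80, TimeSpan.FromSeconds(1)) would be passed to every thread that calls ThrottledClient.Get. Polling is simple, but it wakes waiting threads repeatedly and does not guarantee that they are served in arrival order.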
