Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/dotnet/runtime.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrennan <brecon@microsoft.com>2022-04-29 22:25:08 +0300
committerGitHub <noreply@github.com>2022-04-29 22:25:08 +0300
commit9c37a3b3eb6d955d54865a2f9edf557620ab7fa8 (patch)
tree63a20e303429a9ed2de4a2ee89129ac91aa8287b
parent083181f1e5e5ccc35699afb9a6508f7146f84351 (diff)
Add Fixed and Sliding Window RateLimiters + PartitionedRateLimiter.Create (#68695)v7.0.0-preview.4.22229.4release/7.0-preview4
* Add RL non-generic fixed window, sliding window implementations (#68087) * Add initial impl of PartitionedRateLimiter.Create (#67677) Co-authored-by: Shreya Verma <shve9863@colorado.edu>
-rw-r--r--src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs64
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj12
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs8
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs426
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiterOptions.cs76
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/NoopLimiter.cs39
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs230
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.T.cs30
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.cs78
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs425
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiterOptions.cs88
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TimerAwaitable.cs136
-rw-r--r--src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs8
-rw-r--r--src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs6
-rw-r--r--src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs49
-rw-r--r--src/libraries/System.Threading.RateLimiting/tests/FixedWindowRateLimiterTests.cs746
-rw-r--r--src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs466
-rw-r--r--src/libraries/System.Threading.RateLimiting/tests/RateLimiterPartitionTests.cs83
-rw-r--r--src/libraries/System.Threading.RateLimiting/tests/SlidingWindowRateLimiterTests.cs770
-rw-r--r--src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj6
-rw-r--r--src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs52
21 files changed, 3792 insertions, 6 deletions
diff --git a/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs
index 13e779c61ca..9b3419ab6a2 100644
--- a/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs
+++ b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs
@@ -40,6 +40,10 @@ namespace System.Threading.RateLimiting
public static bool operator !=(System.Threading.RateLimiting.MetadataName<T> left, System.Threading.RateLimiting.MetadataName<T> right) { throw null; }
public override string ToString() { throw null; }
}
+ public static partial class PartitionedRateLimiter
+ {
+ public static System.Threading.RateLimiting.PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(System.Func<TResource, System.Threading.RateLimiting.RateLimitPartition<TPartitionKey>> partitioner, System.Collections.Generic.IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull { throw null; }
+ }
public abstract partial class PartitionedRateLimiter<TResource> : System.IAsyncDisposable, System.IDisposable
{
protected PartitionedRateLimiter() { }
@@ -83,6 +87,21 @@ namespace System.Threading.RateLimiting
public abstract bool TryGetMetadata(string metadataName, out object? metadata);
public bool TryGetMetadata<T>(System.Threading.RateLimiting.MetadataName<T> metadataName, [System.Diagnostics.CodeAnalysis.MaybeNullAttribute] out T metadata) { throw null; }
}
+ public static partial class RateLimitPartition
+ {
+ public static System.Threading.RateLimiting.RateLimitPartition<TKey> CreateConcurrencyLimiter<TKey>(TKey partitionKey, System.Func<TKey, System.Threading.RateLimiting.ConcurrencyLimiterOptions> factory) { throw null; }
+ public static System.Threading.RateLimiting.RateLimitPartition<TKey> CreateNoLimiter<TKey>(TKey partitionKey) { throw null; }
+ public static System.Threading.RateLimiting.RateLimitPartition<TKey> CreateTokenBucketLimiter<TKey>(TKey partitionKey, System.Func<TKey, System.Threading.RateLimiting.TokenBucketRateLimiterOptions> factory) { throw null; }
+ public static System.Threading.RateLimiting.RateLimitPartition<TKey> Create<TKey>(TKey partitionKey, System.Func<TKey, System.Threading.RateLimiting.RateLimiter> factory) { throw null; }
+ }
+ public partial struct RateLimitPartition<TKey>
+ {
+ private readonly TKey _PartitionKey_k__BackingField;
+ private object _dummy;
+ private int _dummyPrimitive;
+ public RateLimitPartition(TKey partitionKey, System.Func<TKey, System.Threading.RateLimiting.RateLimiter> factory) { throw null; }
+ public readonly TKey PartitionKey { get { throw null; } }
+ }
public abstract partial class ReplenishingRateLimiter : System.Threading.RateLimiting.RateLimiter
{
protected ReplenishingRateLimiter() { }
@@ -113,4 +132,49 @@ namespace System.Threading.RateLimiting
public int TokenLimit { get { throw null; } }
public int TokensPerPeriod { get { throw null; } }
}
+ public sealed partial class SlidingWindowRateLimiter : System.Threading.RateLimiting.ReplenishingRateLimiter
+ {
+ public SlidingWindowRateLimiter(System.Threading.RateLimiting.SlidingWindowRateLimiterOptions options) { }
+ public override System.TimeSpan? IdleDuration { get { throw null; } }
+ public override bool IsAutoReplenishing { get { throw null; } }
+ public override System.TimeSpan ReplenishmentPeriod { get { throw null; } }
+ protected override System.Threading.RateLimiting.RateLimitLease AcquireCore(int requestCount) { throw null; }
+ protected override void Dispose(bool disposing) { }
+ protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+ public override int GetAvailablePermits() { throw null; }
+ public override bool TryReplenish() { throw null; }
+ protected override System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAsyncCore(int requestCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+ }
+ public sealed partial class SlidingWindowRateLimiterOptions
+ {
+ public SlidingWindowRateLimiterOptions(int permitLimit, System.Threading.RateLimiting.QueueProcessingOrder queueProcessingOrder, int queueLimit, System.TimeSpan window, int segmentsPerWindow, bool autoReplenishment = true) { }
+ public bool AutoReplenishment { get { throw null; } }
+ public int QueueLimit { get { throw null; } }
+ public System.Threading.RateLimiting.QueueProcessingOrder QueueProcessingOrder { get { throw null; } }
+ public System.TimeSpan Window { get { throw null; } }
+ public int PermitLimit { get { throw null; } }
+ public int SegmentsPerWindow { get { throw null; } }
+ }
+ public sealed partial class FixedWindowRateLimiter : System.Threading.RateLimiting.ReplenishingRateLimiter
+ {
+ public FixedWindowRateLimiter(System.Threading.RateLimiting.FixedWindowRateLimiterOptions options) { }
+ public override System.TimeSpan? IdleDuration { get { throw null; } }
+ public override bool IsAutoReplenishing { get { throw null; } }
+ public override System.TimeSpan ReplenishmentPeriod { get { throw null; } }
+ protected override System.Threading.RateLimiting.RateLimitLease AcquireCore(int requestCount) { throw null; }
+ protected override void Dispose(bool disposing) { }
+ protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+ public override int GetAvailablePermits() { throw null; }
+ public override bool TryReplenish() { throw null; }
+ protected override System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAsyncCore(int requestCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+ }
+ public sealed partial class FixedWindowRateLimiterOptions
+ {
+ public FixedWindowRateLimiterOptions(int permitLimit, System.Threading.RateLimiting.QueueProcessingOrder queueProcessingOrder, int queueLimit, System.TimeSpan window, bool autoReplenishment = true) { }
+ public bool AutoReplenishment { get { throw null; } }
+ public int QueueLimit { get { throw null; } }
+ public System.Threading.RateLimiting.QueueProcessingOrder QueueProcessingOrder { get { throw null; } }
+ public System.TimeSpan Window { get { throw null; } }
+ public int PermitLimit { get { throw null; } }
+ }
}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj
index a87e55873f7..c3f39a246b6 100644
--- a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj
+++ b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum)</TargetFrameworks>
<Nullable>enable</Nullable>
@@ -16,13 +16,22 @@ System.Threading.RateLimiting.RateLimitLease</PackageDescription>
<ItemGroup>
<Compile Include="System\Threading\RateLimiting\ConcurrencyLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\ConcurrencyLimiterOptions.cs" />
+ <Compile Include="System\Threading\RateLimiting\FixedWindowRateLimiter.cs" />
+ <Compile Include="System\Threading\RateLimiting\FixedWindowRateLimiterOptions.cs" />
<Compile Include="System\Threading\RateLimiting\MetadataName.cs" />
<Compile Include="System\Threading\RateLimiting\MetadataName.T.cs" />
+ <Compile Include="System\Threading\RateLimiting\NoopLimiter.cs" />
+ <Compile Include="System\Threading\RateLimiting\PartitionedRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\PartitionedRateLimiter.T.cs" />
<Compile Include="System\Threading\RateLimiting\QueueProcessingOrder.cs" />
<Compile Include="System\Threading\RateLimiting\RateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\RateLimitLease.cs" />
+ <Compile Include="System\Threading\RateLimiting\RateLimitPartition.cs" />
+ <Compile Include="System\Threading\RateLimiting\RateLimitPartition.T.cs" />
<Compile Include="System\Threading\RateLimiting\ReplenishingRateLimiter.cs" />
+ <Compile Include="System\Threading\RateLimiting\SlidingWindowRateLimiter.cs" />
+ <Compile Include="System\Threading\RateLimiting\SlidingWindowRateLimiterOptions.cs" />
+ <Compile Include="System\Threading\RateLimiting\TimerAwaitable.cs" />
<Compile Include="System\Threading\RateLimiting\TokenBucketRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\TokenBucketRateLimiterOptions.cs" />
<Compile Include="$(CommonPath)System\Collections\Generic\Deque.cs" Link="Common\System\Collections\Generic\Deque.cs" />
@@ -30,6 +39,7 @@ System.Threading.RateLimiting.RateLimitLease</PackageDescription>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETCoreApp'">
<Reference Include="System.Runtime" />
<Reference Include="System.Threading" />
+ <Reference Include="System.Collections" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsVersion)" />
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs
index a733ea04e30..b50db111afd 100644
--- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs
@@ -112,7 +112,11 @@ namespace System.Threading.RateLimiting
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
- oldestRequest.Tcs.TrySetResult(FailedLease);
+ if (!oldestRequest.Tcs.TrySetResult(FailedLease))
+ {
+ // Updating queue count is handled by the cancellation code
+ _queueCount += oldestRequest.Count;
+ }
}
while (_options.QueueLimit - _queueCount < permitCount);
}
@@ -249,7 +253,7 @@ namespace System.Threading.RateLimiting
? _queue.DequeueHead()
: _queue.DequeueTail();
next.CancellationTokenRegistration.Dispose();
- next.Tcs.SetResult(FailedLease);
+ next.Tcs.TrySetResult(FailedLease);
}
}
}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs
new file mode 100644
index 00000000000..dcef507ea94
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs
@@ -0,0 +1,426 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Threading.Tasks;
+
+namespace System.Threading.RateLimiting
+{
+ /// <summary>
+ /// <see cref="RateLimiter"/> implementation that refreshes allowed permits in a window periodically.
+ /// </summary>
+ public sealed class FixedWindowRateLimiter : ReplenishingRateLimiter
+ {
+ private int _requestCount;
+ private int _queueCount;
+ private long _lastReplenishmentTick;
+ private long? _idleSince;
+ private bool _disposed;
+
+ private readonly Timer? _renewTimer;
+ private readonly FixedWindowRateLimiterOptions _options;
+ private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();
+
+ private object Lock => _queue;
+
+ private static readonly RateLimitLease SuccessfulLease = new FixedWindowLease(true, null);
+ private static readonly RateLimitLease FailedLease = new FixedWindowLease(false, null);
+ private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;
+
+ /// <inheritdoc />
+ public override TimeSpan? IdleDuration => _idleSince is null ? null : new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));
+
+ /// <inheritdoc />
+ public override bool IsAutoReplenishing => _options.AutoReplenishment;
+
+ /// <inheritdoc />
+ public override TimeSpan ReplenishmentPeriod => _options.Window;
+
+ /// <summary>
+ /// Initializes the <see cref="FixedWindowRateLimiter"/>.
+ /// </summary>
+ /// <param name="options">Options to specify the behavior of the <see cref="FixedWindowRateLimiter"/>.</param>
+ public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ _requestCount = options.PermitLimit;
+
+ _idleSince = _lastReplenishmentTick = Stopwatch.GetTimestamp();
+
+ if (_options.AutoReplenishment)
+ {
+ _renewTimer = new Timer(Replenish, this, _options.Window, _options.Window);
+ }
+ }
+
+ /// <inheritdoc/>
+ public override int GetAvailablePermits() => _requestCount;
+
+ /// <inheritdoc/>
+ protected override RateLimitLease AcquireCore(int requestCount)
+ {
+ // These amounts of resources can never be acquired
+ // Raises a PermitLimitExceeded ArgumentOutOFRangeException
+ if (requestCount > _options.PermitLimit)
+ {
+ throw new ArgumentOutOfRangeException(nameof(requestCount), requestCount, SR.Format(SR.PermitLimitExceeded, requestCount, _options.PermitLimit));
+ }
+
+ // Return SuccessfulLease or FailedLease depending to indicate limiter state
+ if (requestCount == 0 && !_disposed)
+ {
+ // Check if the requests are permitted in a window
+ // Requests will be allowed if the total served request is less than the max allowed requests (permit limit).
+ if (_requestCount > 0)
+ {
+ return SuccessfulLease;
+ }
+
+ return CreateFailedWindowLease(requestCount);
+ }
+
+ lock (Lock)
+ {
+ if (TryLeaseUnsynchronized(requestCount, out RateLimitLease? lease))
+ {
+ return lease;
+ }
+
+ return CreateFailedWindowLease(requestCount);
+ }
+ }
+
+ /// <inheritdoc/>
+ protected override ValueTask<RateLimitLease> WaitAsyncCore(int requestCount, CancellationToken cancellationToken = default)
+ {
+ // These amounts of resources can never be acquired
+ if (requestCount > _options.PermitLimit)
+ {
+ throw new ArgumentOutOfRangeException(nameof(requestCount), requestCount, SR.Format(SR.PermitLimitExceeded, requestCount, _options.PermitLimit));
+ }
+
+ ThrowIfDisposed();
+
+ // Return SuccessfulAcquisition if requestCount is 0 and resources are available
+ if (requestCount == 0 && _requestCount > 0)
+ {
+ return new ValueTask<RateLimitLease>(SuccessfulLease);
+ }
+
+ lock (Lock)
+ {
+ if (TryLeaseUnsynchronized(requestCount, out RateLimitLease? lease))
+ {
+ return new ValueTask<RateLimitLease>(lease);
+ }
+
+ // Avoid integer overflow by using subtraction instead of addition
+ Debug.Assert(_options.QueueLimit >= _queueCount);
+ if (_options.QueueLimit - _queueCount < requestCount)
+ {
+ if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && requestCount <= _options.QueueLimit)
+ {
+ // remove oldest items from queue until there is space for the newest acquisition request
+ do
+ {
+ RequestRegistration oldestRequest = _queue.DequeueHead();
+ _queueCount -= oldestRequest.Count;
+ Debug.Assert(_queueCount >= 0);
+ if (!oldestRequest.Tcs.TrySetResult(FailedLease))
+ {
+ _queueCount += oldestRequest.Count;
+ }
+ }
+ while (_options.QueueLimit - _queueCount < requestCount);
+ }
+ else
+ {
+ // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
+ return new ValueTask<RateLimitLease>(CreateFailedWindowLease(requestCount));
+ }
+ }
+
+ CancelQueueState tcs = new CancelQueueState(requestCount, this, cancellationToken);
+ CancellationTokenRegistration ctr = default;
+ if (cancellationToken.CanBeCanceled)
+ {
+ ctr = cancellationToken.Register(static obj =>
+ {
+ ((CancelQueueState)obj!).TrySetCanceled();
+ }, tcs);
+ }
+
+ RequestRegistration registration = new RequestRegistration(requestCount, tcs, ctr);
+ _queue.EnqueueTail(registration);
+ _queueCount += requestCount;
+ Debug.Assert(_queueCount <= _options.QueueLimit);
+
+ return new ValueTask<RateLimitLease>(registration.Tcs.Task);
+ }
+ }
+
+ private RateLimitLease CreateFailedWindowLease(int requestCount)
+ {
+ int replenishAmount = requestCount - _requestCount + _queueCount;
+ // can't have 0 replenish window, that would mean it should be a successful lease
+ int replenishWindow = Math.Max(replenishAmount / _options.PermitLimit, 1);
+
+ return new FixedWindowLease(false, TimeSpan.FromTicks(_options.Window.Ticks * replenishWindow));
+ }
+
+ private bool TryLeaseUnsynchronized(int requestCount, [NotNullWhen(true)] out RateLimitLease? lease)
+ {
+ ThrowIfDisposed();
+
+ // if permitCount is 0 we want to queue it if there are no available permits
+ if (_requestCount >= requestCount && _requestCount != 0)
+ {
+ if (requestCount == 0)
+ {
+ // Edge case where the check before the lock showed 0 available permit counters but when we got the lock, some permits were now available
+ lease = SuccessfulLease;
+ return true;
+ }
+
+ // a. If there are no items queued we can lease
+ // b. If there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
+ if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
+ {
+ _idleSince = null;
+ _requestCount -= requestCount;
+ Debug.Assert(_requestCount >= 0);
+ lease = SuccessfulLease;
+ return true;
+ }
+ }
+
+ lease = null;
+ return false;
+ }
+
+ /// <summary>
+ /// Attempts to replenish request counters in the window.
+ /// </summary>
+ /// <returns>
+ /// False if <see cref="FixedWindowRateLimiterOptions.AutoReplenishment"/> is enabled, otherwise true.
+ /// Does not reflect if counters were replenished.
+ /// </returns>
+ public override bool TryReplenish()
+ {
+ if (_options.AutoReplenishment)
+ {
+ return false;
+ }
+ Replenish(this);
+ return true;
+ }
+
+ private static void Replenish(object? state)
+ {
+ FixedWindowRateLimiter limiter = (state as FixedWindowRateLimiter)!;
+ Debug.Assert(limiter is not null);
+
+ // Use Stopwatch instead of DateTime.UtcNow to avoid issues on systems where the clock can change
+ long nowTicks = Stopwatch.GetTimestamp();
+ limiter!.ReplenishInternal(nowTicks);
+ }
+
+ // Used in tests that test behavior with specific time intervals
+ private void ReplenishInternal(long nowTicks)
+ {
+ // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes
+ lock (Lock)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks)
+ {
+ return;
+ }
+
+ _lastReplenishmentTick = nowTicks;
+
+ int availableRequestCounters = _requestCount;
+ int maxPermits = _options.PermitLimit;
+ int resourcesToAdd;
+
+ if (availableRequestCounters < maxPermits)
+ {
+ resourcesToAdd = maxPermits - availableRequestCounters;
+ }
+ else
+ {
+ // All counters available, nothing to do
+ return;
+ }
+
+ _requestCount += resourcesToAdd;
+ Debug.Assert(_requestCount == _options.PermitLimit);
+
+ // Process queued requests
+ while (_queue.Count > 0)
+ {
+ RequestRegistration nextPendingRequest =
+ _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
+ ? _queue.PeekHead()
+ : _queue.PeekTail();
+
+ if (_requestCount >= nextPendingRequest.Count)
+ {
+ // Request can be fulfilled
+ nextPendingRequest =
+ _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
+ ? _queue.DequeueHead()
+ : _queue.DequeueTail();
+
+ _queueCount -= nextPendingRequest.Count;
+ _requestCount -= nextPendingRequest.Count;
+ Debug.Assert(_requestCount >= 0);
+
+ if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
+ {
+ // Queued item was canceled so add count back
+ _requestCount += nextPendingRequest.Count;
+ // Updating queue count is handled by the cancellation code
+ _queueCount += nextPendingRequest.Count;
+ }
+ nextPendingRequest.CancellationTokenRegistration.Dispose();
+ Debug.Assert(_queueCount >= 0);
+ }
+ else
+ {
+ // Request cannot be fulfilled
+ break;
+ }
+ }
+
+ if (_requestCount == _options.PermitLimit)
+ {
+ Debug.Assert(_idleSince is null);
+ Debug.Assert(_queueCount == 0);
+ _idleSince = Stopwatch.GetTimestamp();
+ }
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ lock (Lock)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+ _disposed = true;
+ _renewTimer?.Dispose();
+ while (_queue.Count > 0)
+ {
+ RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
+ ? _queue.DequeueHead()
+ : _queue.DequeueTail();
+ next.CancellationTokenRegistration.Dispose();
+ next.Tcs.TrySetResult(FailedLease);
+ }
+ }
+ }
+
+ protected override ValueTask DisposeAsyncCore()
+ {
+ Dispose(true);
+
+ return default;
+ }
+
+ private void ThrowIfDisposed()
+ {
+ if (_disposed)
+ {
+ throw new ObjectDisposedException(nameof(FixedWindowRateLimiter));
+ }
+ }
+
+ private sealed class FixedWindowLease : RateLimitLease
+ {
+ private static readonly string[] s_allMetadataNames = new[] { MetadataName.RetryAfter.Name };
+
+ private readonly TimeSpan? _retryAfter;
+
+ public FixedWindowLease(bool isAcquired, TimeSpan? retryAfter)
+ {
+ IsAcquired = isAcquired;
+ _retryAfter = retryAfter;
+ }
+
+ public override bool IsAcquired { get; }
+
+ public override IEnumerable<string> MetadataNames => s_allMetadataNames;
+
+ public override bool TryGetMetadata(string metadataName, out object? metadata)
+ {
+ if (metadataName == MetadataName.RetryAfter.Name && _retryAfter.HasValue)
+ {
+ metadata = _retryAfter.Value;
+ return true;
+ }
+
+ metadata = default;
+ return false;
+ }
+ }
+
+ private readonly struct RequestRegistration
+ {
+ public RequestRegistration(int requestCount, TaskCompletionSource<RateLimitLease> tcs, CancellationTokenRegistration cancellationTokenRegistration)
+ {
+ Count = requestCount;
+ // Use VoidAsyncOperationWithData<T> instead
+ Tcs = tcs;
+ CancellationTokenRegistration = cancellationTokenRegistration;
+ }
+
+ public int Count { get; }
+
+ public TaskCompletionSource<RateLimitLease> Tcs { get; }
+
+ public CancellationTokenRegistration CancellationTokenRegistration { get; }
+ }
+
+ private sealed class CancelQueueState : TaskCompletionSource<RateLimitLease>
+ {
+ private readonly int _requestCount;
+ private readonly FixedWindowRateLimiter _limiter;
+ private readonly CancellationToken _cancellationToken;
+
+ public CancelQueueState(int requestCount, FixedWindowRateLimiter limiter, CancellationToken cancellationToken)
+ : base(TaskCreationOptions.RunContinuationsAsynchronously)
+ {
+ _requestCount = requestCount;
+ _limiter = limiter;
+ _cancellationToken = cancellationToken;
+ }
+
+ public new bool TrySetCanceled()
+ {
+ if (TrySetCanceled(_cancellationToken))
+ {
+ lock (_limiter.Lock)
+ {
+ _limiter._queueCount -= _requestCount;
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiterOptions.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiterOptions.cs
new file mode 100644
index 00000000000..0b8693b4792
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiterOptions.cs
@@ -0,0 +1,76 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+namespace System.Threading.RateLimiting
+{
+ /// <summary>
+ /// Options to specify the behavior of a <see cref="FixedWindowRateLimiter"/>.
+ /// </summary>
+ public sealed class FixedWindowRateLimiterOptions
+ {
+ /// <summary>
+ /// Initializes the <see cref="FixedWindowRateLimiterOptions"/>.
+ /// </summary>
+ /// <param name="permitLimit">Maximum number of requests that can be served in the window.</param>
+ /// <param name="queueProcessingOrder"></param>
+ /// <param name="queueLimit">Maximum number of unprocessed request counters waiting via <see cref="RateLimiter.WaitAsync(int, CancellationToken)"/>.</param>
+ /// <param name="window">
+ /// Specifies how often request counters can be replenished. Replenishing is triggered either by an internal timer if <paramref name="autoReplenishment"/> is true, or by calling <see cref="FixedWindowRateLimiter.TryReplenish"/>.
+ /// </param>
+ /// <param name="autoReplenishment">
+ /// Specifies whether request replenishment will be handled by the <see cref="FixedWindowRateLimiter"/> or by another party via <see cref="FixedWindowRateLimiter.TryReplenish"/>.
+ /// </param>
+ /// <exception cref="ArgumentOutOfRangeException">When <paramref name="permitLimit"/> or <paramref name="queueLimit"/> are less than 0. </exception>
+ public FixedWindowRateLimiterOptions(
+ int permitLimit,
+ QueueProcessingOrder queueProcessingOrder,
+ int queueLimit,
+ TimeSpan window,
+ bool autoReplenishment = true)
+ {
+ if (permitLimit < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(permitLimit));
+ }
+ if (queueLimit < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(queueLimit));
+ }
+
+ PermitLimit = permitLimit;
+ QueueProcessingOrder = queueProcessingOrder;
+ QueueLimit = queueLimit;
+ Window = window;
+ AutoReplenishment = autoReplenishment;
+ }
+
+ /// <summary>
+ /// Specifies the time window that takes in the requests.
+ /// </summary>
+ public TimeSpan Window { get; }
+
+ /// <summary>
+ /// Specified whether the <see cref="FixedWindowRateLimiter"/> is automatically refresh counters or if someone else
+ /// will be calling <see cref="FixedWindowRateLimiter.TryReplenish"/> to refresh counters.
+ /// </summary>
+ public bool AutoReplenishment { get; }
+
+ /// <summary>
+ /// Maximum number of permit counters that can be allowed in a window.
+ /// </summary>
+ public int PermitLimit { get; }
+
+ /// <summary>
+ /// Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.
+ /// </summary>
+ /// <value>
+ /// <see cref="QueueProcessingOrder.OldestFirst"/> by default.
+ /// </value>
+ public QueueProcessingOrder QueueProcessingOrder { get; }
+
+ /// <summary>
+ /// Maximum cumulative permit count of queued acquisition requests.
+ /// </summary>
+ public int QueueLimit { get; }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/NoopLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/NoopLimiter.cs
new file mode 100644
index 00000000000..064e2211c89
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/NoopLimiter.cs
@@ -0,0 +1,39 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace System.Threading.RateLimiting
+{
+ internal sealed class NoopLimiter : RateLimiter
+ {
+ private static readonly RateLimitLease _lease = new NoopLease();
+
+ private NoopLimiter() { }
+
+ public static NoopLimiter Instance { get; } = new NoopLimiter();
+
+ public override TimeSpan? IdleDuration => null;
+
+ public override int GetAvailablePermits() => int.MaxValue;
+
+ protected override RateLimitLease AcquireCore(int permitCount) => _lease;
+
+ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken)
+ => new ValueTask<RateLimitLease>(_lease);
+
+ private sealed class NoopLease : RateLimitLease
+ {
+ public override bool IsAcquired => true;
+
+ public override IEnumerable<string> MetadataNames => Array.Empty<string>();
+
+ public override bool TryGetMetadata(string metadataName, out object? metadata)
+ {
+ metadata = null;
+ return false;
+ }
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs
new file mode 100644
index 00000000000..9fb634ddadd
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs
@@ -0,0 +1,230 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+
+namespace System.Threading.RateLimiting
+{
+ /// <summary>
+ /// Contains methods to assist with creating a <see cref="PartitionedRateLimiter{TResource}"/>.
+ /// </summary>
+ public static class PartitionedRateLimiter
+ {
+ /// <summary>
+ /// Method used to create a default implementation of <see cref="PartitionedRateLimiter{TResource}"/>.
+ /// </summary>
+ /// <typeparam name="TResource">The resource type that is being rate limited.</typeparam>
+ /// <typeparam name="TPartitionKey">The type to distinguish partitions with.</typeparam>
+ /// <param name="partitioner">Method called every time an Acquire or WaitAsync call is made to figure out what rate limiter to apply to the request.
+ /// If the <see cref="RateLimitPartition{TKey}.PartitionKey"/> matches a cached entry then the rate limiter previously used for that key is used. Otherwise, the factory is called to get a new rate limiter.</param>
+ /// <param name="equalityComparer">Optional <see cref="IEqualityComparer{T}"/> to customize the comparison logic for <typeparamref name="TPartitionKey"/>.</param>
+ /// <returns></returns>
+ public static PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(
+ Func<TResource, RateLimitPartition<TPartitionKey>> partitioner,
+ IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull
+ {
+ return new DefaultPartitionedRateLimiter<TResource, TPartitionKey>(partitioner, equalityComparer);
+ }
+ }
+
+ internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : PartitionedRateLimiter<TResource> where TKey : notnull
+ {
+ private readonly Func<TResource, RateLimitPartition<TKey>> _partitioner;
+
+ // TODO: Look at ConcurrentDictionary to try and avoid a global lock
+ private Dictionary<TKey, Lazy<RateLimiter>> _limiters;
+ private bool _disposed;
+ private TaskCompletionSource<object?> _disposeComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ // Used by the Timer to call TryRelenish on ReplenishingRateLimiters
+ // We use a separate list to avoid running TryReplenish (which might be user code) inside our lock
+ // And we cache the list to amortize the allocation cost to as close to 0 as we can get
+ private List<Lazy<RateLimiter>> _cachedLimiters = new();
+ private bool _cacheInvalid;
+ private TimerAwaitable _timer;
+ private Task _timerTask;
+
+ // Use the Dictionary as the lock field so we don't need to allocate another object for a lock and have another field in the object
+ private object Lock => _limiters;
+
+ public DefaultPartitionedRateLimiter(Func<TResource, RateLimitPartition<TKey>> partitioner,
+ IEqualityComparer<TKey>? equalityComparer = null)
+ {
+ _limiters = new Dictionary<TKey, Lazy<RateLimiter>>(equalityComparer);
+ _partitioner = partitioner;
+
+ // TODO: Figure out what interval we should use
+ _timer = new TimerAwaitable(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100));
+ _timerTask = RunTimer();
+ }
+
+ private async Task RunTimer()
+ {
+ _timer.Start();
+ while (await _timer)
+ {
+ try
+ {
+ Replenish(this);
+ }
+ // TODO: Can we log to EventSource or somewhere? Maybe dispatch throwing the exception so it is at least an unhandled exception?
+ catch { }
+ }
+ _timer.Dispose();
+ }
+
+ public override int GetAvailablePermits(TResource resourceID)
+ {
+ return GetRateLimiter(resourceID).GetAvailablePermits();
+ }
+
+ protected override RateLimitLease AcquireCore(TResource resourceID, int permitCount)
+ {
+ return GetRateLimiter(resourceID).Acquire(permitCount);
+ }
+
+ protected override ValueTask<RateLimitLease> WaitAsyncCore(TResource resourceID, int permitCount, CancellationToken cancellationToken)
+ {
+ return GetRateLimiter(resourceID).WaitAsync(permitCount, cancellationToken);
+ }
+
+ private RateLimiter GetRateLimiter(TResource resourceID)
+ {
+ RateLimitPartition<TKey> partition = _partitioner(resourceID);
+ Lazy<RateLimiter>? limiter;
+ lock (Lock)
+ {
+ ThrowIfDisposed();
+ if (!_limiters.TryGetValue(partition.PartitionKey, out limiter))
+ {
+ // Using Lazy avoids calling user code (partition.Factory) inside the lock
+ limiter = new Lazy<RateLimiter>(() => partition.Factory(partition.PartitionKey));
+ _limiters.Add(partition.PartitionKey, limiter);
+ // Cache is invalid now
+ _cacheInvalid = true;
+ }
+ }
+ return limiter.Value;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ bool alreadyDisposed = CommonDispose();
+
+ _timerTask.GetAwaiter().GetResult();
+ _cachedLimiters.Clear();
+
+ if (alreadyDisposed)
+ {
+ _disposeComplete.Task.GetAwaiter().GetResult();
+ return;
+ }
+
+ // Safe to access _limiters outside the lock
+ // The timer is no longer running and _disposed is set so anyone trying to access fields will be checking that first
+ foreach (KeyValuePair<TKey, Lazy<RateLimiter>> limiter in _limiters)
+ {
+ limiter.Value.Value.Dispose();
+ }
+ _limiters.Clear();
+ _disposeComplete.TrySetResult(null);
+ }
+
+ protected override async ValueTask DisposeAsyncCore()
+ {
+ bool alreadyDisposed = CommonDispose();
+
+ await _timerTask.ConfigureAwait(false);
+ _cachedLimiters.Clear();
+
+ if (alreadyDisposed)
+ {
+ await _disposeComplete.Task.ConfigureAwait(false);
+ return;
+ }
+
+ foreach (KeyValuePair<TKey, Lazy<RateLimiter>> limiter in _limiters)
+ {
+ await limiter.Value.Value.DisposeAsync().ConfigureAwait(false);
+ }
+ _limiters.Clear();
+ _disposeComplete.TrySetResult(null);
+ }
+
+ // This handles the common state changes that Dispose and DisposeAsync need to do, the individual limiters still need to be Disposed after this call
+ private bool CommonDispose()
+ {
+ lock (Lock)
+ {
+ if (_disposed)
+ {
+ return true;
+ }
+ _disposed = true;
+ _timer.Stop();
+ }
+ return false;
+ }
+
+ private void ThrowIfDisposed()
+ {
+ if (_disposed)
+ {
+ throw new ObjectDisposedException(nameof(PartitionedRateLimiter));
+ }
+ }
+
+ private static void Replenish(DefaultPartitionedRateLimiter<TResource, TKey> limiter)
+ {
+ lock (limiter.Lock)
+ {
+ if (limiter._disposed)
+ {
+ return;
+ }
+
+ // If the cache has been invalidated we need to recreate it
+ if (limiter._cacheInvalid)
+ {
+ limiter._cachedLimiters.Clear();
+ bool cacheStillInvalid = false;
+ foreach (KeyValuePair<TKey, Lazy<RateLimiter>> kvp in limiter._limiters)
+ {
+ if (kvp.Value.IsValueCreated)
+ {
+ if (kvp.Value.Value is ReplenishingRateLimiter)
+ {
+ limiter._cachedLimiters.Add(kvp.Value);
+ }
+ }
+ else
+ {
+ // In rare cases the RateLimiter will be added to the storage but not be initialized yet
+ // keep cache invalid if there was a non-initialized RateLimiter
+ // the next time we run the timer the cache will be updated
+ // with the initialized RateLimiter
+ cacheStillInvalid = true;
+ }
+ }
+ limiter._cacheInvalid = cacheStillInvalid;
+ }
+ }
+
+ // cachedLimiters is safe to use outside the lock because it is only updated by the Timer
+ // and the Timer avoids re-entrancy issues via the _executingTimer field
+ foreach (Lazy<RateLimiter> rateLimiter in limiter._cachedLimiters)
+ {
+ Debug.Assert(rateLimiter.IsValueCreated && rateLimiter.Value is ReplenishingRateLimiter);
+ ((ReplenishingRateLimiter)rateLimiter.Value).TryReplenish();
+ }
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.T.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.T.cs
new file mode 100644
index 00000000000..bbec73cc98c
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.T.cs
@@ -0,0 +1,30 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+namespace System.Threading.RateLimiting
+{
+ /// <summary>
+ /// Type returned by <see cref="RateLimitPartition.Create"/> methods to be used by <see cref="PartitionedRateLimiter.Create"/> to know what partitions are configured.
+ /// </summary>
+ /// <typeparam name="TKey">The type to distinguish partitions with.</typeparam>
+ public struct RateLimitPartition<TKey>
+ {
+ /// <summary>
+ /// Constructs the <see cref="RateLimitPartition{TKey}"/> for use in <see cref="PartitionedRateLimiter.Create"/>.
+ /// </summary>
+ /// <param name="partitionKey">The specific key for this partition.</param>
+ /// <param name="factory">The function called when a rate limiter for the given <paramref name="partitionKey"/> is needed.</param>
+ public RateLimitPartition(TKey partitionKey, Func<TKey, RateLimiter> factory)
+ {
+ PartitionKey = partitionKey;
+ Factory = factory;
+ }
+
+ /// <summary>
+ /// The specific key for this partition.
+ /// </summary>
+ public TKey PartitionKey { get; }
+
+ internal readonly Func<TKey, RateLimiter> Factory;
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.cs
new file mode 100644
index 00000000000..134794940cf
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitPartition.cs
@@ -0,0 +1,78 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+namespace System.Threading.RateLimiting
+{
+ /// <summary>
+ /// Contains methods used in <see cref="PartitionedRateLimiter.Create"/> to assist in the creation of partitions for your rate limiter.
+ /// </summary>
+ public static class RateLimitPartition
+ {
+ /// <summary>
+ /// Defines a partition with the given rate limiter factory.
+ /// </summary>
+ /// <typeparam name="TKey">The type to distinguish partitions with.</typeparam>
+ /// <param name="partitionKey">The specific key for this partition. This will be used to check for an existing cached limiter before calling the <paramref name="factory"/>.</param>
+ /// <param name="factory">The function called when a rate limiter for the given <paramref name="partitionKey"/> is needed. This should be a new instance of a rate limiter every time it is called.</param>
+ /// <returns></returns>
+ public static RateLimitPartition<TKey> Create<TKey>(
+ TKey partitionKey,
+ Func<TKey, RateLimiter> factory)
+ {
+ return new RateLimitPartition<TKey>(partitionKey, factory);
+ }
+
+ /// <summary>
+ /// Defines a partition with a <see cref="ConcurrencyLimiter"/> with the given <see cref="ConcurrencyLimiterOptions"/>.
+ /// </summary>
+ /// <typeparam name="TKey">The type to distinguish partitions with.</typeparam>
+ /// <param name="partitionKey">The specific key for this partition. This will be used to check for an existing cached limiter before calling the <paramref name="factory"/>.</param>
+ /// <param name="factory">The function called when a rate limiter for the given <paramref name="partitionKey"/> is needed. This can return the same instance of <see cref="ConcurrencyLimiterOptions"/> across different calls.</param>
+ /// <returns></returns>
+ public static RateLimitPartition<TKey> CreateConcurrencyLimiter<TKey>(
+ TKey partitionKey,
+ Func<TKey, ConcurrencyLimiterOptions> factory)
+ {
+ return Create(partitionKey, key => new ConcurrencyLimiter(factory(key)));
+ }
+
+ /// <summary>
+ /// Defines a partition that will not have a rate limiter.
+ /// This means any calls to <see cref="PartitionedRateLimiter{TResource}.Acquire(TResource, int)"/> or <see cref="PartitionedRateLimiter{TResource}.WaitAsync(TResource, int, CancellationToken)"/> will always succeed for the given <paramref name="partitionKey"/>.
+ /// </summary>
+ /// <typeparam name="TKey">The type to distinguish partitions with.</typeparam>
+ /// <param name="partitionKey">The specific key for this partition.</param>
+ /// <returns></returns>
+ public static RateLimitPartition<TKey> CreateNoLimiter<TKey>(TKey partitionKey)
+ {
+ return Create(partitionKey, _ => NoopLimiter.Instance);
+ }
+
+ /// <summary>
+ /// Defines a partition with a <see cref="TokenBucketRateLimiter"/> with the given <see cref="TokenBucketRateLimiterOptions"/>.
+ /// </summary>
+ /// <remarks>
+ /// Set <see cref="TokenBucketRateLimiterOptions.AutoReplenishment"/> to <see langword="false"/> to save an allocation. This method will create a new options type and set <see cref="TokenBucketRateLimiterOptions.AutoReplenishment"/> to <see langword="false"/> otherwise.
+ /// </remarks>
+ /// <typeparam name="TKey">The type to distinguish partitions with.</typeparam>
+ /// <param name="partitionKey">The specific key for this partition.</param>
+ /// <param name="factory">The function called when a rate limiter for the given <paramref name="partitionKey"/> is needed. This can return the same instance of <see cref="TokenBucketRateLimiterOptions"/> across different calls.</param>
+ /// <returns></returns>
+ public static RateLimitPartition<TKey> CreateTokenBucketLimiter<TKey>(
+ TKey partitionKey,
+ Func<TKey, TokenBucketRateLimiterOptions> factory)
+ {
+ return Create(partitionKey, key =>
+ {
+ TokenBucketRateLimiterOptions options = factory(key);
+ // We don't want individual TokenBucketRateLimiters to have timers. We will instead have our own internal Timer handling all of them
+ if (options.AutoReplenishment is true)
+ {
+ options = new TokenBucketRateLimiterOptions(options.TokenLimit, options.QueueProcessingOrder, options.QueueLimit,
+ options.ReplenishmentPeriod, options.TokensPerPeriod, autoReplenishment: false);
+ }
+ return new TokenBucketRateLimiter(options);
+ });
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs
new file mode 100644
index 00000000000..df7c57f7757
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs
@@ -0,0 +1,425 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Threading.Tasks;
+
+namespace System.Threading.RateLimiting
+{
+ /// <summary>
+ /// <see cref="RateLimiter"/> implementation that replenishes permit counters periodically instead of via a release mechanism.
+ /// </summary>
+ public sealed class SlidingWindowRateLimiter : ReplenishingRateLimiter
+ {
+ private int _requestCount;
+ private int _queueCount;
+ private int[] _requestsPerSegment;
+ private int _currentSegmentIndex;
+ private long _lastReplenishmentTick;
+ private long? _idleSince;
+ private bool _disposed;
+
+ private readonly Timer? _renewTimer;
+ private readonly SlidingWindowRateLimiterOptions _options;
+ private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();
+
+ // Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
+ private object Lock => _queue;
+
+ private static readonly RateLimitLease SuccessfulLease = new SlidingWindowLease(true, null);
+ private static readonly RateLimitLease FailedLease = new SlidingWindowLease(false, null);
+ private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;
+
+ /// <inheritdoc />
+ public override TimeSpan? IdleDuration => _idleSince is null ? null : new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));
+
+ /// <inheritdoc />
+ public override bool IsAutoReplenishing => _options.AutoReplenishment;
+
+ /// <inheritdoc />
+ public override TimeSpan ReplenishmentPeriod => new TimeSpan(_options.Window.Ticks / _options.SegmentsPerWindow);
+
+ /// <summary>
+ /// Initializes the <see cref="SlidingWindowRateLimiter"/>.
+ /// </summary>
+ /// <param name="options">Options to specify the behavior of the <see cref="SlidingWindowRateLimiter"/>.</param>
+ public SlidingWindowRateLimiter(SlidingWindowRateLimiterOptions options)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ _requestCount = options.PermitLimit;
+
+ // _requestsPerSegment holds the no. of acquired requests in each window segment
+ _requestsPerSegment = new int[options.SegmentsPerWindow];
+ _currentSegmentIndex = 0;
+
+ _idleSince = _lastReplenishmentTick = Stopwatch.GetTimestamp();
+
+ if (_options.AutoReplenishment)
+ {
+ _renewTimer = new Timer(Replenish, this, ReplenishmentPeriod, ReplenishmentPeriod);
+ }
+ }
+
+ /// <inheritdoc/>
+ public override int GetAvailablePermits() => _requestCount;
+
+ /// <inheritdoc/>
+ protected override RateLimitLease AcquireCore(int requestCount)
+ {
+ // These amounts of resources can never be acquired
+ if (requestCount > _options.PermitLimit)
+ {
+ throw new ArgumentOutOfRangeException(nameof(requestCount), requestCount, SR.Format(SR.PermitLimitExceeded, requestCount, _options.PermitLimit));
+ }
+
+ // Return SuccessfulLease or FailedLease depending to indicate limiter state
+ if (requestCount == 0 && !_disposed)
+ {
+ if (_requestCount > 0)
+ {
+ return SuccessfulLease;
+ }
+
+ return FailedLease;
+ }
+
+ lock (Lock)
+ {
+ if (TryLeaseUnsynchronized(requestCount, out RateLimitLease? lease))
+ {
+ return lease;
+ }
+
+ // TODO: Acquire additional metadata during a failed lease decision
+ return FailedLease;
+ }
+ }
+
+ /// <inheritdoc/>
+ protected override ValueTask<RateLimitLease> WaitAsyncCore(int requestCount, CancellationToken cancellationToken = default)
+ {
+ // These amounts of resources can never be acquired
+ if (requestCount > _options.PermitLimit)
+ {
+ throw new ArgumentOutOfRangeException(nameof(requestCount), requestCount, SR.Format(SR.PermitLimitExceeded, requestCount, _options.PermitLimit));
+ }
+
+ ThrowIfDisposed();
+
+ // Return SuccessfulAcquisition if resources are available
+ if (requestCount == 0 && _requestCount > 0)
+ {
+ return new ValueTask<RateLimitLease>(SuccessfulLease);
+ }
+
+ lock (Lock)
+ {
+ if (TryLeaseUnsynchronized(requestCount, out RateLimitLease? lease))
+ {
+ return new ValueTask<RateLimitLease>(lease);
+ }
+
+ // Avoid integer overflow by using subtraction instead of addition
+ Debug.Assert(_options.QueueLimit >= _queueCount);
+ if (_options.QueueLimit - _queueCount < requestCount)
+ {
+ if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && requestCount <= _options.QueueLimit)
+ {
+ // Remove oldest items from queue until there is space for the newest acquisition request
+ do
+ {
+ RequestRegistration oldestRequest = _queue.DequeueHead();
+ _queueCount -= oldestRequest.Count;
+ Debug.Assert(_queueCount >= 0);
+ if (!oldestRequest.Tcs.TrySetResult(FailedLease))
+ {
+ _queueCount += oldestRequest.Count;
+ }
+ }
+ while (_options.QueueLimit - _queueCount < requestCount);
+ }
+ else
+ {
+ // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
+ return new ValueTask<RateLimitLease>(FailedLease);
+ }
+ }
+
+ CancelQueueState tcs = new CancelQueueState(requestCount, this, cancellationToken);
+ CancellationTokenRegistration ctr = default;
+ if (cancellationToken.CanBeCanceled)
+ {
+ ctr = cancellationToken.Register(static obj =>
+ {
+ ((CancelQueueState)obj!).TrySetCanceled();
+ }, tcs);
+ }
+
+ RequestRegistration registration = new RequestRegistration(requestCount, tcs, ctr);
+ _queue.EnqueueTail(registration);
+ _queueCount += requestCount;
+ Debug.Assert(_queueCount <= _options.QueueLimit);
+
+ return new ValueTask<RateLimitLease>(registration.Tcs.Task);
+ }
+ }
+
+ private bool TryLeaseUnsynchronized(int requestCount, [NotNullWhen(true)] out RateLimitLease? lease)
+ {
+ ThrowIfDisposed();
+
+ // if requestCount is 0 we want to queue it if there are no available permits
+ if (_requestCount >= requestCount && _requestCount != 0)
+ {
+ if (requestCount == 0)
+ {
+ // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
+ lease = SuccessfulLease;
+ return true;
+ }
+
+ // a. If there are no items queued we can lease
+ // b. If there are items queued but the processing order is NewestFirst, then we can lease the incoming request since it is the newest
+ if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
+ {
+ _idleSince = null;
+ _requestsPerSegment[_currentSegmentIndex] += requestCount;
+ _requestCount -= requestCount;
+ Debug.Assert(_requestCount >= 0);
+ lease = SuccessfulLease;
+ return true;
+ }
+ }
+
+ lease = null;
+ return false;
+ }
+
+ /// <summary>
+ /// Attempts to replenish request counters in a window.
+ /// </summary>
+ /// <returns>
+ /// False if <see cref="SlidingWindowRateLimiterOptions.AutoReplenishment"/> is enabled, otherwise true.
+ /// Does not reflect if permits were replenished.
+ /// </returns>
+ public override bool TryReplenish()
+ {
+ if (_options.AutoReplenishment)
+ {
+ return false;
+ }
+
+ // Replenish call will slide the window one segment at a time
+ Replenish(this);
+ return true;
+ }
+
+ private static void Replenish(object? state)
+ {
+ SlidingWindowRateLimiter limiter = (state as SlidingWindowRateLimiter)!;
+ Debug.Assert(limiter is not null);
+
+ // Use Stopwatch instead of DateTime.UtcNow to avoid issues on systems where the clock can change
+ long nowTicks = Stopwatch.GetTimestamp();
+ limiter!.ReplenishInternal(nowTicks);
+ }
+
+ // Used in tests that test behavior with specific time intervals
+ private void ReplenishInternal(long nowTicks)
+ {
+ // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes
+ lock (Lock)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ if ((long)((nowTicks - _lastReplenishmentTick) * TickFrequency) < ReplenishmentPeriod.Ticks)
+ {
+ return;
+ }
+
+ _lastReplenishmentTick = nowTicks;
+
+ // Increment the current segment index while move the window
+ // We need to know the no. of requests that were acquired in a segment previously to ensure that we don't acquire more than the permit limit.
+ _currentSegmentIndex = (_currentSegmentIndex + 1) % _options.SegmentsPerWindow;
+ int oldSegmentRequestCount = _requestsPerSegment[_currentSegmentIndex];
+ _requestsPerSegment[_currentSegmentIndex] = 0;
+
+ if (oldSegmentRequestCount == 0)
+ {
+ return;
+ }
+
+ _requestCount += oldSegmentRequestCount;
+ Debug.Assert(_requestCount <= _options.PermitLimit);
+
+ // Process queued requests
+ while (_queue.Count > 0)
+ {
+ RequestRegistration nextPendingRequest =
+ _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
+ ? _queue.PeekHead()
+ : _queue.PeekTail();
+
+ // If we have enough permits after replenishing to serve the queued requests
+ if (_requestCount >= nextPendingRequest.Count)
+ {
+ // Request can be fulfilled
+ nextPendingRequest =
+ _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
+ ? _queue.DequeueHead()
+ : _queue.DequeueTail();
+
+ _queueCount -= nextPendingRequest.Count;
+ _requestCount -= nextPendingRequest.Count;
+ _requestsPerSegment[_currentSegmentIndex] += nextPendingRequest.Count;
+ Debug.Assert(_requestCount >= 0);
+
+ if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
+ {
+ // Queued item was canceled so add count back
+ _requestCount += nextPendingRequest.Count;
+ _requestsPerSegment[_currentSegmentIndex] -= nextPendingRequest.Count;
+ // Updating queue count is handled by the cancellation code
+ _queueCount += nextPendingRequest.Count;
+ }
+ nextPendingRequest.CancellationTokenRegistration.Dispose();
+ Debug.Assert(_queueCount >= 0);
+ }
+ else
+ {
+ // Request cannot be fulfilled
+ break;
+ }
+ }
+
+ if (_requestCount == _options.PermitLimit)
+ {
+ Debug.Assert(_idleSince is null);
+ Debug.Assert(_queueCount == 0);
+ _idleSince = Stopwatch.GetTimestamp();
+ }
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (!disposing)
+ {
+ return;
+ }
+
+ lock (Lock)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+ _disposed = true;
+ _renewTimer?.Dispose();
+ while (_queue.Count > 0)
+ {
+ RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
+ ? _queue.DequeueHead()
+ : _queue.DequeueTail();
+ next.CancellationTokenRegistration.Dispose();
+ next.Tcs.TrySetResult(FailedLease);
+ }
+ }
+ }
+
+ protected override ValueTask DisposeAsyncCore()
+ {
+ Dispose(true);
+
+ return default;
+ }
+
+ private void ThrowIfDisposed()
+ {
+ if (_disposed)
+ {
+ throw new ObjectDisposedException(nameof(SlidingWindowRateLimiter));
+ }
+ }
+
+ private sealed class SlidingWindowLease : RateLimitLease
+ {
+ private static readonly string[] s_allMetadataNames = new[] { MetadataName.RetryAfter.Name };
+
+ private readonly TimeSpan? _retryAfter;
+
+ public SlidingWindowLease(bool isAcquired, TimeSpan? retryAfter)
+ {
+ IsAcquired = isAcquired;
+ _retryAfter = retryAfter;
+ }
+
+ public override bool IsAcquired { get; }
+
+ public override IEnumerable<string> MetadataNames => s_allMetadataNames;
+
+ public override bool TryGetMetadata(string metadataName, out object? metadata)
+ {
+ if (metadataName == MetadataName.RetryAfter.Name && _retryAfter.HasValue)
+ {
+ metadata = _retryAfter.Value;
+ return true;
+ }
+
+ metadata = default;
+ return false;
+ }
+ }
+
+ private readonly struct RequestRegistration
+ {
+ public RequestRegistration(int requestCount, TaskCompletionSource<RateLimitLease> tcs, CancellationTokenRegistration cancellationTokenRegistration)
+ {
+ Count = requestCount;
+ // Use VoidAsyncOperationWithData<T> instead
+ Tcs = tcs;
+ CancellationTokenRegistration = cancellationTokenRegistration;
+ }
+
+ public int Count { get; }
+
+ public TaskCompletionSource<RateLimitLease> Tcs { get; }
+
+ public CancellationTokenRegistration CancellationTokenRegistration { get; }
+ }
+
+ private sealed class CancelQueueState : TaskCompletionSource<RateLimitLease>
+ {
+ private readonly int _requestCount;
+ private readonly SlidingWindowRateLimiter _limiter;
+ private readonly CancellationToken _cancellationToken;
+
+ public CancelQueueState(int requestCount, SlidingWindowRateLimiter limiter, CancellationToken cancellationToken)
+ : base(TaskCreationOptions.RunContinuationsAsynchronously)
+ {
+ _requestCount = requestCount;
+ _limiter = limiter;
+ _cancellationToken = cancellationToken;
+ }
+
+ public new bool TrySetCanceled()
+ {
+ if (TrySetCanceled(_cancellationToken))
+ {
+ lock (_limiter.Lock)
+ {
+ _limiter._queueCount -= _requestCount;
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiterOptions.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiterOptions.cs
new file mode 100644
index 00000000000..159b4338f07
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiterOptions.cs
@@ -0,0 +1,88 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+namespace System.Threading.RateLimiting
+{
+ /// <summary>
+ /// Options to specify the behavior of a <see cref="SlidingWindowRateLimiter"/>.
+ /// </summary>
+ public sealed class SlidingWindowRateLimiterOptions
+ {
+ /// <summary>
+ /// Initializes the <see cref="SlidingWindowRateLimiterOptions"/>.
+ /// </summary>
+ /// <param name="permitLimit">Maximum number of request counters that can be served in a window.</param>
+ /// <param name="queueProcessingOrder"></param>
+ /// <param name="queueLimit">Maximum number of unprocessed request counters waiting via <see cref="RateLimiter.WaitAsync(int, CancellationToken)"/>.</param>
+ /// <param name="window">
+ /// Specifies how often requests can be replenished. Replenishing is triggered either by an internal timer if <paramref name="autoReplenishment"/> is true, or by calling <see cref="SlidingWindowRateLimiter.TryReplenish"/>.
+ /// </param>
+ /// <param name="segmentsPerWindow">Specified how many segments a window can be divided into. The total requests a segment can serve cannot exceed the max limit.<paramref name="permitLimit"/>.</param>
+ /// <param name="autoReplenishment">
+ /// Specifies whether request replenishment will be handled by the <see cref="SlidingWindowRateLimiter"/> or by another party via <see cref="SlidingWindowRateLimiter.TryReplenish"/>.
+ /// </param>
+ /// <exception cref="ArgumentOutOfRangeException">When <paramref name="permitLimit"/>, <paramref name="queueLimit"/>, or <paramref name="segmentsPerWindow"/> are less than 0. </exception>
+ public SlidingWindowRateLimiterOptions(
+ int permitLimit,
+ QueueProcessingOrder queueProcessingOrder,
+ int queueLimit,
+ TimeSpan window,
+ int segmentsPerWindow,
+ bool autoReplenishment = true)
+ {
+ if (permitLimit < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(permitLimit));
+ }
+ if (queueLimit < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(queueLimit));
+ }
+ if (segmentsPerWindow <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(segmentsPerWindow));
+ }
+
+ PermitLimit = permitLimit;
+ QueueProcessingOrder = queueProcessingOrder;
+ QueueLimit = queueLimit;
+ Window = window;
+ SegmentsPerWindow = segmentsPerWindow;
+ AutoReplenishment = autoReplenishment;
+ }
+
+ /// <summary>
+ /// Specifies the minimum period between replenishments.
+ /// </summary>
+ public TimeSpan Window { get; }
+
+ /// <summary>
+ /// Specifies the maximum number of segments a window is divided into.
+ /// </summary>
+ public int SegmentsPerWindow { get; }
+
+ /// <summary>
+ /// Specified whether the <see cref="SlidingWindowRateLimiter"/> is automatically replenishing request counters or if someone else
+ /// will be calling <see cref="SlidingWindowRateLimiter.TryReplenish"/> to replenish tokens.
+ /// </summary>
+ public bool AutoReplenishment { get; }
+
+ /// <summary>
+ /// Maximum number of requests that can be served in a window.
+ /// </summary>
+ public int PermitLimit { get; }
+
+ /// <summary>
+ /// Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.
+ /// </summary>
+ /// <value>
+ /// <see cref="QueueProcessingOrder.OldestFirst"/> by default.
+ /// </value>
+ public QueueProcessingOrder QueueProcessingOrder { get; }
+
+ /// <summary>
+ /// Maximum cumulative permit count of queued acquisition requests.
+ /// </summary>
+ public int QueueLimit { get; }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TimerAwaitable.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TimerAwaitable.cs
new file mode 100644
index 00000000000..0b680201cc8
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TimerAwaitable.cs
@@ -0,0 +1,136 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace System.Threading.RateLimiting
+{
+ internal sealed class TimerAwaitable : IDisposable, ICriticalNotifyCompletion
+ {
+ private Timer? _timer;
+ private Action? _callback;
+ private static readonly Action _callbackCompleted = () => { };
+
+ private readonly TimeSpan _period;
+
+ private readonly TimeSpan _dueTime;
+ private readonly object _lockObj = new object();
+ private bool _disposed;
+ private bool _running = true;
+
+ public TimerAwaitable(TimeSpan dueTime, TimeSpan period)
+ {
+ _dueTime = dueTime;
+ _period = period;
+ }
+
+ public void Start()
+ {
+ if (_timer == null)
+ {
+ lock (_lockObj)
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ if (_timer == null)
+ {
+ // Don't capture the current ExecutionContext and its AsyncLocals onto the timer
+ bool restoreFlow = false;
+ try
+ {
+ if (!ExecutionContext.IsFlowSuppressed())
+ {
+ ExecutionContext.SuppressFlow();
+ restoreFlow = true;
+ }
+
+ _timer = new Timer(static state =>
+ {
+ var thisRef = (TimerAwaitable)state!;
+ thisRef.Tick();
+ },
+ state: this,
+ dueTime: _dueTime,
+ period: _period);
+ }
+ finally
+ {
+ // Restore the current ExecutionContext
+ if (restoreFlow)
+ {
+ ExecutionContext.RestoreFlow();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public TimerAwaitable GetAwaiter() => this;
+ public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
+
+ public bool GetResult()
+ {
+ _callback = null;
+
+ return _running;
+ }
+
+ private void Tick()
+ {
+ var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
+ continuation?.Invoke();
+ }
+
+ public void OnCompleted(Action continuation)
+ {
+ if (ReferenceEquals(_callback, _callbackCompleted) ||
+ ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
+ {
+ Task.Run(continuation);
+ }
+ }
+
+ public void UnsafeOnCompleted(Action continuation)
+ {
+ OnCompleted(continuation);
+ }
+
+ public void Stop()
+ {
+ lock (_lockObj)
+ {
+ // Stop should be used to trigger the call to end the loop which disposes
+ if (_disposed)
+ {
+ throw new ObjectDisposedException(GetType().FullName);
+ }
+
+ _running = false;
+ }
+
+ // Call tick here to make sure that we yield the callback,
+ // if it's currently waiting, we don't need to wait for the next period
+ Tick();
+ }
+
+ public void Dispose()
+ {
+ lock (_lockObj)
+ {
+ _disposed = true;
+
+ _timer?.Dispose();
+
+ _timer = null;
+ }
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs
index 1dfbe7fde9e..6eace9e71f7 100644
--- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs
+++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs
@@ -126,7 +126,11 @@ namespace System.Threading.RateLimiting
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
- oldestRequest.Tcs.TrySetResult(FailedLease);
+ if (!oldestRequest.Tcs.TrySetResult(FailedLease))
+ {
+ // Updating queue count is handled by the cancellation code
+ _queueCount += oldestRequest.Count;
+ }
}
while (_options.QueueLimit - _queueCount < tokenCount);
}
@@ -328,7 +332,7 @@ namespace System.Threading.RateLimiting
? _queue.DequeueHead()
: _queue.DequeueTail();
next.CancellationTokenRegistration.Dispose();
- next.Tcs.SetResult(FailedLease);
+ next.Tcs.TrySetResult(FailedLease);
}
}
}
diff --git a/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs
index c0212c72f3c..60947795900 100644
--- a/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs
+++ b/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs
@@ -81,6 +81,12 @@ namespace System.Threading.RateLimiting.Test
public abstract Task CanCancelWaitAsyncBeforeQueuing();
[Fact]
+ public abstract Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest();
+
+ [Fact]
+ public abstract Task CanDisposeAfterCancelingQueuedRequest();
+
+ [Fact]
public abstract Task CancelUpdatesQueueLimit();
[Fact]
diff --git a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs
index 6d834452851..6131c17acab 100644
--- a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs
+++ b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs
@@ -450,6 +450,55 @@ namespace System.Threading.RateLimiting.Test
}
[Fact]
+ public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest()
+ {
+ var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ // Add another item to queue, will be completed as failed later when we queue another item
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ var wait3 = limiter.WaitAsync(2);
+ Assert.False(wait3.IsCompleted);
+
+ // will be kicked by wait3 because we're using NewestFirst
+ var lease2 = await wait2;
+ Assert.False(lease2.IsAcquired);
+
+ lease.Dispose();
+
+ lease = await wait3;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanDisposeAfterCancelingQueuedRequest()
+ {
+ var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1));
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ // Make sure dispose doesn't have any side-effects when dealing with a canceled queued item
+ limiter.Dispose();
+ }
+
+ [Fact]
public override void NoMetadataOnAcquiredLease()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1));
diff --git a/src/libraries/System.Threading.RateLimiting/tests/FixedWindowRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/FixedWindowRateLimiterTests.cs
new file mode 100644
index 00000000000..2b3f3e99265
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/tests/FixedWindowRateLimiterTests.cs
@@ -0,0 +1,746 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.Threading.RateLimiting.Test
+{
+ public class FixedWindowRateLimiterTests : BaseRateLimiterTests
+ {
+ [Fact]
+ public override void CanAcquireResource()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire();
+
+ Assert.True(lease.IsAcquired);
+ Assert.False(limiter.Acquire().IsAcquired);
+
+ lease.Dispose();
+ Assert.False(limiter.Acquire().IsAcquired);
+ Assert.True(limiter.TryReplenish());
+
+ Assert.True(limiter.Acquire().IsAcquired);
+ }
+
+ [Fact]
+ public override void InvalidOptionsThrows()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(
+ () => new FixedWindowRateLimiterOptions(-1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMinutes(2), autoReplenishment: false));
+ Assert.Throws<ArgumentOutOfRangeException>(
+ () => new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, -1, TimeSpan.FromMinutes(2), autoReplenishment: false));
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourceAsync()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+
+ using var lease = await limiter.WaitAsync();
+
+ Assert.True(lease.IsAcquired);
+ var wait = limiter.WaitAsync();
+ Assert.False(wait.IsCompleted);
+
+ Assert.True(limiter.TryReplenish());
+
+ Assert.True((await wait).IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourceAsync_QueuesAndGrabsOldest()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = await limiter.WaitAsync();
+
+ Assert.True(lease.IsAcquired);
+ var wait1 = limiter.WaitAsync();
+ var wait2 = limiter.WaitAsync();
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+
+ lease = await wait1;
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait2.IsCompleted);
+
+ lease.Dispose();
+ Assert.Equal(0, limiter.GetAvailablePermits());
+ Assert.True(limiter.TryReplenish());
+
+ lease = await wait2;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3,
+ TimeSpan.FromMinutes(0), autoReplenishment: false));
+
+ var lease = await limiter.WaitAsync(2);
+ Assert.True(lease.IsAcquired);
+
+ var wait1 = limiter.WaitAsync(2);
+ var wait2 = limiter.WaitAsync();
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+
+ // second queued item completes first with NewestFirst
+ lease = await wait2;
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait1.IsCompleted);
+
+ lease.Dispose();
+ Assert.Equal(1, limiter.GetAvailablePermits());
+ Assert.True(limiter.TryReplenish());
+
+ lease = await wait1;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ using var lease = limiter.Acquire(1);
+ var wait = limiter.WaitAsync(1);
+
+ var failedLease = await limiter.WaitAsync(1);
+ Assert.False(failedLease.IsAcquired);
+ Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var timeSpan));
+ Assert.Equal(TimeSpan.Zero, timeSpan);
+ }
+
+ [Fact]
+ public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ var wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ var wait2 = limiter.WaitAsync(1);
+ var lease1 = await wait;
+ Assert.False(lease1.IsAcquired);
+ Assert.False(wait2.IsCompleted);
+
+ limiter.TryReplenish();
+
+ lease = await wait2;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+ var wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait2.IsCompleted);
+
+ var wait3 = limiter.WaitAsync(2);
+ var lease1 = await wait;
+ var lease2 = await wait2;
+ Assert.False(lease1.IsAcquired);
+ Assert.False(lease2.IsAcquired);
+ Assert.False(wait3.IsCompleted);
+
+ limiter.TryReplenish();
+
+ lease = await wait3;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ // Fill queue
+ var wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ var lease1 = await limiter.WaitAsync(2);
+ Assert.False(lease1.IsAcquired);
+
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ var wait = limiter.WaitAsync(1);
+
+ var failedLease = await limiter.WaitAsync(1);
+ Assert.False(failedLease.IsAcquired);
+
+ limiter.TryReplenish();
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+
+ wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(int.MaxValue);
+ Assert.True(lease.IsAcquired);
+
+ // Fill queue
+ var wait = limiter.WaitAsync(3);
+ Assert.False(wait.IsCompleted);
+
+ var wait2 = limiter.WaitAsync(int.MaxValue);
+ Assert.False(wait2.IsCompleted);
+
+ var lease1 = await wait;
+ Assert.False(lease1.IsAcquired);
+
+ limiter.TryReplenish();
+ var lease2 = await wait2;
+ Assert.True(lease2.IsAcquired);
+ }
+
+ [Fact]
+ public override void ThrowsWhenAcquiringMoreThanLimit()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ Assert.Throws<ArgumentOutOfRangeException>(() => limiter.Acquire(2));
+ }
+
+ [Fact]
+ public override async Task ThrowsWhenWaitingForMoreThanLimit()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await limiter.WaitAsync(2));
+ }
+
+ [Fact]
+ public override void ThrowsWhenAcquiringLessThanZero()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ Assert.Throws<ArgumentOutOfRangeException>(() => limiter.Acquire(-1));
+ }
+
+ [Fact]
+ public override async Task ThrowsWhenWaitingForLessThanZero()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await limiter.WaitAsync(-1));
+ }
+
+ [Fact]
+ public override void AcquireZero_WithAvailability()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+
+ using var lease = limiter.Acquire(0);
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override void AcquireZero_WithoutAvailability()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ using var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var lease2 = limiter.Acquire(0);
+ Assert.False(lease2.IsAcquired);
+ lease2.Dispose();
+ }
+
+ [Fact]
+ public override async Task WaitAsyncZero_WithAvailability()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+
+ using var lease = await limiter.WaitAsync(0);
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task WaitAsyncZero_WithoutAvailabilityWaitsForAvailability()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = await limiter.WaitAsync(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(0);
+ Assert.False(wait.IsCompleted);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+ using var lease2 = await wait;
+ Assert.True(lease2.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanDequeueMultipleResourcesAtOnce()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.Zero, autoReplenishment: false));
+ using var lease = await limiter.WaitAsync(2);
+ Assert.True(lease.IsAcquired);
+
+ var wait1 = limiter.WaitAsync(1);
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+
+ var lease1 = await wait1;
+ var lease2 = await wait2;
+ Assert.True(lease1.IsAcquired);
+ Assert.True(lease2.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanCancelWaitAsyncAfterQueuing()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+
+ Assert.Equal(1, limiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public override async Task CanCancelWaitAsyncBeforeQueuing()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => limiter.WaitAsync(1, cts.Token).AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+
+ Assert.Equal(1, limiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public override async Task CancelUpdatesQueueLimit()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override void NoMetadataOnAcquiredLease()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ using var lease = limiter.Acquire(1);
+ Assert.False(lease.TryGetMetadata(MetadataName.RetryAfter, out _));
+ }
+
+ [Fact]
+ public override void MetadataNamesContainsAllMetadata()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ using var lease = limiter.Acquire(1);
+ Assert.Collection(lease.MetadataNames, metadataName => Assert.Equal(metadataName, MetadataName.RetryAfter.Name));
+ }
+
+ [Fact]
+ public override async Task DisposeReleasesQueuedAcquires()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ var wait1 = limiter.WaitAsync(1);
+ var wait2 = limiter.WaitAsync(1);
+ var wait3 = limiter.WaitAsync(1);
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+ Assert.False(wait3.IsCompleted);
+
+ limiter.Dispose();
+
+ lease = await wait1;
+ Assert.False(lease.IsAcquired);
+ lease = await wait2;
+ Assert.False(lease.IsAcquired);
+ lease = await wait3;
+ Assert.False(lease.IsAcquired);
+
+ // Throws after disposal
+ Assert.Throws<ObjectDisposedException>(() => limiter.Acquire(1));
+ await Assert.ThrowsAsync<ObjectDisposedException>(() => limiter.WaitAsync(1).AsTask());
+ }
+
+ [Fact]
+ public override async Task DisposeAsyncReleasesQueuedAcquires()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ var wait1 = limiter.WaitAsync(1);
+ var wait2 = limiter.WaitAsync(1);
+ var wait3 = limiter.WaitAsync(1);
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+ Assert.False(wait3.IsCompleted);
+
+ await limiter.DisposeAsync();
+
+ lease = await wait1;
+ Assert.False(lease.IsAcquired);
+ lease = await wait2;
+ Assert.False(lease.IsAcquired);
+ lease = await wait3;
+ Assert.False(lease.IsAcquired);
+
+ // Throws after disposal
+ Assert.Throws<ObjectDisposedException>(() => limiter.Acquire(1));
+ await Assert.ThrowsAsync<ObjectDisposedException>(() => limiter.WaitAsync(1).AsTask());
+ }
+
+ [Fact]
+ public async Task RetryMetadataOnFailedWaitAsync()
+ {
+ var options = new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.FromSeconds(20), autoReplenishment: false);
+ var limiter = new FixedWindowRateLimiter(options);
+
+ using var lease = limiter.Acquire(2);
+
+ var failedLease = await limiter.WaitAsync(2);
+ Assert.False(failedLease.IsAcquired);
+ Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter.Name, out var metadata));
+ var metaDataTime = Assert.IsType<TimeSpan>(metadata);
+ Assert.Equal(options.Window.Ticks, metaDataTime.Ticks);
+
+ Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata));
+ Assert.Equal(options.Window.Ticks, typedMetadata.Ticks);
+ Assert.Collection(failedLease.MetadataNames, item => item.Equals(MetadataName.RetryAfter.Name));
+ }
+
+ [Fact]
+ public async Task CorrectRetryMetadataWithQueuedItem()
+ {
+ var options = new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.FromSeconds(20), autoReplenishment: false);
+ var limiter = new FixedWindowRateLimiter(options);
+
+ using var lease = limiter.Acquire(2);
+ // Queue item which changes the retry after time for failed items
+ var wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ var failedLease = await limiter.WaitAsync(2);
+ Assert.False(failedLease.IsAcquired);
+ Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata));
+ Assert.Equal(options.Window.Ticks, typedMetadata.Ticks);
+ }
+
+
+ [Fact]
+ public async Task CorrectRetryMetadataWithNonZeroAvailableItems()
+ {
+ var options = new FixedWindowRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.FromSeconds(20), autoReplenishment: false);
+ var limiter = new FixedWindowRateLimiter(options);
+
+ using var lease = limiter.Acquire(2);
+
+ var failedLease = await limiter.WaitAsync(3);
+ Assert.False(failedLease.IsAcquired);
+ Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata));
+ Assert.Equal(options.Window.Ticks, typedMetadata.Ticks);
+ }
+
+ [Fact]
+ public void TryReplenishWithAutoReplenish_ReturnsFalse()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.FromSeconds(1), autoReplenishment: true));
+ Assert.Equal(2, limiter.GetAvailablePermits());
+ Assert.False(limiter.TryReplenish());
+ Assert.Equal(2, limiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public async Task AutoReplenish_ReplenishesCounters()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.FromMilliseconds(1000), autoReplenishment: true));
+ Assert.Equal(2, limiter.GetAvailablePermits());
+ limiter.Acquire(2);
+
+ var lease = await limiter.WaitAsync(1);
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourcesWithWaitAsyncWithQueuedItemsIfNewestFirst()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
+ TimeSpan.Zero, autoReplenishment: false));
+
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+
+ Assert.Equal(1, limiter.GetAvailablePermits());
+ lease = await limiter.WaitAsync(1);
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CannotAcquireResourcesWithWaitAsyncWithQueuedItemsIfOldestFirst()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3,
+ TimeSpan.Zero, autoReplenishment: false));
+
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(2);
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait2.IsCompleted);
+
+ limiter.TryReplenish();
+
+ lease = await wait2;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourcesWithAcquireWithQueuedItemsIfNewestFirst()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3,
+ TimeSpan.Zero, autoReplenishment: false));
+
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+
+ lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CannotAcquireResourcesWithAcquireWithQueuedItemsIfOldestFirst()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3,
+ TimeSpan.Zero, autoReplenishment: false));
+
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+
+ lease = limiter.Acquire(1);
+ Assert.False(lease.IsAcquired);
+
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override void NullIdleDurationWhenActive()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.FromMilliseconds(2), autoReplenishment: false));
+ limiter.Acquire(1);
+ Assert.Null(limiter.IdleDuration);
+ }
+
+ [Fact]
+ public override async Task IdleDurationUpdatesWhenIdle()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.FromMilliseconds(2), autoReplenishment: false));
+ Assert.NotNull(limiter.IdleDuration);
+ var previousDuration = limiter.IdleDuration;
+ await Task.Delay(15);
+ Assert.True(previousDuration < limiter.IdleDuration);
+ }
+
+ [Fact]
+ public override void IdleDurationUpdatesWhenChangingFromActive()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.Zero, autoReplenishment: false));
+ limiter.Acquire(1);
+ limiter.TryReplenish();
+ Assert.NotNull(limiter.IdleDuration);
+ }
+
+ [Fact]
+ public void ReplenishingRateLimiterPropertiesHaveCorrectValues()
+ {
+ var replenishPeriod = TimeSpan.FromMinutes(1);
+ using ReplenishingRateLimiter limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ replenishPeriod, autoReplenishment: true));
+ Assert.True(limiter.IsAutoReplenishing);
+ Assert.Equal(replenishPeriod, limiter.ReplenishmentPeriod);
+
+ replenishPeriod = TimeSpan.FromSeconds(2);
+ using ReplenishingRateLimiter limiter2 = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ replenishPeriod, autoReplenishment: false));
+ Assert.False(limiter2.IsAutoReplenishing);
+ Assert.Equal(replenishPeriod, limiter2.ReplenishmentPeriod);
+ }
+
+ [Fact]
+ public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ // Add another item to queue, will be completed as failed later when we queue another item
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ lease.Dispose();
+
+ var wait3 = limiter.WaitAsync(2);
+ Assert.False(wait3.IsCompleted);
+
+ // will be kicked by wait3 because we're using NewestFirst
+ lease = await wait2;
+ Assert.False(lease.IsAcquired);
+
+ limiter.TryReplenish();
+ lease = await wait3;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanDisposeAfterCancelingQueuedRequest()
+ {
+ var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ // Make sure dispose doesn't have any side-effects when dealing with a canceled queued item
+ limiter.Dispose();
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs
index a47c078d96e..47ccdc5c376 100644
--- a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs
+++ b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs
@@ -1,6 +1,8 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Xunit;
@@ -30,11 +32,473 @@ namespace System.Threading.RateLimiting.Tests
async () => await limiter.WaitAsync(string.Empty, 1, new CancellationToken(true)));
}
- internal class NotImplementedPartitionedRateLimiter<T> : PartitionedRateLimiter<T>
+ // Create
+
+ [Fact]
+ public void Create_AcquireCallsUnderlyingPartitionsLimiter()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ });
+
+ limiter.Acquire("");
+ Assert.Equal(1, limiterFactory.Limiters.Count);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount);
+ }
+
+ [Fact]
+ public async Task Create_WaitAsyncCallsUnderlyingPartitionsLimiter()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ });
+
+ await limiter.WaitAsync("");
+ Assert.Equal(1, limiterFactory.Limiters.Count);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount);
+ }
+
+ [Fact]
+ public void Create_GetAvailablePermitsCallsUnderlyingPartitionsLimiter()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ });
+
+ limiter.GetAvailablePermits("");
+ Assert.Equal(1, limiterFactory.Limiters.Count);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.GetAvailablePermitsCallCount);
+ }
+
+ [Fact]
+ public async Task Create_PartitionIsCached()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ });
+
+ limiter.Acquire("");
+ await limiter.WaitAsync("");
+ limiter.Acquire("");
+ await limiter.WaitAsync("");
+ Assert.Equal(1, limiterFactory.Limiters.Count);
+ Assert.Equal(2, limiterFactory.Limiters[0].Limiter.AcquireCallCount);
+ Assert.Equal(2, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount);
+ }
+
+ [Fact]
+ public void Create_MultiplePartitionsWork()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ if (resource == "1")
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ }
+ else
+ {
+ return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key));
+ }
+ });
+
+ limiter.Acquire("1");
+ limiter.Acquire("2");
+ limiter.Acquire("1");
+ limiter.Acquire("2");
+
+ Assert.Equal(2, limiterFactory.Limiters.Count);
+
+ Assert.Equal(2, limiterFactory.Limiters[0].Limiter.AcquireCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[0].Key);
+
+ Assert.Equal(2, limiterFactory.Limiters[1].Limiter.AcquireCallCount);
+ Assert.Equal(2, limiterFactory.Limiters[1].Key);
+ }
+
+ [Fact]
+ public async Task Create_BlockingWaitDoesNotBlockOtherPartitions()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ if (resource == "1")
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ }
+ return RateLimitPartition.CreateConcurrencyLimiter(2,
+ _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2));
+ });
+
+ var lease = await limiter.WaitAsync("2");
+ var wait = limiter.WaitAsync("2");
+ Assert.False(wait.IsCompleted);
+
+ // Different partition, should not be blocked by the wait in the other partition
+ await limiter.WaitAsync("1");
+
+ lease.Dispose();
+ await wait;
+
+ Assert.Equal(1, limiterFactory.Limiters.Count);
+ Assert.Equal(0, limiterFactory.Limiters[0].Limiter.AcquireCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount);
+ }
+
+ // Uses Task.Wait in a Task.Run to purposefully test a blocking scenario, this doesn't work on WASM currently
+ [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ public async Task Create_BlockingFactoryDoesNotBlockOtherPartitions()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
+ var startedTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ if (resource == "1")
+ {
+ return RateLimitPartition.Create(1, key =>
+ {
+ startedTcs.SetResult(null);
+ // block the factory method
+ Assert.True(tcs.Task.Wait(TimeSpan.FromSeconds(10)));
+ return limiterFactory.GetLimiter(key);
+ });
+ }
+ return RateLimitPartition.Create(2,
+ key => limiterFactory.GetLimiter(key));
+ });
+
+ var lease = await limiter.WaitAsync("2");
+
+ var blockedTask = Task.Run(async () =>
+ {
+ await limiter.WaitAsync("1");
+ });
+ await startedTcs.Task;
+
+ // Other partitions aren't blocked
+ await limiter.WaitAsync("2");
+
+ // Try to acquire from the blocking limiter, this should wait until the blocking limiter has been resolved and not create a new one
+ var blockedTask2 = Task.Run(async () =>
+ {
+ await limiter.WaitAsync("1");
+ });
+
+ // unblock limiter factory
+ tcs.SetResult(null);
+ await blockedTask;
+ await blockedTask2;
+
+ // Only 2 limiters should have been created
+ Assert.Equal(2, limiterFactory.Limiters.Count);
+ Assert.Equal(2, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount);
+ Assert.Equal(2, limiterFactory.Limiters[1].Limiter.WaitAsyncCallCount);
+ }
+
+ [Fact]
+ public void Create_PassedInEqualityComparerIsUsed()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ var equality = new TestEquality();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ if (resource == "1")
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ }
+ return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key));
+ }, equality);
+
+ limiter.Acquire("1");
+ // GetHashCode to add item to dictionary (skips TryGet for empty dictionary)
+ Assert.Equal(0, equality.EqualsCallCount);
+ Assert.Equal(1, equality.GetHashCodeCallCount);
+ limiter.Acquire("1");
+ // GetHashCode and Equal from TryGet to see if item is in dictionary
+ Assert.Equal(1, equality.EqualsCallCount);
+ Assert.Equal(2, equality.GetHashCodeCallCount);
+ limiter.Acquire("2");
+ // GetHashCode from TryGet (fails check) and second GetHashCode to add item to dictionary
+ Assert.Equal(1, equality.EqualsCallCount);
+ Assert.Equal(4, equality.GetHashCodeCallCount);
+
+ Assert.Equal(2, limiterFactory.Limiters.Count);
+ Assert.Equal(2, limiterFactory.Limiters[0].Limiter.AcquireCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[1].Limiter.AcquireCallCount);
+ }
+
+ [Fact]
+ public void Create_DisposeWithoutLimitersNoops()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ });
+
+ limiter.Dispose();
+
+ Assert.Equal(0, limiterFactory.Limiters.Count);
+ }
+
+ [Fact]
+ public void Create_DisposeDisposesAllLimiters()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ if (resource == "1")
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ }
+ return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key));
+ });
+
+ limiter.Acquire("1");
+ limiter.Acquire("2");
+
+ limiter.Dispose();
+
+ Assert.Equal(2, limiterFactory.Limiters.Count);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.DisposeCallCount);
+
+ Assert.Equal(1, limiterFactory.Limiters[1].Limiter.AcquireCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[1].Limiter.DisposeCallCount);
+ }
+
+ [Fact]
+ public void Create_DisposeThrowsForFutureMethodCalls()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ });
+
+ limiter.Dispose();
+
+ Assert.Throws<ObjectDisposedException>(() => limiter.Acquire("1"));
+
+ Assert.Equal(0, limiterFactory.Limiters.Count);
+ }
+
+ [Fact]
+ public async Task Create_DisposeAsyncDisposesAllLimiters()
+ {
+ var limiterFactory = new TrackingRateLimiterFactory<int>();
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ if (resource == "1")
+ {
+ return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key));
+ }
+ return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key));
+ });
+
+ limiter.Acquire("1");
+ limiter.Acquire("2");
+
+ await limiter.DisposeAsync();
+
+ Assert.Equal(2, limiterFactory.Limiters.Count);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.DisposeCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[0].Limiter.DisposeAsyncCallCount);
+
+ Assert.Equal(1, limiterFactory.Limiters[1].Limiter.AcquireCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[1].Limiter.DisposeCallCount);
+ Assert.Equal(1, limiterFactory.Limiters[1].Limiter.DisposeAsyncCallCount);
+ }
+
+ [Fact]
+ public async Task Create_WithTokenBucketReplenishesAutomatically()
+ {
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ return RateLimitPartition.CreateTokenBucketLimiter(1,
+ _ => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMilliseconds(100), 1, false));
+ });
+
+ var lease = limiter.Acquire("");
+ Assert.True(lease.IsAcquired);
+
+ lease = await limiter.WaitAsync("");
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public async Task Create_WithReplenishingLimiterReplenishesAutomatically()
+ {
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ // Use the non-specific Create method to make sure ReplenishingRateLimiters are still handled properly
+ return RateLimitPartition.Create(1,
+ _ => new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMilliseconds(100), 1, false)));
+ });
+
+ var lease = limiter.Acquire("");
+ Assert.True(lease.IsAcquired);
+
+ lease = await limiter.WaitAsync("");
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public async Task Create_MultipleReplenishingLimitersReplenishAutomatically()
+ {
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ if (resource == "1")
+ {
+ return RateLimitPartition.CreateTokenBucketLimiter(1,
+ _ => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMilliseconds(100), 1, false));
+ }
+ return RateLimitPartition.CreateTokenBucketLimiter(2,
+ _ => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMilliseconds(100), 1, false));
+ });
+
+ var lease = limiter.Acquire("1");
+ Assert.True(lease.IsAcquired);
+
+ lease = await limiter.WaitAsync("1");
+ Assert.True(lease.IsAcquired);
+
+ // Creates the second Replenishing limiter
+ // Indirectly tests that the cached list of limiters used by the timer is probably updated by making sure a limiter already made use of it before we create a second replenishing one
+ lease = limiter.Acquire("2");
+ Assert.True(lease.IsAcquired);
+
+ lease = await limiter.WaitAsync("1");
+ Assert.True(lease.IsAcquired);
+ lease = await limiter.WaitAsync("2");
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public async Task Create_CancellationTokenPassedToUnderlyingLimiter()
+ {
+ using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
+ {
+ return RateLimitPartition.CreateConcurrencyLimiter(1,
+ _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
+ });
+
+ var lease = limiter.Acquire("");
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var waitTask = limiter.WaitAsync("", 1, cts.Token);
+ Assert.False(waitTask.IsCompleted);
+ cts.Cancel();
+ await Assert.ThrowsAsync<TaskCanceledException>(async () => await waitTask);
+ }
+
+ internal sealed class NotImplementedPartitionedRateLimiter<T> : PartitionedRateLimiter<T>
{
public override int GetAvailablePermits(T resourceID) => throw new NotImplementedException();
protected override RateLimitLease AcquireCore(T resourceID, int permitCount) => throw new NotImplementedException();
protected override ValueTask<RateLimitLease> WaitAsyncCore(T resourceID, int permitCount, CancellationToken cancellationToken) => throw new NotImplementedException();
}
+
+ internal sealed class TrackingRateLimiter : RateLimiter
+ {
+ private int _getAvailablePermitsCallCount;
+ private int _acquireCallCount;
+ private int _waitAsyncCallCount;
+ private int _disposeCallCount;
+ private int _disposeAsyncCallCount;
+
+ public int GetAvailablePermitsCallCount => _getAvailablePermitsCallCount;
+ public int AcquireCallCount => _acquireCallCount;
+ public int WaitAsyncCallCount => _waitAsyncCallCount;
+ public int DisposeCallCount => _disposeCallCount;
+ public int DisposeAsyncCallCount => _disposeAsyncCallCount;
+
+ public override TimeSpan? IdleDuration => throw new NotImplementedException();
+
+ public override int GetAvailablePermits()
+ {
+ Interlocked.Increment(ref _getAvailablePermitsCallCount);
+ return 1;
+ }
+
+ protected override RateLimitLease AcquireCore(int permitCount)
+ {
+ Interlocked.Increment(ref _acquireCallCount);
+ return new Lease();
+ }
+
+ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken)
+ {
+ Interlocked.Increment(ref _waitAsyncCallCount);
+ return new ValueTask<RateLimitLease>(new Lease());
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ Interlocked.Increment(ref _disposeCallCount);
+ }
+
+ protected override ValueTask DisposeAsyncCore()
+ {
+ Interlocked.Increment(ref _disposeAsyncCallCount);
+ return new ValueTask();
+ }
+
+ private sealed class Lease : RateLimitLease
+ {
+ public override bool IsAcquired => throw new NotImplementedException();
+
+ public override IEnumerable<string> MetadataNames => throw new NotImplementedException();
+
+ public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException();
+ }
+ }
+
+ internal sealed class TrackingRateLimiterFactory<TKey>
+ {
+ public List<(TKey Key, TrackingRateLimiter Limiter)> Limiters { get; } = new();
+
+ public RateLimiter GetLimiter(TKey key)
+ {
+ TrackingRateLimiter limiter;
+ lock (Limiters)
+ {
+ limiter = new TrackingRateLimiter();
+ Limiters.Add((key, limiter));
+ }
+ return limiter;
+ }
+ }
+
+ internal sealed class TestEquality : IEqualityComparer<int>
+ {
+ private int _equalsCallCount;
+ private int _getHashCodeCallCount;
+
+ public int EqualsCallCount => _equalsCallCount;
+ public int GetHashCodeCallCount => _getHashCodeCallCount;
+
+ public bool Equals(int x, int y)
+ {
+ Interlocked.Increment(ref _equalsCallCount);
+ return x == y;
+ }
+ public int GetHashCode([DisallowNull] int obj)
+ {
+ Interlocked.Increment(ref _getHashCodeCallCount);
+ return obj.GetHashCode();
+ }
+ }
}
}
diff --git a/src/libraries/System.Threading.RateLimiting/tests/RateLimiterPartitionTests.cs b/src/libraries/System.Threading.RateLimiting/tests/RateLimiterPartitionTests.cs
new file mode 100644
index 00000000000..7e2bfe17285
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/tests/RateLimiterPartitionTests.cs
@@ -0,0 +1,83 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.Threading.RateLimiting.Tests
+{
+ public class RateLimiterPartitionTests
+ {
+ [Fact]
+ public void Create_Concurrency()
+ {
+ var options = new ConcurrencyLimiterOptions(10, QueueProcessingOrder.OldestFirst, 10);
+ var partition = RateLimitPartition.CreateConcurrencyLimiter(1, key => options);
+
+ var factoryProperty = typeof(RateLimitPartition<int>).GetField("Factory", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!;
+ var factory = (Func<int, RateLimiter>)factoryProperty.GetValue(partition);
+ var limiter = factory(1);
+ var concurrencyLimiter = Assert.IsType<ConcurrencyLimiter>(limiter);
+ Assert.Equal(options.PermitLimit, concurrencyLimiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public void Create_TokenBucket()
+ {
+ var options = new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 10, TimeSpan.FromMinutes(1), 1, true);
+ var partition = RateLimitPartition.CreateTokenBucketLimiter(1, key => options);
+
+ var factoryProperty = typeof(RateLimitPartition<int>).GetField("Factory", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!;
+ var factory = (Func<int, RateLimiter>)factoryProperty.GetValue(partition);
+ var limiter = factory(1);
+ var tokenBucketLimiter = Assert.IsType<TokenBucketRateLimiter>(limiter);
+ Assert.Equal(options.TokenLimit, tokenBucketLimiter.GetAvailablePermits());
+ // TODO: Check other properties when ReplenshingRateLimiter is merged
+ // TODO: Check that autoReplenishment: true got changed to false
+ }
+
+ [Fact]
+ public async Task Create_NoLimiter()
+ {
+ var partition = RateLimitPartition.CreateNoLimiter(1);
+
+ var factoryProperty = typeof(RateLimitPartition<int>).GetField("Factory", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!;
+ var factory = (Func<int, RateLimiter>)factoryProperty.GetValue(partition);
+ var limiter = factory(1);
+
+ // How do we test an internal implementation of a limiter that doesn't limit? Just try some stuff that normal limiters would probably block on and see if it works.
+ var available = limiter.GetAvailablePermits();
+ var lease = limiter.Acquire(int.MaxValue);
+ Assert.True(lease.IsAcquired);
+ Assert.Equal(available, limiter.GetAvailablePermits());
+
+ lease = limiter.Acquire(int.MaxValue);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(int.MaxValue);
+ Assert.True(wait.IsCompletedSuccessfully);
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+
+ lease.Dispose();
+ }
+
+ [Fact]
+ public void Create_AnyLimiter()
+ {
+ var partition = RateLimitPartition.Create(1, key => new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 10)));
+
+ var factoryProperty = typeof(RateLimitPartition<int>).GetField("Factory", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!;
+ var factory = (Func<int, RateLimiter>)factoryProperty.GetValue(partition);
+ var limiter = factory(1);
+ var concurrencyLimiter = Assert.IsType<ConcurrencyLimiter>(limiter);
+ Assert.Equal(1, concurrencyLimiter.GetAvailablePermits());
+
+ var partition2 = RateLimitPartition.Create(1, key => new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 10, TimeSpan.FromMilliseconds(100), 1, autoReplenishment: false)));
+ factory = (Func<int, RateLimiter>)factoryProperty.GetValue(partition2);
+ limiter = factory(1);
+ var tokenBucketLimiter = Assert.IsType<TokenBucketRateLimiter>(limiter);
+ Assert.Equal(1, tokenBucketLimiter.GetAvailablePermits());
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/tests/SlidingWindowRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/SlidingWindowRateLimiterTests.cs
new file mode 100644
index 00000000000..46cf35c5a85
--- /dev/null
+++ b/src/libraries/System.Threading.RateLimiting/tests/SlidingWindowRateLimiterTests.cs
@@ -0,0 +1,770 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.Threading.RateLimiting.Test
+{
+ public class SlidingWindowRateLimiterTests : BaseRateLimiterTests
+ {
+ [Fact]
+ public override void CanAcquireResource()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire();
+
+ Assert.True(lease.IsAcquired);
+ Assert.False(limiter.Acquire().IsAcquired);
+
+ lease.Dispose();
+ Assert.False(limiter.Acquire().IsAcquired);
+ Assert.True(limiter.TryReplenish());
+ Assert.True(limiter.TryReplenish());
+
+ Assert.True(limiter.Acquire().IsAcquired);
+ }
+
+ [Fact]
+ public override void InvalidOptionsThrows()
+ {
+ Assert.Throws<ArgumentOutOfRangeException>(
+ () => new SlidingWindowRateLimiterOptions(-1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMinutes(2), 1, autoReplenishment: false));
+ Assert.Throws<ArgumentOutOfRangeException>(
+ () => new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, -1, TimeSpan.FromMinutes(2), 1, autoReplenishment: false));
+ Assert.Throws<ArgumentOutOfRangeException>(
+ () => new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMinutes(2), -1, autoReplenishment: false));
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourceAsync()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 4,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+
+ using var lease = await limiter.WaitAsync();
+
+ Assert.True(lease.IsAcquired);
+ var wait = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+
+ Assert.True(limiter.TryReplenish());
+
+ Assert.False(wait.IsCompleted);
+
+ var wait2 = limiter.WaitAsync(2);
+ Assert.False(wait2.IsCompleted);
+
+ Assert.True(limiter.TryReplenish());
+
+ Assert.True((await wait2).IsAcquired);
+ }
+
+ [Fact]
+ public async Task CanAcquireMultipleRequestsAsync()
+ {
+ // This test verifies the following behavior
+ // 1. when we have available permits after replenish to serve the queued requests
+ // 2. when the oldest item from queue is remove to accomodate new requests (QueueProcessingOrder: NewestFirst)
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(4, QueueProcessingOrder.NewestFirst, 4,
+ TimeSpan.Zero, 3, autoReplenishment: false));
+
+ using var lease = await limiter.WaitAsync(2);
+
+ Assert.True(lease.IsAcquired);
+ var wait = limiter.WaitAsync(3);
+ Assert.False(wait.IsCompleted);
+
+ Assert.True(limiter.TryReplenish());
+
+ Assert.False(wait.IsCompleted);
+
+ var wait2 = limiter.WaitAsync(2);
+ Assert.True(wait2.IsCompleted);
+
+ Assert.True(limiter.TryReplenish());
+
+ var wait3 = limiter.WaitAsync(2);
+ Assert.False(wait3.IsCompleted);
+
+ Assert.True(limiter.TryReplenish());
+ Assert.True((await wait3).IsAcquired);
+
+ Assert.False((await wait).IsAcquired);
+ Assert.Equal(0, limiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourceAsync_QueuesAndGrabsOldest()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3,
+ TimeSpan.FromMinutes(0), 2, autoReplenishment: false));
+ var lease = await limiter.WaitAsync(2);
+
+ Assert.True(lease.IsAcquired);
+ var wait1 = limiter.WaitAsync();
+ var wait2 = limiter.WaitAsync(2);
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+
+ Assert.False(wait1.IsCompleted);
+ Assert.True(limiter.TryReplenish());
+
+ lease = await wait1;
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait2.IsCompleted);
+
+ lease.Dispose();
+ Assert.Equal(1, limiter.GetAvailablePermits());
+ Assert.True(limiter.TryReplenish());
+ Assert.True(limiter.TryReplenish());
+
+ lease = await wait2;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3,
+ TimeSpan.FromMinutes(0), 2, autoReplenishment: false));
+
+ var lease = await limiter.WaitAsync(2);
+ Assert.True(lease.IsAcquired);
+
+ var wait1 = limiter.WaitAsync(2);
+ var wait2 = limiter.WaitAsync();
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+ Assert.False(wait2.IsCompleted);
+
+ Assert.True(limiter.TryReplenish());
+ // second queued item completes first with NewestFirst
+ lease = await wait2;
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait1.IsCompleted);
+
+ lease.Dispose();
+ Assert.Equal(1, limiter.GetAvailablePermits());
+ Assert.True(limiter.TryReplenish());
+ Assert.True(limiter.TryReplenish());
+
+ lease = await wait1;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ using var lease = limiter.Acquire(1);
+ var wait = limiter.WaitAsync(1);
+
+ var failedLease = await limiter.WaitAsync(1);
+ Assert.False(failedLease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ var wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ var wait2 = limiter.WaitAsync(1);
+ var lease1 = await wait;
+ Assert.False(lease1.IsAcquired);
+ Assert.False(wait2.IsCompleted);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+
+ lease = await wait2;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+ var wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait2.IsCompleted);
+
+ var wait3 = limiter.WaitAsync(2);
+ var lease1 = await wait;
+ var lease2 = await wait2;
+ Assert.False(lease1.IsAcquired);
+ Assert.False(lease2.IsAcquired);
+ Assert.False(wait3.IsCompleted);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+
+ lease = await wait3;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ // Fill queue
+ var wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ var lease1 = await limiter.WaitAsync(2);
+ Assert.False(lease1.IsAcquired);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.Zero, 3, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ var wait = limiter.WaitAsync(2);
+
+ var failedLease = await limiter.WaitAsync(2);
+ Assert.False(failedLease.IsAcquired);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+
+ wait = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(int.MaxValue);
+ Assert.True(lease.IsAcquired);
+
+ // Fill queue
+ var wait = limiter.WaitAsync(3);
+ Assert.False(wait.IsCompleted);
+
+ var wait2 = limiter.WaitAsync(int.MaxValue);
+ Assert.False(wait2.IsCompleted);
+
+ var lease1 = await wait;
+ Assert.False(lease1.IsAcquired);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+ var lease2 = await wait2;
+ Assert.True(lease2.IsAcquired);
+ }
+
+ [Fact]
+ public override void ThrowsWhenAcquiringMoreThanLimit()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+ Assert.Throws<ArgumentOutOfRangeException>(() => limiter.Acquire(2));
+ }
+
+ [Fact]
+ public override async Task ThrowsWhenWaitingForMoreThanLimit()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+ await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await limiter.WaitAsync(2));
+ }
+
+ [Fact]
+ public override void ThrowsWhenAcquiringLessThanZero()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+ Assert.Throws<ArgumentOutOfRangeException>(() => limiter.Acquire(-1));
+ }
+
+ [Fact]
+ public override async Task ThrowsWhenWaitingForLessThanZero()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+ await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await limiter.WaitAsync(-1));
+ }
+
+ [Fact]
+ public override void AcquireZero_WithAvailability()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+
+ using var lease = limiter.Acquire(0);
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override void AcquireZero_WithoutAvailability()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+ using var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var lease2 = limiter.Acquire(0);
+ Assert.False(lease2.IsAcquired);
+ lease2.Dispose();
+ }
+
+ [Fact]
+ public override async Task WaitAsyncZero_WithAvailability()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+
+ using var lease = await limiter.WaitAsync(0);
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task WaitAsyncZero_WithoutAvailabilityWaitsForAvailability()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = await limiter.WaitAsync(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(0);
+ Assert.False(wait.IsCompleted);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+ Assert.True(limiter.TryReplenish());
+ using var lease2 = await wait;
+ Assert.True(lease2.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanDequeueMultipleResourcesAtOnce()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 4,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ using var lease = await limiter.WaitAsync(2);
+ Assert.True(lease.IsAcquired);
+
+ var wait1 = limiter.WaitAsync(1);
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+ Assert.True(limiter.TryReplenish());
+
+ var lease1 = await wait1;
+ var lease2 = await wait2;
+ Assert.True(lease1.IsAcquired);
+ Assert.True(lease2.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanCancelWaitAsyncAfterQueuing()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+
+ Assert.Equal(0, limiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public override async Task CanCancelWaitAsyncBeforeQueuing()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => limiter.WaitAsync(1, cts.Token).AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ lease.Dispose();
+ Assert.True(limiter.TryReplenish());
+
+ Assert.Equal(0, limiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public override async Task CancelUpdatesQueueLimit()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ wait = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ Assert.Equal(1, limiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public override void NoMetadataOnAcquiredLease()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ using var lease = limiter.Acquire(1);
+ Assert.False(lease.TryGetMetadata(MetadataName.RetryAfter, out _));
+ }
+
+ [Fact]
+ public override void MetadataNamesContainsAllMetadata()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+ using var lease = limiter.Acquire(1);
+ Assert.Collection(lease.MetadataNames, metadataName => Assert.Equal(metadataName, MetadataName.RetryAfter.Name));
+ }
+
+ [Fact]
+ public override async Task DisposeReleasesQueuedAcquires()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ var wait1 = limiter.WaitAsync(1);
+ var wait2 = limiter.WaitAsync(1);
+ var wait3 = limiter.WaitAsync(1);
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+ Assert.False(wait3.IsCompleted);
+
+ limiter.Dispose();
+
+ lease = await wait1;
+ Assert.False(lease.IsAcquired);
+ lease = await wait2;
+ Assert.False(lease.IsAcquired);
+ lease = await wait3;
+ Assert.False(lease.IsAcquired);
+
+ // Throws after disposal
+ Assert.Throws<ObjectDisposedException>(() => limiter.Acquire(1));
+ await Assert.ThrowsAsync<ObjectDisposedException>(() => limiter.WaitAsync(1).AsTask());
+ }
+
+ [Fact]
+ public override async Task DisposeAsyncReleasesQueuedAcquires()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ var wait1 = limiter.WaitAsync(1);
+ var wait2 = limiter.WaitAsync(1);
+ var wait3 = limiter.WaitAsync(1);
+ Assert.False(wait1.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+ Assert.False(wait3.IsCompleted);
+
+ await limiter.DisposeAsync();
+
+ lease = await wait1;
+ Assert.False(lease.IsAcquired);
+ lease = await wait2;
+ Assert.False(lease.IsAcquired);
+ lease = await wait3;
+ Assert.False(lease.IsAcquired);
+
+ // Throws after disposal
+ Assert.Throws<ObjectDisposedException>(() => limiter.Acquire(1));
+ await Assert.ThrowsAsync<ObjectDisposedException>(() => limiter.WaitAsync(1).AsTask());
+ }
+
+ [Fact]
+ public void TryReplenishWithAutoReplenish_ReturnsFalse()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.FromSeconds(1), 1, autoReplenishment: true));
+ Assert.Equal(2, limiter.GetAvailablePermits());
+ Assert.False(limiter.TryReplenish());
+ Assert.Equal(2, limiter.GetAvailablePermits());
+ }
+
+ [Fact]
+ public async Task AutoReplenish_ReplenishesCounters()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.FromMilliseconds(1000), 2, autoReplenishment: true));
+ Assert.Equal(2, limiter.GetAvailablePermits());
+ limiter.Acquire(2);
+
+ var lease = await limiter.WaitAsync(1);
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourcesWithWaitAsyncWithQueuedItemsIfNewestFirst()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
+ TimeSpan.Zero, 3, autoReplenishment: false));
+
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+
+ Assert.Equal(1, limiter.GetAvailablePermits());
+ lease = await limiter.WaitAsync(1);
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+ Assert.True(limiter.TryReplenish());
+
+ Assert.False(wait.IsCompleted);
+
+ Assert.True(limiter.TryReplenish());
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CannotAcquireResourcesWithWaitAsyncWithQueuedItemsIfOldestFirst()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 5,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+
+ var lease = limiter.Acquire(3);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(2);
+ var wait2 = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ limiter.TryReplenish();
+
+ Assert.False(wait.IsCompleted);
+ Assert.False(wait2.IsCompleted);
+
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+
+ lease = await wait2;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanAcquireResourcesWithAcquireWithQueuedItemsIfNewestFirst()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+
+ lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+ Assert.False(wait.IsCompleted);
+
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CannotAcquireResourcesWithAcquireWithQueuedItemsIfOldestFirst()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var wait = limiter.WaitAsync(2);
+ Assert.False(wait.IsCompleted);
+
+ lease = limiter.Acquire(1);
+ Assert.False(lease.IsAcquired);
+
+ limiter.TryReplenish();
+ Assert.True(limiter.TryReplenish());
+
+ lease = await wait;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override void NullIdleDurationWhenActive()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.FromMilliseconds(2), 1, autoReplenishment: false));
+ limiter.Acquire(1);
+ Assert.Null(limiter.IdleDuration);
+ }
+
+ [Fact]
+ public override async Task IdleDurationUpdatesWhenIdle()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.FromMilliseconds(2), 2, autoReplenishment: false));
+ Assert.NotNull(limiter.IdleDuration);
+ var previousDuration = limiter.IdleDuration;
+ await Task.Delay(15);
+ Assert.True(previousDuration < limiter.IdleDuration);
+ }
+
+ [Fact]
+ public override void IdleDurationUpdatesWhenChangingFromActive()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ limiter.Acquire(1);
+ limiter.TryReplenish();
+ limiter.TryReplenish();
+ Assert.NotNull(limiter.IdleDuration);
+ }
+
+ [Fact]
+ public void ReplenishingRateLimiterPropertiesHaveCorrectValues()
+ {
+ var replenishPeriod = TimeSpan.FromMinutes(1);
+ using ReplenishingRateLimiter limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ replenishPeriod, 1, autoReplenishment: true));
+ Assert.True(limiter.IsAutoReplenishing);
+ Assert.Equal(replenishPeriod, limiter.ReplenishmentPeriod);
+
+ replenishPeriod = TimeSpan.FromSeconds(2);
+ using ReplenishingRateLimiter limiter2 = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2,
+ replenishPeriod, 1, autoReplenishment: false));
+ Assert.False(limiter2.IsAutoReplenishing);
+ Assert.Equal(replenishPeriod, limiter2.ReplenishmentPeriod);
+ }
+
+ [Fact]
+ public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ // Add another item to queue, will be completed as failed later when we queue another item
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ lease.Dispose();
+ limiter.TryReplenish();
+
+ var wait3 = limiter.WaitAsync(2);
+ Assert.False(wait3.IsCompleted);
+
+ // will be kicked by wait3 because we're using NewestFirst
+ lease = await wait2;
+ Assert.False(lease.IsAcquired);
+
+ limiter.TryReplenish();
+ lease = await wait3;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanDisposeAfterCancelingQueuedRequest()
+ {
+ var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ // Make sure dispose doesn't have any side-effects when dealing with a canceled queued item
+ limiter.Dispose();
+ }
+ }
+}
diff --git a/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj b/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj
index 4bb1f2a3392..ca8bf22f743 100644
--- a/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj
+++ b/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj
@@ -6,9 +6,15 @@
<Compile Include="BaseRateLimiterTests.cs" />
<Compile Include="ConcurrencyLimiterTests.cs" />
<Compile Include="PartitionedRateLimiterTests.cs" />
+ <Compile Include="FixedWindowRateLimiterTests.cs" />
+ <Compile Include="RateLimiterPartitionTests.cs" />
+ <Compile Include="SlidingWindowRateLimiterTests.cs" />
<Compile Include="TokenBucketRateLimiterTests.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\src\System.Threading.RateLimiting.csproj" />
</ItemGroup>
+ <ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">
+ <PackageReference Include="System.ValueTuple" Version="$(SystemValueTupleVersion)" />
+ </ItemGroup>
</Project>
diff --git a/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs
index f0bcea61a6d..1fb831ab4d5 100644
--- a/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs
+++ b/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs
@@ -367,6 +367,58 @@ namespace System.Threading.RateLimiting.Test
}
[Fact]
+ public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest()
+ {
+ var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
+ TimeSpan.Zero, 2, autoReplenishment: false));
+ var lease = limiter.Acquire(2);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ // Add another item to queue, will be completed as failed later when we queue another item
+ var wait2 = limiter.WaitAsync(1);
+ Assert.False(wait.IsCompleted);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ lease.Dispose();
+
+ var wait3 = limiter.WaitAsync(2);
+ Assert.False(wait3.IsCompleted);
+
+ // will be kicked by wait3 because we're using NewestFirst
+ lease = await wait2;
+ Assert.False(lease.IsAcquired);
+
+ limiter.TryReplenish();
+ lease = await wait3;
+ Assert.True(lease.IsAcquired);
+ }
+
+ [Fact]
+ public override async Task CanDisposeAfterCancelingQueuedRequest()
+ {
+ var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
+ TimeSpan.Zero, 1, autoReplenishment: false));
+ var lease = limiter.Acquire(1);
+ Assert.True(lease.IsAcquired);
+
+ var cts = new CancellationTokenSource();
+ var wait = limiter.WaitAsync(1, cts.Token);
+
+ cts.Cancel();
+ var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
+ Assert.Equal(cts.Token, ex.CancellationToken);
+
+ // Make sure dispose doesn't have any side-effects when dealing with a canceled queued item
+ limiter.Dispose();
+ }
+
+ [Fact]
public override async Task CanCancelWaitAsyncBeforeQueuing()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,