diff options
author | Brennan <brecon@microsoft.com> | 2022-01-29 03:58:36 +0300 |
---|---|---|
committer | Brennan <brecon@microsoft.com> | 2022-01-29 03:58:36 +0300 |
commit | 865348babe3facf5ee548a48751b34426fc9559d (patch) | |
tree | 0c2511888b7733ebe64a6f8d659bcb2331be0b8e | |
parent | 3cf20f9ef363cfe833212cfd0f5af61881ca4914 (diff) |
WIP any keybrecon/RateLimitPlayground
-rw-r--r-- | src/Http/samples/MinimalSample/Program.cs | 145 |
1 files changed, 89 insertions, 56 deletions
diff --git a/src/Http/samples/MinimalSample/Program.cs b/src/Http/samples/MinimalSample/Program.cs index c6082df183..190b3419b4 100644 --- a/src/Http/samples/MinimalSample/Program.cs +++ b/src/Http/samples/MinimalSample/Program.cs @@ -15,15 +15,15 @@ builder.Services.AddHttpClient("RateLimited", o => o.BaseAddress = new Uri("http new RateLimitedHandler( - new AggregateRateLimitBuilder<HttpRequestMessage, string>() + new AggregateRateLimitBuilder<HttpRequestMessage>() - .WithTokenBucketPolicy(request => request.Method.Equals(HttpMethod.Post) ? HttpMethod.Post.Method : null, - new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 10, TimeSpan.FromSeconds(1), 1, true)) + .WithConcurrencyPolicy(request => request.Method.Equals(HttpMethod.Post) ? HttpMethod.Post : null, + new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 10)) .WithPolicy(request => request.Headers.TryGetValues("cookie", out _) ? "cookie" : null, _ => new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1))) - .WithConcurrencyPolicy(request => request.RequestUri.AbsolutePath.StartsWith("/problem", StringComparison.InvariantCultureIgnoreCase) ? request.RequestUri.AbsolutePath : null, + .WithConcurrencyPolicy(request => request.RequestUri.AbsolutePath.StartsWith("/problem", StringComparison.InvariantCultureIgnoreCase) ? request.RequestUri : null, new ConcurrencyLimiterOptions(2, QueueProcessingOrder.OldestFirst, 2)) .Build())); @@ -192,28 +192,58 @@ namespace System.Threading.RateLimiting namespace System.Threading.RateLimiting { - public class AggregateRateLimitBuilder<TResource, TKey> where TKey : notnull + public class AggregateRateLimitBuilder<TResource> { - private List<(Func<TResource, TKey?>, Func<TKey, RateLimiter>)> _policies = new(); + private List<Func<TResource, RateLimiter?>> _policies = new(); private TimeSpan _minRefreshInterval = TimeSpan.MaxValue; private RateLimiter _defaultRateLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); - public AggregateRateLimitBuilder<TResource, TKey> WithPolicy(Func<TResource, TKey?> keyFactory, Func<TKey, RateLimiter> limiterFactory) + public AggregateRateLimitBuilder<TResource> WithPolicy<TKey>(Func<TResource, TKey?> keyFactory, Func<TKey, RateLimiter> limiterFactory) where TKey : notnull { - _policies.Add((keyFactory, limiterFactory)); + Dictionary<TKey, RateLimiter> limiters = new(); + var func = (TResource resource) => + { + if (keyFactory(resource) is TKey key) + { + RateLimiter? limiter; + if (!limiters.TryGetValue(key, out limiter)) + { + limiter = limiterFactory(key); + limiters.Add(key, limiter); + } + return limiter; + } + return null; + }; + _policies.Add(func); return this; } - public AggregateRateLimitBuilder<TResource, TKey> WithConcurrencyPolicy(Func<TResource, TKey?> keyFactory, ConcurrencyLimiterOptions options) + public AggregateRateLimitBuilder<TResource> WithConcurrencyPolicy<TKey>(Func<TResource, TKey?> keyFactory, ConcurrencyLimiterOptions options) where TKey : notnull { - _policies.Add((keyFactory, _ => new ConcurrencyLimiter(options))); + Dictionary<TKey, RateLimiter> limiters = new(); + var func = (TResource resource) => + { + if (keyFactory(resource) is TKey key) + { + RateLimiter? limiter; + if (!limiters.TryGetValue(key, out limiter)) + { + limiter = new ConcurrencyLimiter(options); + limiters.Add(key, limiter); + } + return limiter; + } + return null; + }; + _policies.Add(func); return this; } // Should there be a RateLimiter abstraction for timer based limiters? - // public AggregateRateLimitBuilder<TKey> WithTimerPolicy(Func<TKey, string?> keyFactory, TimeBasedRateLimiter) + // public AggregateRateLimitBuilder<TResource> WithTimerPolicy<TKey>(Func<TKey, string?> keyFactory, TimeBasedRateLimiter) where TKey : notnull; - public AggregateRateLimitBuilder<TResource, TKey> WithTokenBucketPolicy(Func<TResource, TKey?> keyFactory, TokenBucketRateLimiterOptions options) + public AggregateRateLimitBuilder<TResource> WithTokenBucketPolicy<TKey>(Func<TResource, TKey?> keyFactory, TokenBucketRateLimiterOptions options) where TKey : notnull { if (options.AutoReplenishment) { @@ -225,12 +255,27 @@ namespace System.Threading.RateLimiting { _minRefreshInterval = options.ReplenishmentPeriod; } - _policies.Add((keyFactory, _ => new TokenBucketRateLimiter(options))); + Dictionary<TKey, RateLimiter> limiters = new(); + var func = (TResource resource) => + { + if (keyFactory(resource) is TKey key) + { + RateLimiter? limiter; + if (!limiters.TryGetValue(key, out limiter)) + { + limiter = new TokenBucketRateLimiter(options); + limiters.Add(key, limiter); + } + return limiter; + } + return null; + }; + _policies.Add(func); return this; } // might want this to be a factory if the builder is re-usable - public AggregateRateLimitBuilder<TResource, TKey> WithDefaultRateLimiter(RateLimiter defaultRateLimiter) + public AggregateRateLimitBuilder<TResource> WithDefaultRateLimiter(RateLimiter defaultRateLimiter) { _defaultRateLimiter = defaultRateLimiter; return this; @@ -238,20 +283,20 @@ namespace System.Threading.RateLimiting public AggregateRateLimiter<TResource> Build() { - return new Impl<TResource, TKey>(_policies, _defaultRateLimiter, _minRefreshInterval); + return new Impl<TResource>(_policies, _defaultRateLimiter, _minRefreshInterval); } } - internal class Impl<TResource, TKey> : AggregateRateLimiter<TResource> where TKey : notnull + internal class Impl<TResource> : AggregateRateLimiter<TResource> { private readonly RateLimiter _defaultRateLimiter; - private readonly List<(Func<TResource, TKey?>, Func<TKey, RateLimiter>)> _policies; + private readonly List<Func<TResource, RateLimiter?>> _policies; private readonly Timer? _timer; private bool _disposed; - private readonly Dictionary<TKey, RateLimiter> _limiters = new(); + //private readonly Dictionary<TKey, RateLimiter> _limiters = new(); - public Impl(List<(Func<TResource, TKey?>, Func<TKey, RateLimiter>)> policies, RateLimiter defaultRateLimiter, TimeSpan minRefreshInterval) + public Impl(List<Func<TResource, RateLimiter?>> policies, RateLimiter defaultRateLimiter, TimeSpan minRefreshInterval) { _policies = policies; _defaultRateLimiter = defaultRateLimiter; @@ -285,52 +330,41 @@ namespace System.Threading.RateLimiting private RateLimiter GetLimiter(TResource resourceID) { - RateLimiter? limiter = null; - foreach ((Func<TResource, TKey?>, Func<TKey, RateLimiter>) policy in _policies) + lock (_policies) { - TKey? id = policy.Item1(resourceID); - if (id is not null) + foreach (Func<TResource, RateLimiter?> policy in _policies) { - lock (_policies) + RateLimiter? rateLimiter = policy(resourceID); + if (rateLimiter is not null) { - if (!_limiters.TryGetValue(id, out limiter)) - { - limiter = policy.Item2(id); - _limiters.Add(id, limiter); - } + // if (rateLimiter is ReplenishingRateLimiter replenishingRateLimiter) + // ... + return rateLimiter; } - break; } } - if (limiter is null) - { - lock (_policies) - { - limiter = _defaultRateLimiter; - } - } - return limiter; + return _defaultRateLimiter; } private static void Tick(object? obj) { - Impl<TResource, TKey> aggregateLimiter = (Impl<TResource, TKey>)obj!; - - lock (aggregateLimiter._policies) - { - foreach (KeyValuePair<TKey, RateLimiter> limiter in aggregateLimiter._limiters) - { - if (limiter.Value is TokenBucketRateLimiter tokenBucketRateLimiter) - { - tokenBucketRateLimiter.TryReplenish(); - } - - // Remove limiters that have full permits? Maybe put them in a queue of potential limiters to remove - // Or have an abstraction/method that lets us query a rate limiter to see if it's idle - //limiter.Value.GetAvailablePermits(); - } - } + //Impl<TResource, TKey> aggregateLimiter = (Impl<TResource, TKey>)obj!; + + //lock (aggregateLimiter._policies) + //{ + // foreach (KeyValuePair<TKey, RateLimiter> limiter in aggregateLimiter._limiters) + // { + // if (limiter.Value is TokenBucketRateLimiter tokenBucketRateLimiter) + // { + // tokenBucketRateLimiter.TryReplenish(); + // } + + // // Remove limiters that have full permits? Maybe put them in a queue of potential limiters to remove + // // Or have an abstraction/method that lets us query a rate limiter to see if it's idle + // //limiter.Value.GetAvailablePermits(); + // } + //} } protected override void Dispose(bool disposing) @@ -349,8 +383,7 @@ namespace System.Threading.RateLimiting _disposed = true; _timer?.Dispose(); - - _limiters.Clear(); + _policies.Clear(); } } |