author    | John Cai <jcai@gitlab.com> | 2022-03-30 17:29:33 +0300
committer | John Cai <jcai@gitlab.com> | 2022-03-31 17:20:47 +0300
commit    | f3ba9a03a9cd7e22cca1ac1f694341a3dd533b92 (patch)
tree      | acebac160bf374841b5e78ccba7d2c66b45a7462
parent    | 9045830df5ffcb7a28b5d0ec9a294d0f32f9f0a7 (diff)
limithandler: Prune unused limiters in RateLimiter
RateLimiter contains a limiter per RPC/repo pair. We don't want this to
grow monotonically, since it would incur a heavy memory burden on the
machine. Instead, introduce a background process that looks through the
limiters and removes the ones that have not been used in the past 10
refill intervals.
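
To make the mechanism concrete, here is a minimal, self-contained sketch of the idea (hypothetical type and function names, and a time.Ticker instead of the timer used in the commit; the actual change lives in rate_limiter.go below): each access records a timestamp per key, and a background goroutine periodically deletes every limiter whose last access is older than 10 refill intervals.

package prunesketch

import (
	"context"
	"sync"
	"time"
)

// pruningLimiters is a hypothetical stand-in for the limiter registry:
// one sync.Map holds the limiters, the other the time each key was last used.
type pruningLimiters struct {
	limitersByKey, lastAccessedByKey sync.Map
	refillInterval                   time.Duration
}

// pruneUnused removes every limiter that has not been accessed within the
// last 10 refill intervals.
func (p *pruningLimiters) pruneUnused() {
	cutoff := time.Now().Add(-10 * p.refillInterval)
	p.lastAccessedByKey.Range(func(key, value interface{}) bool {
		if value.(time.Time).Before(cutoff) {
			p.limitersByKey.Delete(key)
			p.lastAccessedByKey.Delete(key)
		}
		return true
	})
}

// startPruning runs pruneUnused on a fixed interval until ctx is cancelled.
func (p *pruningLimiters) startPruning(ctx context.Context, every time.Duration) {
	go func() {
		ticker := time.NewTicker(every)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				p.pruneUnused()
			case <-ctx.Done():
				return
			}
		}
	}()
}

The prune loop stops as soon as its context is cancelled, which is why WithRateLimiters now takes a ctx and threads it into NewRateLimiter in the diff below.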
-rw-r--r-- | cmd/gitaly/main.go                                    |  2
-rw-r--r-- | internal/middleware/limithandler/middleware_test.go   |  2
-rw-r--r-- | internal/middleware/limithandler/rate_limiter.go      | 69
-rw-r--r-- | internal/middleware/limithandler/rate_limiter_test.go | 81
4 files changed, 130 insertions, 24 deletions
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 291ef3d96..3d3a1bb0c 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -222,7 +222,7 @@ func run(cfg config.Cfg) error {
 	rateLimitHandler := limithandler.New(
 		cfg,
 		limithandler.LimitConcurrencyByRepo,
-		limithandler.WithRateLimiters,
+		limithandler.WithRateLimiters(ctx),
 	)

 	prometheus.MustRegister(concurrencyLimitHandler, rateLimitHandler)
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index 8db7fb83c..2076fb893 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -332,7 +332,7 @@ func testRateLimitHandlerMetrics(t *testing.T, ctx context.Context) {
 		},
 	}

-	lh := limithandler.New(cfg, fixedLockKey, limithandler.WithRateLimiters)
+	lh := limithandler.New(cfg, fixedLockKey, limithandler.WithRateLimiters(ctx))
 	interceptor := lh.UnaryInterceptor()
 	srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
 	defer srv.Stop()
diff --git a/internal/middleware/limithandler/rate_limiter.go b/internal/middleware/limithandler/rate_limiter.go
index 1c7a16a5c..ef5d119b1 100644
--- a/internal/middleware/limithandler/rate_limiter.go
+++ b/internal/middleware/limithandler/rate_limiter.go
@@ -16,10 +16,10 @@ import (
 // RateLimiter is an implementation of Limiter that puts a hard limit on the
 // number of requests per second
 type RateLimiter struct {
-	limitersByKey         sync.Map
-	refillInterval        time.Duration
-	burst                 int
-	requestsDroppedMetric prometheus.Counter
+	limitersByKey, lastAccessedByKey sync.Map
+	refillInterval                   time.Duration
+	burst                            int
+	requestsDroppedMetric            prometheus.Counter
 }

 // Limit rejects an incoming reequest if the maximum number of requests per
@@ -28,6 +28,8 @@ func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) {
 	limiter, _ := r.limitersByKey.LoadOrStore(
 		lockKey,
 		rate.NewLimiter(rate.Every(r.refillInterval), r.burst))
+	r.lastAccessedByKey.Store(lockKey, time.Now())
+
 	if !limiter.(*rate.Limiter).Allow() {
 		// For now, we are only emitting this metric to get an idea of the shape
 		// of traffic.
@@ -42,6 +44,7 @@ func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc)

 // NewRateLimiter creates a new instance of RateLimiter
 func NewRateLimiter(
+	ctx context.Context,
 	refillInterval time.Duration,
 	burst int,
 	requestsDroppedMetric prometheus.Counter,
@@ -52,29 +55,55 @@ func NewRateLimiter(
 		requestsDroppedMetric: requestsDroppedMetric,
 	}

+	timer := time.NewTimer(5 * time.Minute)
+	defer timer.Stop()
+	go func() {
+		for {
+			select {
+			case <-timer.C:
+				r.pruneUnusedLimiters(ctx)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
 	return r
 }

 // WithRateLimiters sets up a middleware with limiters that limit requests
 // based on its rate per second per RPC
-func WithRateLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
-	result := make(map[string]Limiter)
+func WithRateLimiters(ctx context.Context) SetupFunc {
+	return func(cfg config.Cfg, middleware *LimiterMiddleware) {
+		result := make(map[string]Limiter)

-	for _, limitCfg := range cfg.RateLimiting {
-		if limitCfg.Burst > 0 {
-			serviceName, methodName := splitMethodName(limitCfg.RPC)
-			result[limitCfg.RPC] = NewRateLimiter(
-				limitCfg.Interval.Duration(),
-				limitCfg.Burst,
-				middleware.requestsDroppedMetric.With(prometheus.Labels{
-					"system":       "gitaly",
-					"grpc_service": serviceName,
-					"grpc_method":  methodName,
-					"reason":       "rate",
-				}),
-			)
+		for _, limitCfg := range cfg.RateLimiting {
+			if limitCfg.Burst > 0 && limitCfg.Interval > 0 {
+				serviceName, methodName := splitMethodName(limitCfg.RPC)
+				result[limitCfg.RPC] = NewRateLimiter(
+					ctx,
+					limitCfg.Interval.Duration(),
+					limitCfg.Burst,
+					middleware.requestsDroppedMetric.With(prometheus.Labels{
+						"system":       "gitaly",
+						"grpc_service": serviceName,
+						"grpc_method":  methodName,
+						"reason":       "rate",
+					}),
+				)
+			}
 		}
+
+		middleware.methodLimiters = result
 	}
+}
+
+func (r *RateLimiter) pruneUnusedLimiters(ctx context.Context) {
+	r.lastAccessedByKey.Range(func(key, value interface{}) bool {
+		if value.(time.Time).Before(time.Now().Add(-10 * r.refillInterval)) {
+			r.limitersByKey.Delete(key)
+		}

-	middleware.methodLimiters = result
+		return true
+	})
 }
diff --git a/internal/middleware/limithandler/rate_limiter_test.go b/internal/middleware/limithandler/rate_limiter_test.go
index a91029d02..ab55f1aac 100644
--- a/internal/middleware/limithandler/rate_limiter_test.go
+++ b/internal/middleware/limithandler/rate_limiter_test.go
@@ -27,7 +27,7 @@ func testRateLimiter(t *testing.T, ctx context.Context) {
 			Help: "how many requests dropped?",
 		})

-		rateLimiter := NewRateLimiter(1*time.Hour, 100, droppedRequestMetric)
+		rateLimiter := NewRateLimiter(ctx, 1*time.Hour, 100, droppedRequestMetric)

 		_, err := rateLimiter.Limit(ctx, "", func() (interface{}, error) {
 			return nil, nil
@@ -52,7 +52,7 @@ gitaly_requests_dropped_total 0
 			Help: "how many requests dropped?",
 		})

-		rateLimiter := NewRateLimiter(1*time.Hour, 5, droppedRequestMetric)
+		rateLimiter := NewRateLimiter(ctx, 1*time.Hour, 5, droppedRequestMetric)

 		for i := 0; i < 5; i++ {
 			_, err := rateLimiter.Limit(ctx, "", func() (interface{}, error) {
@@ -83,3 +83,80 @@ gitaly_requests_dropped_total 1
 		))
 	})
 }
+
+func TestRateLimiter_pruneUnusedLimiters(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		desc                                      string
+		setup                                     func(r *RateLimiter)
+		refillInterval                            time.Duration
+		expectedLimiters, expectedRemovedLimiters []string
+	}{
+		{
+			desc: "none are prunable",
+			setup: func(r *RateLimiter) {
+				r.limitersByKey.Store("a", struct{}{})
+				r.limitersByKey.Store("b", struct{}{})
+				r.limitersByKey.Store("c", struct{}{})
+				r.lastAccessedByKey.Store("a", time.Now())
+				r.lastAccessedByKey.Store("b", time.Now())
+				r.lastAccessedByKey.Store("c", time.Now())
+			},
+			refillInterval:          time.Second,
+			expectedLimiters:        []string{"a", "b", "c"},
+			expectedRemovedLimiters: []string{},
+		},
+		{
+			desc: "all are prunable",
+			setup: func(r *RateLimiter) {
+				r.limitersByKey.Store("a", struct{}{})
+				r.limitersByKey.Store("b", struct{}{})
+				r.limitersByKey.Store("c", struct{}{})
+				r.lastAccessedByKey.Store("a", time.Now().Add(-1*time.Minute))
+				r.lastAccessedByKey.Store("b", time.Now().Add(-1*time.Minute))
+				r.lastAccessedByKey.Store("c", time.Now().Add(-1*time.Minute))
+			},
+			refillInterval:          time.Second,
+			expectedLimiters:        []string{},
+			expectedRemovedLimiters: []string{"a", "b", "c"},
+		},
+		{
+			desc: "one is prunable",
+			setup: func(r *RateLimiter) {
+				r.limitersByKey.Store("a", struct{}{})
+				r.limitersByKey.Store("b", struct{}{})
+				r.limitersByKey.Store("c", struct{}{})
+				r.lastAccessedByKey.Store("a", time.Now())
+				r.lastAccessedByKey.Store("b", time.Now())
+				r.lastAccessedByKey.Store("c", time.Now().Add(-1*time.Minute))
+			},
+			refillInterval:          time.Second,
+			expectedLimiters:        []string{"a", "b"},
+			expectedRemovedLimiters: []string{"c"},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			rateLimiter := &RateLimiter{
+				refillInterval: tc.refillInterval,
+			}
+
+			tc.setup(rateLimiter)
+
+			ctx := testhelper.Context(t)
+			rateLimiter.pruneUnusedLimiters(ctx)
+
+			for _, expectedLimiter := range tc.expectedLimiters {
+				_, ok := rateLimiter.limitersByKey.Load(expectedLimiter)
+				assert.True(t, ok)
+			}
+
+			for _, expectedRemovedLimiter := range tc.expectedRemovedLimiters {
+				_, ok := rateLimiter.limitersByKey.Load(expectedRemovedLimiter)
+				assert.False(t, ok)
+			}
+		})
+	}
+}