Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-03-30 17:29:33 +0300
committerJohn Cai <jcai@gitlab.com>2022-03-31 17:20:47 +0300
commitf3ba9a03a9cd7e22cca1ac1f694341a3dd533b92 (patch)
treeacebac160bf374841b5e78ccba7d2c66b45a7462
parent9045830df5ffcb7a28b5d0ec9a294d0f32f9f0a7 (diff)
limithandler: Prune unused limiters in RateLimiterjc-rate-limiter
RateLimiter contains a limiter per rpc/repo pair. We don't want this to grow monotinically since it will incur a heavy memory burden on the machine. Instead, introduce a background process that looks through the limiters and removes the ones that have not been used in the past 10 refill intervals.
-rw-r--r--cmd/gitaly/main.go2
-rw-r--r--internal/middleware/limithandler/middleware_test.go2
-rw-r--r--internal/middleware/limithandler/rate_limiter.go69
-rw-r--r--internal/middleware/limithandler/rate_limiter_test.go81
4 files changed, 130 insertions, 24 deletions
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 291ef3d96..3d3a1bb0c 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -222,7 +222,7 @@ func run(cfg config.Cfg) error {
rateLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
- limithandler.WithRateLimiters,
+ limithandler.WithRateLimiters(ctx),
)
prometheus.MustRegister(concurrencyLimitHandler, rateLimitHandler)
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index 8db7fb83c..2076fb893 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -332,7 +332,7 @@ func testRateLimitHandlerMetrics(t *testing.T, ctx context.Context) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithRateLimiters)
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithRateLimiters(ctx))
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
diff --git a/internal/middleware/limithandler/rate_limiter.go b/internal/middleware/limithandler/rate_limiter.go
index 1c7a16a5c..ef5d119b1 100644
--- a/internal/middleware/limithandler/rate_limiter.go
+++ b/internal/middleware/limithandler/rate_limiter.go
@@ -16,10 +16,10 @@ import (
// RateLimiter is an implementation of Limiter that puts a hard limit on the
// number of requests per second
type RateLimiter struct {
- limitersByKey sync.Map
- refillInterval time.Duration
- burst int
- requestsDroppedMetric prometheus.Counter
+ limitersByKey, lastAccessedByKey sync.Map
+ refillInterval time.Duration
+ burst int
+ requestsDroppedMetric prometheus.Counter
}
// Limit rejects an incoming reequest if the maximum number of requests per
@@ -28,6 +28,8 @@ func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc)
limiter, _ := r.limitersByKey.LoadOrStore(
lockKey,
rate.NewLimiter(rate.Every(r.refillInterval), r.burst))
+ r.lastAccessedByKey.Store(lockKey, time.Now())
+
if !limiter.(*rate.Limiter).Allow() {
// For now, we are only emitting this metric to get an idea of the shape
// of traffic.
@@ -42,6 +44,7 @@ func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc)
// NewRateLimiter creates a new instance of RateLimiter
func NewRateLimiter(
+ ctx context.Context,
refillInterval time.Duration,
burst int,
requestsDroppedMetric prometheus.Counter,
@@ -52,29 +55,55 @@ func NewRateLimiter(
requestsDroppedMetric: requestsDroppedMetric,
}
+ timer := time.NewTimer(5 * time.Minute)
+ defer timer.Stop()
+ go func() {
+ for {
+ select {
+ case <-timer.C:
+ r.pruneUnusedLimiters(ctx)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
return r
}
// WithRateLimiters sets up a middleware with limiters that limit requests
// based on its rate per second per RPC
-func WithRateLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
- result := make(map[string]Limiter)
+func WithRateLimiters(ctx context.Context) SetupFunc {
+ return func(cfg config.Cfg, middleware *LimiterMiddleware) {
+ result := make(map[string]Limiter)
- for _, limitCfg := range cfg.RateLimiting {
- if limitCfg.Burst > 0 {
- serviceName, methodName := splitMethodName(limitCfg.RPC)
- result[limitCfg.RPC] = NewRateLimiter(
- limitCfg.Interval.Duration(),
- limitCfg.Burst,
- middleware.requestsDroppedMetric.With(prometheus.Labels{
- "system": "gitaly",
- "grpc_service": serviceName,
- "grpc_method": methodName,
- "reason": "rate",
- }),
- )
+ for _, limitCfg := range cfg.RateLimiting {
+ if limitCfg.Burst > 0 && limitCfg.Interval > 0 {
+ serviceName, methodName := splitMethodName(limitCfg.RPC)
+ result[limitCfg.RPC] = NewRateLimiter(
+ ctx,
+ limitCfg.Interval.Duration(),
+ limitCfg.Burst,
+ middleware.requestsDroppedMetric.With(prometheus.Labels{
+ "system": "gitaly",
+ "grpc_service": serviceName,
+ "grpc_method": methodName,
+ "reason": "rate",
+ }),
+ )
+ }
}
+
+ middleware.methodLimiters = result
}
+}
+
+func (r *RateLimiter) pruneUnusedLimiters(ctx context.Context) {
+ r.lastAccessedByKey.Range(func(key, value interface{}) bool {
+ if value.(time.Time).Before(time.Now().Add(-10 * r.refillInterval)) {
+ r.limitersByKey.Delete(key)
+ }
- middleware.methodLimiters = result
+ return true
+ })
}
diff --git a/internal/middleware/limithandler/rate_limiter_test.go b/internal/middleware/limithandler/rate_limiter_test.go
index a91029d02..ab55f1aac 100644
--- a/internal/middleware/limithandler/rate_limiter_test.go
+++ b/internal/middleware/limithandler/rate_limiter_test.go
@@ -27,7 +27,7 @@ func testRateLimiter(t *testing.T, ctx context.Context) {
Help: "how many requests dropped?",
})
- rateLimiter := NewRateLimiter(1*time.Hour, 100, droppedRequestMetric)
+ rateLimiter := NewRateLimiter(ctx, 1*time.Hour, 100, droppedRequestMetric)
_, err := rateLimiter.Limit(ctx, "", func() (interface{}, error) {
return nil, nil
@@ -52,7 +52,7 @@ gitaly_requests_dropped_total 0
Help: "how many requests dropped?",
})
- rateLimiter := NewRateLimiter(1*time.Hour, 5, droppedRequestMetric)
+ rateLimiter := NewRateLimiter(ctx, 1*time.Hour, 5, droppedRequestMetric)
for i := 0; i < 5; i++ {
_, err := rateLimiter.Limit(ctx, "", func() (interface{}, error) {
@@ -83,3 +83,80 @@ gitaly_requests_dropped_total 1
))
})
}
+
+func TestRateLimiter_pruneUnusedLimiters(t *testing.T) {
+ t.Parallel()
+
+ testCases := []struct {
+ desc string
+ setup func(r *RateLimiter)
+ refillInterval time.Duration
+ expectedLimiters, expectedRemovedLimiters []string
+ }{
+ {
+ desc: "none are prunable",
+ setup: func(r *RateLimiter) {
+ r.limitersByKey.Store("a", struct{}{})
+ r.limitersByKey.Store("b", struct{}{})
+ r.limitersByKey.Store("c", struct{}{})
+ r.lastAccessedByKey.Store("a", time.Now())
+ r.lastAccessedByKey.Store("b", time.Now())
+ r.lastAccessedByKey.Store("c", time.Now())
+ },
+ refillInterval: time.Second,
+ expectedLimiters: []string{"a", "b", "c"},
+ expectedRemovedLimiters: []string{},
+ },
+ {
+ desc: "all are prunable",
+ setup: func(r *RateLimiter) {
+ r.limitersByKey.Store("a", struct{}{})
+ r.limitersByKey.Store("b", struct{}{})
+ r.limitersByKey.Store("c", struct{}{})
+ r.lastAccessedByKey.Store("a", time.Now().Add(-1*time.Minute))
+ r.lastAccessedByKey.Store("b", time.Now().Add(-1*time.Minute))
+ r.lastAccessedByKey.Store("c", time.Now().Add(-1*time.Minute))
+ },
+ refillInterval: time.Second,
+ expectedLimiters: []string{},
+ expectedRemovedLimiters: []string{"a", "b", "c"},
+ },
+ {
+ desc: "one is prunable",
+ setup: func(r *RateLimiter) {
+ r.limitersByKey.Store("a", struct{}{})
+ r.limitersByKey.Store("b", struct{}{})
+ r.limitersByKey.Store("c", struct{}{})
+ r.lastAccessedByKey.Store("a", time.Now())
+ r.lastAccessedByKey.Store("b", time.Now())
+ r.lastAccessedByKey.Store("c", time.Now().Add(-1*time.Minute))
+ },
+ refillInterval: time.Second,
+ expectedLimiters: []string{"a", "b"},
+ expectedRemovedLimiters: []string{"c"},
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ rateLimiter := &RateLimiter{
+ refillInterval: tc.refillInterval,
+ }
+
+ tc.setup(rateLimiter)
+
+ ctx := testhelper.Context(t)
+ rateLimiter.pruneUnusedLimiters(ctx)
+
+ for _, expectedLimiter := range tc.expectedLimiters {
+ _, ok := rateLimiter.limitersByKey.Load(expectedLimiter)
+ assert.True(t, ok)
+ }
+
+ for _, expectedRemovedLimiter := range tc.expectedRemovedLimiters {
+ _, ok := rateLimiter.limitersByKey.Load(expectedRemovedLimiter)
+ assert.False(t, ok)
+ }
+ })
+ }
+}