Welcome to mirror list, hosted at ThFree Co, Russian Federation.

rate_limiter.go « limithandler « middleware « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 89f6deeab0c7295a5fb6e269bf7586b83f030c88 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package limithandler

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
	"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
	"gitlab.com/gitlab-org/labkit/log"
	"golang.org/x/time/rate"
	"google.golang.org/protobuf/types/known/durationpb"
)

// RateLimiter is a Limiter implementation that enforces a hard cap on the
// request rate. State is tracked separately for every lock key passed to
// Limit.
type RateLimiter struct {
	// limitersByKey stores one *rate.Limiter per lock key.
	limitersByKey sync.Map
	// lastAccessedByKey stores, per lock key, the time.Time of the most
	// recent Limit call. The pruning pass reads it to find idle keys.
	lastAccessedByKey sync.Map
	// refillInterval is handed to rate.Every when a limiter is created for a
	// key; the pruning threshold is also derived from it.
	refillInterval time.Duration
	// burst is the burst size given to every limiter that gets created.
	burst int
	// requestsDroppedMetric is incremented each time a request is rejected.
	requestsDroppedMetric prometheus.Counter
	// ticker paces the loop in PruneUnusedLimiters.
	ticker helper.Ticker
}

// ErrRateLimit is the sentinel error RateLimiter reports when it determines
// that a request has breached the request rate limit. Limit does not return
// it bare: it is wrapped via helper.ErrUnavailable, and its message is copied
// into the attached gitalypb.LimitError detail.
var ErrRateLimit = errors.New("rate limit reached")

// Limit rejects an incoming request if the rate limit for lockKey has been
// reached, and otherwise invokes f and returns its result unchanged.
//
// Every lockKey gets its own rate.Limiter, created on first use from the
// configured refill interval and burst. Each call also records the current
// time for lockKey so that idle keys can be pruned later.
//
// When the request is rejected, requestsDroppedMetric is incremented and the
// returned error is ErrRateLimit wrapped by helper.ErrUnavailable, decorated
// with a gitalypb.LimitError detail whenever that detail can be attached. If
// attaching the detail fails, the failure is logged and the undecorated error
// is returned instead.
//
// ctx is currently unused by this implementation.
func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) {
	// Look the limiter up first: calling LoadOrStore unconditionally would
	// construct (and immediately discard) a fresh rate.Limiter on every
	// request for a key that already has one.
	limiter, ok := r.limitersByKey.Load(lockKey)
	if !ok {
		// LoadOrStore still guards against a concurrent request for the same
		// key having stored its limiter in the meantime.
		limiter, _ = r.limitersByKey.LoadOrStore(
			lockKey,
			rate.NewLimiter(rate.Every(r.refillInterval), r.burst),
		)
	}
	r.lastAccessedByKey.Store(lockKey, time.Now())

	if !limiter.(*rate.Limiter).Allow() {
		// For now, we are only emitting this metric to get an idea of the shape
		// of traffic.
		r.requestsDroppedMetric.Inc()

		err := helper.ErrUnavailable(ErrRateLimit)

		detailedErr, errGeneratingDetailedErr := helper.ErrWithDetails(
			err,
			&gitalypb.LimitError{
				ErrorMessage: ErrRateLimit.Error(),
				RetryAfter:   durationpb.New(0),
			},
		)
		if errGeneratingDetailedErr != nil {
			// Failing to attach details must not hide the rejection itself:
			// log the problem and fall back to the plain error.
			log.WithField("rate_limit_error", err).
				WithError(errGeneratingDetailedErr).
				Error("failed to generate detailed error")

			return nil, err
		}

		return nil, detailedErr
	}

	return f()
}

// PruneUnusedLimiters blocks in a loop, running a pruning pass each time the
// ticker fires, until ctx is done. The ticker is stopped when the loop exits.
// Callers are expected to run this in its own goroutine.
func (r *RateLimiter) PruneUnusedLimiters(ctx context.Context) {
	defer r.ticker.Stop()

	done := ctx.Done()
	for {
		// Re-arm the ticker before every wait.
		r.ticker.Reset()

		select {
		case <-done:
			return
		case <-r.ticker.C():
			r.pruneUnusedLimiters()
		}
	}
}

// pruneUnusedLimiters makes a single pass over the recorded access times and
// drops all state for every key whose last access is older than ten refill
// intervals.
func (r *RateLimiter) pruneUnusedLimiters() {
	// Compute the cut-off once so that every key is judged against the same
	// instant instead of calling time.Now() per entry.
	cutoff := time.Now().Add(-10 * r.refillInterval)

	r.lastAccessedByKey.Range(func(key, value interface{}) bool {
		if value.(time.Time).Before(cutoff) {
			// Drop the timestamp as well as the limiter. Leaving it behind
			// would make lastAccessedByKey grow with every key ever seen and
			// have each later pass revisit keys that are already pruned.
			//
			// The timestamp goes first: if a concurrent Limit call re-adds
			// it in between, the worst case is a timestamp without a limiter,
			// which a later pass removes. The opposite order could strand a
			// limiter that has no timestamp and is thus never pruned.
			r.lastAccessedByKey.Delete(key)
			r.limitersByKey.Delete(key)
		}

		return true
	})
}

// NewRateLimiter builds a RateLimiter from the given refill interval, burst
// size, pruning ticker and dropped-requests counter. It does not start the
// pruning loop; see PruneUnusedLimiters.
func NewRateLimiter(
	refillInterval time.Duration,
	burst int,
	ticker helper.Ticker,
	requestsDroppedMetric prometheus.Counter,
) *RateLimiter {
	return &RateLimiter{
		ticker:                ticker,
		requestsDroppedMetric: requestsDroppedMetric,
		burst:                 burst,
		refillInterval:        refillInterval,
	}
}

// WithRateLimiters returns a SetupFunc that installs one RateLimiter per
// configured RPC on the middleware. Configuration entries whose burst or
// interval is not positive are skipped. A pruning goroutine bound to ctx is
// started for every limiter that gets created.
func WithRateLimiters(ctx context.Context) SetupFunc {
	return func(cfg config.Cfg, middleware *LimiterMiddleware) {
		limiters := make(map[string]Limiter)

		for _, rpcLimit := range cfg.RateLimiting {
			// Only entries with both a positive burst and a positive
			// interval produce a limiter.
			if rpcLimit.Burst <= 0 || rpcLimit.Interval <= 0 {
				continue
			}

			service, method := splitMethodName(rpcLimit.RPC)

			pruneTicker := helper.NewTimerTicker(5 * time.Minute)
			droppedCounter := middleware.requestsDroppedMetric.With(prometheus.Labels{
				"system":       "gitaly",
				"grpc_service": service,
				"grpc_method":  method,
				"reason":       "rate",
			})

			limiter := NewRateLimiter(rpcLimit.Interval, rpcLimit.Burst, pruneTicker, droppedCounter)
			limiters[rpcLimit.RPC] = limiter

			// Clean up idle per-key state in the background until ctx ends.
			go limiter.PruneUnusedLimiters(ctx)
		}

		middleware.methodLimiters = limiters
	}
}