Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-03-21 23:55:26 +0300
committerJohn Cai <jcai@gitlab.com>2022-04-06 22:27:33 +0300
commit00266a9213efcda1138e1977c476d6ceff4ba9b3 (patch)
tree3eeb8af4ac24cc517257d82c9aed506c0d99298b
parentadd9d6e101635199301391518726a8113394e968 (diff)
limithandler: Introduce rate limiter middleware
Introduce a simple rate limiter that limits the number of requests a minute that an RPC can allow. If the feature flag is enabled, the middleware will drop any request that bursts the per second limit of the RPC. Otherwise, it will only emit metrics so we can first have some data on the traffic profile.
-rw-r--r--NOTICE30
-rw-r--r--go.mod1
-rw-r--r--go.sum3
-rw-r--r--internal/metadata/featureflag/ff_rate_limiter.go5
-rw-r--r--internal/middleware/limithandler/middleware_test.go82
-rw-r--r--internal/middleware/limithandler/rate_limiter.go81
6 files changed, 201 insertions, 1 deletions
diff --git a/NOTICE b/NOTICE
index 96c4113c4..ca45a391d 100644
--- a/NOTICE
+++ b/NOTICE
@@ -16282,6 +16282,36 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+LICENSE - golang.org/x/time/rate
+Copyright (c) 2009 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LICENSE - golang.org/x/xerrors
Copyright (c) 2019 The Go Authors. All rights reserved.
diff --git a/go.mod b/go.mod
index 5a506cea6..6c4fd5106 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20211102192858-4dd72447c267
+ golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.26.0
)
diff --git a/go.sum b/go.sum
index bd266ef1e..cd5e2b479 100644
--- a/go.sum
+++ b/go.sum
@@ -1209,8 +1209,9 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
+golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
diff --git a/internal/metadata/featureflag/ff_rate_limiter.go b/internal/metadata/featureflag/ff_rate_limiter.go
new file mode 100644
index 000000000..7207e4b37
--- /dev/null
+++ b/internal/metadata/featureflag/ff_rate_limiter.go
@@ -0,0 +1,5 @@
+package featureflag
+
+// RateLimit will enable the rate limiter to reject requests beyond a configured
+// rate.
+var RateLimit = NewFeatureFlag("rate_limit", false)
diff --git a/internal/middleware/limithandler/middleware_test.go b/internal/middleware/limithandler/middleware_test.go
index beebb3d54..3a6b3e920 100644
--- a/internal/middleware/limithandler/middleware_test.go
+++ b/internal/middleware/limithandler/middleware_test.go
@@ -17,6 +17,8 @@ import (
pb "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler/testdata"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
func TestMain(m *testing.M) {
@@ -315,6 +317,86 @@ gitaly_requests_dropped_total{grpc_method="Unary",grpc_service="test.limithandle
<-respCh
}
+func TestRateLimitHandler(t *testing.T) {
+ t.Parallel()
+ testhelper.NewFeatureSets(featureflag.RateLimit).Run(t, testRateLimitHandler)
+}
+
+func testRateLimitHandler(t *testing.T, ctx context.Context) {
+ methodName := "/test.limithandler.Test/Unary"
+ cfg := config.Cfg{
+ RateLimiting: []config.RateLimiting{
+ {RPC: methodName, Interval: 1 * time.Hour, Burst: 1},
+ },
+ }
+
+ t.Run("rate has hit max", func(t *testing.T) {
+ s := &server{blockCh: make(chan struct{})}
+
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithRateLimiters)
+ interceptor := lh.UnaryInterceptor()
+ srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
+ defer srv.Stop()
+
+ client, conn := newClient(t, serverSocketPath)
+ defer testhelper.MustClose(t, conn)
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, err := client.Unary(ctx, &pb.UnaryRequest{})
+ require.NoError(t, err)
+ }()
+ // wait until the first request is being processed so we know the rate
+ // limiter already knows about it.
+ s.blockCh <- struct{}{}
+ close(s.blockCh)
+
+ for i := 0; i < 10; i++ {
+ _, err := client.Unary(ctx, &pb.UnaryRequest{})
+
+ if featureflag.RateLimit.IsEnabled(ctx) {
+ testhelper.RequireGrpcError(t, status.Error(codes.Unavailable, "too many requests"), err)
+ } else {
+ require.NoError(t, err)
+ }
+ }
+
+ expectedMetrics := `# HELP gitaly_requests_dropped_total Number of requests dropped from the queue
+# TYPE gitaly_requests_dropped_total counter
+gitaly_requests_dropped_total{grpc_method="Unary",grpc_service="test.limithandler.Test",reason="rate",system="gitaly"} 10
+`
+ assert.NoError(t, promtest.CollectAndCompare(lh, bytes.NewBufferString(expectedMetrics),
+ "gitaly_requests_dropped_total"))
+
+ wg.Wait()
+ })
+
+ t.Run("rate has not hit max", func(t *testing.T) {
+ s := &server{blockCh: make(chan struct{})}
+
+ lh := limithandler.New(cfg, fixedLockKey, limithandler.WithRateLimiters)
+ interceptor := lh.UnaryInterceptor()
+ srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
+ defer srv.Stop()
+
+ client, conn := newClient(t, serverSocketPath)
+ defer testhelper.MustClose(t, conn)
+
+ close(s.blockCh)
+ _, err := client.Unary(ctx, &pb.UnaryRequest{})
+ require.NoError(t, err)
+
+ expectedMetrics := `# HELP gitaly_requests_dropped_total Number of requests dropped from the queue
+# TYPE gitaly_requests_dropped_total counter
+gitaly_requests_dropped_total{grpc_method="Unary",grpc_service="test.limithandler.Test",reason="rate",system="gitaly"} 0
+`
+ assert.NoError(t, promtest.CollectAndCompare(lh, bytes.NewBufferString(expectedMetrics),
+ "gitaly_requests_dropped_total"))
+ })
+}
+
func runServer(t *testing.T, s pb.TestServer, opt ...grpc.ServerOption) (*grpc.Server, string) {
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
grpcServer := grpc.NewServer(opt...)
diff --git a/internal/middleware/limithandler/rate_limiter.go b/internal/middleware/limithandler/rate_limiter.go
new file mode 100644
index 000000000..b8bc605db
--- /dev/null
+++ b/internal/middleware/limithandler/rate_limiter.go
@@ -0,0 +1,81 @@
+package limithandler
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
+ "golang.org/x/time/rate"
+)
+
+// RateLimiter is an implementation of Limiter that puts a hard limit on the
+// number of requests per second
+type RateLimiter struct {
+ limitersByKey sync.Map
+ refillInterval time.Duration
+ burst int
+ requestsDroppedMetric prometheus.Counter
+}
+
+// Limit rejects an incoming reequest if the maximum number of requests per
+// second has been reached
+func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error) {
+ limiter, _ := r.limitersByKey.LoadOrStore(
+ lockKey,
+ rate.NewLimiter(rate.Every(r.refillInterval), r.burst),
+ )
+ if !limiter.(*rate.Limiter).Allow() {
+ // For now, we are only emitting this metric to get an idea of the shape
+ // of traffic.
+ r.requestsDroppedMetric.Inc()
+ if featureflag.RateLimit.IsEnabled(ctx) {
+ return nil, helper.ErrUnavailable(errors.New("too many requests"))
+ }
+ }
+
+ return f()
+}
+
+// NewRateLimiter creates a new instance of RateLimiter
+func NewRateLimiter(
+ refillInterval time.Duration,
+ burst int,
+ requestsDroppedMetric prometheus.Counter,
+) *RateLimiter {
+ r := &RateLimiter{
+ refillInterval: refillInterval,
+ burst: burst,
+ requestsDroppedMetric: requestsDroppedMetric,
+ }
+
+ return r
+}
+
+// WithRateLimiters sets up a middleware with limiters that limit requests
+// based on its rate per second per RPC
+func WithRateLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
+ result := make(map[string]Limiter)
+
+ for _, limitCfg := range cfg.RateLimiting {
+ if limitCfg.Burst > 0 && limitCfg.Interval > 0 {
+ serviceName, methodName := splitMethodName(limitCfg.RPC)
+ result[limitCfg.RPC] = NewRateLimiter(
+ limitCfg.Interval,
+ limitCfg.Burst,
+ middleware.requestsDroppedMetric.With(prometheus.Labels{
+ "system": "gitaly",
+ "grpc_service": serviceName,
+ "grpc_method": methodName,
+ "reason": "rate",
+ }),
+ )
+ }
+ }
+
+ middleware.methodLimiters = result
+}