diff options
author | Justin Tobler <jtobler@gitlab.com> | 2023-09-29 17:43:46 +0300 |
---|---|---|
committer | Justin Tobler <jtobler@gitlab.com> | 2023-09-29 17:43:46 +0300 |
commit | 4495d853dab3d79ad87a4448d8d4e5c012345886 (patch) | |
tree | 6b785004adfdbdb9590d0112b39f6c93d1a7ac8c | |
parent | a1557e8a526fc4554781f94afa3ac5102abb7264 (diff) | |
parent | b87e16084b871680a4338530e6efa315b18de94a (diff) |
Merge branch 'qmnguyen0711/prepare-per-rpc-limiting' into 'master'
Refactor per-RPC limiters to prepare for adaptiveness integration
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6412
Merged-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r-- | internal/cli/gitaly/serve.go | 9 | ||||
-rw-r--r-- | internal/gitaly/config/config.go | 16 | ||||
-rw-r--r-- | internal/gitaly/config/config_test.go | 44 | ||||
-rw-r--r-- | internal/gitaly/server/auth_test.go | 6 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware.go | 133 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware_test.go | 53 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 3 |
7 files changed, 187 insertions, 77 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 66d8cbeb3..e436c7d48 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -298,12 +298,13 @@ func run(cfg config.Cfg, logger log.Logger) error { adaptiveLimits := []limiter.AdaptiveLimiter{} - concurrencyLimitHandler := limithandler.New( + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + perRPCLimitHandler := limithandler.New( cfg, limithandler.LimitConcurrencyByRepo, - limithandler.WithConcurrencyLimiters, + setupPerRPCConcurrencyLimiters, ) - prometheus.MustRegister(concurrencyLimitHandler) + prometheus.MustRegister(perRPCLimitHandler) rateLimitHandler := limithandler.New( cfg, @@ -363,7 +364,7 @@ func run(cfg config.Cfg, logger log.Logger) error { logger, registry, diskCache, - []*limithandler.LimiterMiddleware{concurrencyLimitHandler, rateLimitHandler}, + []*limithandler.LimiterMiddleware{perRPCLimitHandler, rateLimitHandler}, ) defer gitalyServerFactory.Stop() diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 6ea9177bf..262299f62 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -441,6 +441,15 @@ type Concurrency struct { MaxQueueWait duration.Duration `toml:"max_queue_wait" json:"max_queue_wait"` } +// Validate runs validation on all fields and compose all found errors. +func (c Concurrency) Validate() error { + return cfgerror.New(). + Append(cfgerror.Comparable(c.MaxPerRepo).GreaterOrEqual(0), "max_per_repo"). + Append(cfgerror.Comparable(c.MaxQueueSize).GreaterOrEqual(0), "max_queue_size"). + Append(cfgerror.Comparable(c.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait"). + AsError() +} + // RateLimiting allows endpoints to be limited to a maximum request rate per // second. The rate limiter uses a concept of a "token bucket". In order to serve a // request, a token is retrieved from the token bucket. 
The size of the token @@ -673,6 +682,13 @@ func (cfg *Cfg) ValidateV2() error { return cfg.DailyMaintenance.Validate(storages) }}, {field: "cgroups", validate: cfg.Cgroups.Validate}, + {field: "concurrency", validate: func() error { + var errs cfgerror.ValidationErrors + for i, concurrency := range cfg.Concurrency { + errs = errs.Append(concurrency.Validate(), fmt.Sprintf("[%d]", i)) + } + return errs.AsError() + }}, {field: "pack_objects_cache", validate: cfg.PackObjectsCache.Validate}, {field: "pack_objects_limiting", validate: cfg.PackObjectsLimiting.Validate}, {field: "backup", validate: cfg.Backup.Validate}, diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index 1ec917e64..cf730f17e 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -1808,6 +1808,50 @@ func TestPackObjectsLimiting_Validate(t *testing.T) { ) } +func TestConcurrency_Validate(t *testing.T) { + t.Parallel() + + require.NoError(t, Concurrency{MaxPerRepo: 0}.Validate()) + require.NoError(t, Concurrency{MaxPerRepo: 1}.Validate()) + require.NoError(t, Concurrency{MaxPerRepo: 100}.Validate()) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange), + "max_per_repo", + ), + }, + Concurrency{MaxPerRepo: -1}.Validate(), + ) + + require.NoError(t, Concurrency{MaxQueueSize: 0}.Validate()) + require.NoError(t, Concurrency{MaxQueueSize: 1}.Validate()) + require.NoError(t, Concurrency{MaxQueueSize: 100}.Validate()) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange), + "max_queue_size", + ), + }, + Concurrency{MaxQueueSize: -1}.Validate(), + ) + + require.NoError(t, Concurrency{MaxQueueWait: duration.Duration(1)}.Validate()) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + 
fmt.Errorf("%w: -1m0s is not greater than or equal to 0s", cfgerror.ErrNotInRange), + "max_queue_wait", + ), + }, + Concurrency{MaxQueueWait: duration.Duration(-time.Minute)}.Validate(), + ) +} + func TestStorage_Validate(t *testing.T) { t.Parallel() diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 7ab5aad0e..61f289b73 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -199,7 +199,8 @@ func runServer(t *testing.T, cfg config.Cfg) string { catfileCache := catfile.NewCache(cfg) t.Cleanup(catfileCache.Stop) diskCache := cache.New(cfg, locator, logger) - limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters) updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache) srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false) @@ -237,12 +238,13 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { conns := client.NewPool() t.Cleanup(func() { testhelper.MustClose(t, conns) }) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) srv, err := NewGitalyServerFactory( cfg, testhelper.SharedLogger(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), - []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)}, ).New(true) require.NoError(t, err) diff --git a/internal/grpc/middleware/limithandler/middleware.go 
b/internal/grpc/middleware/limithandler/middleware.go index efc622eb6..c36c5b614 100644 --- a/internal/grpc/middleware/limithandler/middleware.go +++ b/internal/grpc/middleware/limithandler/middleware.go @@ -2,6 +2,7 @@ package limithandler import ( "context" + "fmt" "strings" "time" @@ -165,72 +166,80 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { // WithConcurrencyLimiters sets up middleware to limit the concurrency of // requests based on RPC and repository -func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { - acquiringSecondsMetric := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "acquiring_seconds", - Help: "Histogram of time calls are rate limited (in seconds)", - Buckets: cfg.Prometheus.GRPCLatencyBuckets, - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - inProgressMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "in_progress", - Help: "Gauge of number of concurrent in-progress calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - queuedMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "queued", - Help: "Gauge of number of queued calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - - middleware.collect = func(metrics chan<- prometheus.Metric) { - acquiringSecondsMetric.Collect(metrics) - inProgressMetric.Collect(metrics) - queuedMetric.Collect(metrics) - } - - result := make(map[string]limiter.Limiter) - for _, limit := range cfg.Concurrency { - limit := limit - - result[limit.RPC] = limiter.NewConcurrencyLimiter( - limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}), - limit.MaxQueueSize, - limit.MaxQueueWait.Duration(), - limiter.NewPerRPCPromMonitor( - "gitaly", limit.RPC, - queuedMetric, 
inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), +func WithConcurrencyLimiters(cfg config.Cfg) (map[string]*limiter.AdaptiveLimit, SetupFunc) { + perRPCLimits := map[string]*limiter.AdaptiveLimit{} + for _, concurrency := range cfg.Concurrency { + perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit( + fmt.Sprintf("perRPC%s", concurrency.RPC), limiter.AdaptiveSetting{Initial: concurrency.MaxPerRepo}, ) } - - // Set default for ReplicateRepository. - replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" - if _, ok := result[replicateRepositoryFullMethod]; !ok { - result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( - limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}), - 0, - 0, - limiter.NewPerRPCPromMonitor( - "gitaly", replicateRepositoryFullMethod, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), + return perRPCLimits, func(cfg config.Cfg, middleware *LimiterMiddleware) { + acquiringSecondsMetric := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + inProgressMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + queuedMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "queued", + Help: "Gauge of number of queued calls", + }, + []string{"system", "grpc_service", "grpc_method"}, ) - } - middleware.methodLimiters = result + middleware.collect = func(metrics chan<- 
prometheus.Metric) { + acquiringSecondsMetric.Collect(metrics) + inProgressMetric.Collect(metrics) + queuedMetric.Collect(metrics) + } + + result := make(map[string]limiter.Limiter) + for _, concurrency := range cfg.Concurrency { + concurrency := concurrency + + result[concurrency.RPC] = limiter.NewConcurrencyLimiter( + perRPCLimits[concurrency.RPC], + concurrency.MaxQueueSize, + concurrency.MaxQueueWait.Duration(), + limiter.NewPerRPCPromMonitor( + "gitaly", concurrency.RPC, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } + + // Set default for ReplicateRepository. + replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" + if _, ok := result[replicateRepositoryFullMethod]; !ok { + result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( + limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}), + 0, + 0, + limiter.NewPerRPCPromMonitor( + "gitaly", replicateRepositoryFullMethod, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } + + middleware.methodLimiters = result + } } // WithRateLimiters sets up a middleware with limiters that limit requests diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index e81c4db09..69a720bcb 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -31,6 +31,35 @@ func fixedLockKey(ctx context.Context) string { return "fixed-id" } +func TestWithConcurrencyLimiters(t *testing.T) { + t.Parallel() + + cfg := config.Cfg{ + Concurrency: []config.Concurrency{ + { + RPC: "/grpc.testing.TestService/UnaryCall", + MaxPerRepo: 1, + }, + { + RPC: "/grpc.testing.TestService/FullDuplexCall", + MaxPerRepo: 99, + }, + }, + } + limits, _ := limithandler.WithConcurrencyLimiters(cfg) + require.Equal(t, 2, len(limits)) + + limit := 
limits["/grpc.testing.TestService/UnaryCall"] + require.Equal(t, "perRPC/grpc.testing.TestService/UnaryCall", limit.Name()) + require.Equal(t, limiter.AdaptiveSetting{Initial: 1}, limit.Setting()) + require.Equal(t, 1, limit.Current()) + + limit = limits["/grpc.testing.TestService/FullDuplexCall"] + require.Equal(t, "perRPC/grpc.testing.TestService/FullDuplexCall", limit.Name()) + require.Equal(t, limiter.AdaptiveSetting{Initial: 99}, limit.Setting()) + require.Equal(t, 99, limit.Current()) +} + func TestUnaryLimitHandler(t *testing.T) { t.Parallel() @@ -47,7 +76,8 @@ func TestUnaryLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() @@ -101,7 +131,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { ctx := testhelper.Context(t) t.Run("simple timeout", func(t *testing.T) { - lh := limithandler.New(config.Cfg{ + cfg := config.Cfg{ Concurrency: []config.Concurrency{ { RPC: "/grpc.testing.TestService/UnaryCall", @@ -123,7 +153,9 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxQueueWait: duration.Duration(100 * time.Millisecond), }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters) + } + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) s := &queueTestServer{ server: server{ @@ -173,7 +205,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { }) t.Run("unlimited queueing", func(t *testing.T) { - lh := limithandler.New(config.Cfg{ + cfg := config.Cfg{ Concurrency: []config.Concurrency{ // Due to a bug queueing wait times used to leak into subsequent // concurrency configuration in case they 
didn't explicitly set up @@ -191,7 +223,9 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxPerRepo: 1, }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters) + } + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) s := &queueTestServer{ server: server{ @@ -456,7 +490,8 @@ func TestStreamLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() @@ -506,7 +541,8 @@ func TestStreamLimitHandler_error(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() @@ -626,7 +662,8 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index e7b2659e7..770fdc74c 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -333,7 +333,8 @@ func (gsd *gitalyServerDeps) 
createDependencies(tb testing.TB, cfg config.Cfg) * } if gsd.limitHandler == nil { - gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters) } if gsd.repositoryCounter == nil { |