Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2023-09-29 17:43:46 +0300
committerJustin Tobler <jtobler@gitlab.com>2023-09-29 17:43:46 +0300
commit4495d853dab3d79ad87a4448d8d4e5c012345886 (patch)
tree6b785004adfdbdb9590d0112b39f6c93d1a7ac8c
parenta1557e8a526fc4554781f94afa3ac5102abb7264 (diff)
parentb87e16084b871680a4338530e6efa315b18de94a (diff)
Merge branch 'qmnguyen0711/prepare-per-rpc-limiting' into 'master'
Refactor per-PRC limiters to prepare for adaptiveness integration See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6412 Merged-by: Justin Tobler <jtobler@gitlab.com> Approved-by: Justin Tobler <jtobler@gitlab.com> Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--internal/cli/gitaly/serve.go9
-rw-r--r--internal/gitaly/config/config.go16
-rw-r--r--internal/gitaly/config/config_test.go44
-rw-r--r--internal/gitaly/server/auth_test.go6
-rw-r--r--internal/grpc/middleware/limithandler/middleware.go133
-rw-r--r--internal/grpc/middleware/limithandler/middleware_test.go53
-rw-r--r--internal/testhelper/testserver/gitaly.go3
7 files changed, 187 insertions, 77 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 66d8cbeb3..e436c7d48 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -298,12 +298,13 @@ func run(cfg config.Cfg, logger log.Logger) error {
adaptiveLimits := []limiter.AdaptiveLimiter{}
- concurrencyLimitHandler := limithandler.New(
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ perRPCLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
- limithandler.WithConcurrencyLimiters,
+ setupPerRPCConcurrencyLimiters,
)
- prometheus.MustRegister(concurrencyLimitHandler)
+ prometheus.MustRegister(perRPCLimitHandler)
rateLimitHandler := limithandler.New(
cfg,
@@ -363,7 +364,7 @@ func run(cfg config.Cfg, logger log.Logger) error {
logger,
registry,
diskCache,
- []*limithandler.LimiterMiddleware{concurrencyLimitHandler, rateLimitHandler},
+ []*limithandler.LimiterMiddleware{perRPCLimitHandler, rateLimitHandler},
)
defer gitalyServerFactory.Stop()
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index 6ea9177bf..262299f62 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -441,6 +441,15 @@ type Concurrency struct {
MaxQueueWait duration.Duration `toml:"max_queue_wait" json:"max_queue_wait"`
}
+// Validate runs validation on all fields and compose all found errors.
+func (c Concurrency) Validate() error {
+ return cfgerror.New().
+ Append(cfgerror.Comparable(c.MaxPerRepo).GreaterOrEqual(0), "max_per_repo").
+ Append(cfgerror.Comparable(c.MaxQueueSize).GreaterOrEqual(0), "max_queue_size").
+ Append(cfgerror.Comparable(c.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait").
+ AsError()
+}
+
// RateLimiting allows endpoints to be limited to a maximum request rate per
// second. The rate limiter uses a concept of a "token bucket". In order to serve a
// request, a token is retrieved from the token bucket. The size of the token
@@ -673,6 +682,13 @@ func (cfg *Cfg) ValidateV2() error {
return cfg.DailyMaintenance.Validate(storages)
}},
{field: "cgroups", validate: cfg.Cgroups.Validate},
+ {field: "concurrency", validate: func() error {
+ var errs cfgerror.ValidationErrors
+ for i, concurrency := range cfg.Concurrency {
+ errs = errs.Append(concurrency.Validate(), fmt.Sprintf("[%d]", i))
+ }
+ return errs.AsError()
+ }},
{field: "pack_objects_cache", validate: cfg.PackObjectsCache.Validate},
{field: "pack_objects_limiting", validate: cfg.PackObjectsLimiting.Validate},
{field: "backup", validate: cfg.Backup.Validate},
diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go
index 1ec917e64..cf730f17e 100644
--- a/internal/gitaly/config/config_test.go
+++ b/internal/gitaly/config/config_test.go
@@ -1808,6 +1808,50 @@ func TestPackObjectsLimiting_Validate(t *testing.T) {
)
}
+func TestConcurrency_Validate(t *testing.T) {
+ t.Parallel()
+
+ require.NoError(t, Concurrency{MaxPerRepo: 0}.Validate())
+ require.NoError(t, Concurrency{MaxPerRepo: 1}.Validate())
+ require.NoError(t, Concurrency{MaxPerRepo: 100}.Validate())
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange),
+ "max_per_repo",
+ ),
+ },
+ Concurrency{MaxPerRepo: -1}.Validate(),
+ )
+
+ require.NoError(t, Concurrency{MaxQueueSize: 0}.Validate())
+ require.NoError(t, Concurrency{MaxQueueSize: 1}.Validate())
+ require.NoError(t, Concurrency{MaxQueueSize: 100}.Validate())
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange),
+ "max_queue_size",
+ ),
+ },
+ Concurrency{MaxQueueSize: -1}.Validate(),
+ )
+
+ require.NoError(t, Concurrency{MaxQueueWait: duration.Duration(1)}.Validate())
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1m0s is not greater than or equal to 0s", cfgerror.ErrNotInRange),
+ "max_queue_wait",
+ ),
+ },
+ Concurrency{MaxQueueWait: duration.Duration(-time.Minute)}.Validate(),
+ )
+}
+
func TestStorage_Validate(t *testing.T) {
t.Parallel()
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 7ab5aad0e..61f289b73 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -199,7 +199,8 @@ func runServer(t *testing.T, cfg config.Cfg) string {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator, logger)
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)
updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache)
srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false)
@@ -237,12 +238,13 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
conns := client.NewPool()
t.Cleanup(func() { testhelper.MustClose(t, conns) })
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
srv, err := NewGitalyServerFactory(
cfg,
testhelper.SharedLogger(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
- []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)},
).New(true)
require.NoError(t, err)
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index efc622eb6..c36c5b614 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -2,6 +2,7 @@ package limithandler
import (
"context"
+ "fmt"
"strings"
"time"
@@ -165,72 +166,80 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
// WithConcurrencyLimiters sets up middleware to limit the concurrency of
// requests based on RPC and repository
-func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
- acquiringSecondsMetric := prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: cfg.Prometheus.GRPCLatencyBuckets,
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- inProgressMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- queuedMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-
- middleware.collect = func(metrics chan<- prometheus.Metric) {
- acquiringSecondsMetric.Collect(metrics)
- inProgressMetric.Collect(metrics)
- queuedMetric.Collect(metrics)
- }
-
- result := make(map[string]limiter.Limiter)
- for _, limit := range cfg.Concurrency {
- limit := limit
-
- result[limit.RPC] = limiter.NewConcurrencyLimiter(
- limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}),
- limit.MaxQueueSize,
- limit.MaxQueueWait.Duration(),
- limiter.NewPerRPCPromMonitor(
- "gitaly", limit.RPC,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
+func WithConcurrencyLimiters(cfg config.Cfg) (map[string]*limiter.AdaptiveLimit, SetupFunc) {
+ perRPCLimits := map[string]*limiter.AdaptiveLimit{}
+ for _, concurrency := range cfg.Concurrency {
+ perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit(
+ fmt.Sprintf("perRPC%s", concurrency.RPC), limiter.AdaptiveSetting{Initial: concurrency.MaxPerRepo},
)
}
-
- // Set default for ReplicateRepository.
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := result[replicateRepositoryFullMethod]; !ok {
- result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
- limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
- 0,
- 0,
- limiter.NewPerRPCPromMonitor(
- "gitaly", replicateRepositoryFullMethod,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
+ return perRPCLimits, func(cfg config.Cfg, middleware *LimiterMiddleware) {
+ acquiringSecondsMetric := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ inProgressMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ queuedMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
)
- }
- middleware.methodLimiters = result
+ middleware.collect = func(metrics chan<- prometheus.Metric) {
+ acquiringSecondsMetric.Collect(metrics)
+ inProgressMetric.Collect(metrics)
+ queuedMetric.Collect(metrics)
+ }
+
+ result := make(map[string]limiter.Limiter)
+ for _, concurrency := range cfg.Concurrency {
+ concurrency := concurrency
+
+ result[concurrency.RPC] = limiter.NewConcurrencyLimiter(
+ perRPCLimits[concurrency.RPC],
+ concurrency.MaxQueueSize,
+ concurrency.MaxQueueWait.Duration(),
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", concurrency.RPC,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
+
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
+ 0,
+ 0,
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", replicateRepositoryFullMethod,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
+
+ middleware.methodLimiters = result
+ }
}
// WithRateLimiters sets up a middleware with limiters that limit requests
diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go
index e81c4db09..69a720bcb 100644
--- a/internal/grpc/middleware/limithandler/middleware_test.go
+++ b/internal/grpc/middleware/limithandler/middleware_test.go
@@ -31,6 +31,35 @@ func fixedLockKey(ctx context.Context) string {
return "fixed-id"
}
+func TestWithConcurrencyLimiters(t *testing.T) {
+ t.Parallel()
+
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {
+ RPC: "/grpc.testing.TestService/UnaryCall",
+ MaxPerRepo: 1,
+ },
+ {
+ RPC: "/grpc.testing.TestService/FullDuplexCall",
+ MaxPerRepo: 99,
+ },
+ },
+ }
+ limits, _ := limithandler.WithConcurrencyLimiters(cfg)
+ require.Equal(t, 2, len(limits))
+
+ limit := limits["/grpc.testing.TestService/UnaryCall"]
+ require.Equal(t, "perRPC/grpc.testing.TestService/UnaryCall", limit.Name())
+ require.Equal(t, limiter.AdaptiveSetting{Initial: 1}, limit.Setting())
+ require.Equal(t, 1, limit.Current())
+
+ limit = limits["/grpc.testing.TestService/FullDuplexCall"]
+ require.Equal(t, "perRPC/grpc.testing.TestService/FullDuplexCall", limit.Name())
+ require.Equal(t, limiter.AdaptiveSetting{Initial: 99}, limit.Setting())
+ require.Equal(t, 99, limit.Current())
+}
+
func TestUnaryLimitHandler(t *testing.T) {
t.Parallel()
@@ -47,7 +76,8 @@ func TestUnaryLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
@@ -101,7 +131,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
ctx := testhelper.Context(t)
t.Run("simple timeout", func(t *testing.T) {
- lh := limithandler.New(config.Cfg{
+ cfg := config.Cfg{
Concurrency: []config.Concurrency{
{
RPC: "/grpc.testing.TestService/UnaryCall",
@@ -123,7 +153,9 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxQueueWait: duration.Duration(100 * time.Millisecond),
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ }
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
s := &queueTestServer{
server: server{
@@ -173,7 +205,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
})
t.Run("unlimited queueing", func(t *testing.T) {
- lh := limithandler.New(config.Cfg{
+ cfg := config.Cfg{
Concurrency: []config.Concurrency{
// Due to a bug queueing wait times used to leak into subsequent
// concurrency configuration in case they didn't explicitly set up
@@ -191,7 +223,9 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxPerRepo: 1,
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ }
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
s := &queueTestServer{
server: server{
@@ -456,7 +490,8 @@ func TestStreamLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
@@ -506,7 +541,8 @@ func TestStreamLimitHandler_error(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
@@ -626,7 +662,8 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index e7b2659e7..770fdc74c 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -333,7 +333,8 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
}
if gsd.limitHandler == nil {
- gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)
}
if gsd.repositoryCounter == nil {