gitlab.com/gitlab-org/gitaly.git
author     Quang-Minh Nguyen <qmnguyen@gitlab.com>   2023-09-28 06:40:56 +0300
committer  Quang-Minh Nguyen <qmnguyen@gitlab.com>   2023-09-28 06:40:56 +0300
commit     b87e16084b871680a4338530e6efa315b18de94a (patch)
tree       c94293d8eae2e67aa37b10856642bfa5c908e95b
parent     e71de95f649e3f260aa3b775365a2abf9250d05f (diff)
limiter: Let limithandler.WithConcurrencyLimiters return created limits
limithandler.WithConcurrencyLimiters returns a function that sets up the per-RPC concurrency limiters. This function is called when the Gitaly process creates the limiter interceptor. It creates a number of objects, most notably the adaptive limit structs. Currently, those limits are not connected to the adaptive calculator. In later commits, we need to pull those limits out and plug them into the calculator so that the calculator can adjust them. This commit adjusts the signature of the function so that it also returns the configured limits.
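
At the call sites, this splits the old one-step wiring in two: callers first invoke WithConcurrencyLimiters to obtain the per-RPC adaptive limits together with a setup function, and then pass that setup function to limithandler.New. A minimal sketch of the new pattern, mirroring the serve.go and test changes below (illustrative only; the import paths assume Gitaly's v16 module layout and are not part of this commit):

    package example

    import (
        "github.com/prometheus/client_golang/prometheus"

        "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
        "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
    )

    func buildPerRPCLimitHandler(cfg config.Cfg) *limithandler.LimiterMiddleware {
        // WithConcurrencyLimiters now returns the per-RPC adaptive limits
        // (keyed by full RPC method name) and a SetupFunc for limithandler.New.
        perRPCLimits, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)

        lh := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)
        prometheus.MustRegister(lh)

        // Later commits can plug perRPCLimits into the adaptive calculator so
        // it can adjust the limits at runtime; this commit still discards them.
        _ = perRPCLimits
        return lh
    }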
-rw-r--r--  internal/cli/gitaly/serve.go                              |   9
-rw-r--r--  internal/gitaly/server/auth_test.go                       |   6
-rw-r--r--  internal/grpc/middleware/limithandler/middleware.go       | 133
-rw-r--r--  internal/grpc/middleware/limithandler/middleware_test.go  |  53
-rw-r--r--  internal/testhelper/testserver/gitaly.go                  |   3
5 files changed, 127 insertions, 77 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 66d8cbeb3..e436c7d48 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -298,12 +298,13 @@ func run(cfg config.Cfg, logger log.Logger) error {
adaptiveLimits := []limiter.AdaptiveLimiter{}
- concurrencyLimitHandler := limithandler.New(
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ perRPCLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
- limithandler.WithConcurrencyLimiters,
+ setupPerRPCConcurrencyLimiters,
)
- prometheus.MustRegister(concurrencyLimitHandler)
+ prometheus.MustRegister(perRPCLimitHandler)
rateLimitHandler := limithandler.New(
cfg,
@@ -363,7 +364,7 @@ func run(cfg config.Cfg, logger log.Logger) error {
logger,
registry,
diskCache,
- []*limithandler.LimiterMiddleware{concurrencyLimitHandler, rateLimitHandler},
+ []*limithandler.LimiterMiddleware{perRPCLimitHandler, rateLimitHandler},
)
defer gitalyServerFactory.Stop()
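
For context on where this is headed: the serve.go hunk above still discards the returned limits with a blank identifier. A hypothetical sketch of the follow-up the commit message describes, assuming *limiter.AdaptiveLimit satisfies the limiter.AdaptiveLimiter interface behind the adaptiveLimits slice (this wiring is not part of this commit):

    // Hypothetical follow-up inside run() (not in this diff): keep the
    // returned per-RPC limits and add them to the adaptive limits handed
    // to the adaptive calculator.
    perRPCLimits, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
    for _, limit := range perRPCLimits {
        adaptiveLimits = append(adaptiveLimits, limit)
    }
    perRPCLimitHandler := limithandler.New(
        cfg,
        limithandler.LimitConcurrencyByRepo,
        setupPerRPCConcurrencyLimiters,
    )
    prometheus.MustRegister(perRPCLimitHandler)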
diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go
index 7ab5aad0e..61f289b73 100644
--- a/internal/gitaly/server/auth_test.go
+++ b/internal/gitaly/server/auth_test.go
@@ -199,7 +199,8 @@ func runServer(t *testing.T, cfg config.Cfg) string {
catfileCache := catfile.NewCache(cfg)
t.Cleanup(catfileCache.Stop)
diskCache := cache.New(cfg, locator, logger)
- limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)
updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache)
srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false)
@@ -237,12 +238,13 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string {
conns := client.NewPool()
t.Cleanup(func() { testhelper.MustClose(t, conns) })
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
srv, err := NewGitalyServerFactory(
cfg,
testhelper.SharedLogger(t),
backchannel.NewRegistry(),
cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)),
- []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)},
+ []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)},
).New(true)
require.NoError(t, err)
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index efc622eb6..c36c5b614 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -2,6 +2,7 @@ package limithandler
import (
"context"
+ "fmt"
"strings"
"time"
@@ -165,72 +166,80 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
// WithConcurrencyLimiters sets up middleware to limit the concurrency of
// requests based on RPC and repository
-func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) {
- acquiringSecondsMetric := prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "acquiring_seconds",
- Help: "Histogram of time calls are rate limited (in seconds)",
- Buckets: cfg.Prometheus.GRPCLatencyBuckets,
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- inProgressMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "in_progress",
- Help: "Gauge of number of concurrent in-progress calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
- queuedMetric := prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Namespace: "gitaly",
- Subsystem: "concurrency_limiting",
- Name: "queued",
- Help: "Gauge of number of queued calls",
- },
- []string{"system", "grpc_service", "grpc_method"},
- )
-
- middleware.collect = func(metrics chan<- prometheus.Metric) {
- acquiringSecondsMetric.Collect(metrics)
- inProgressMetric.Collect(metrics)
- queuedMetric.Collect(metrics)
- }
-
- result := make(map[string]limiter.Limiter)
- for _, limit := range cfg.Concurrency {
- limit := limit
-
- result[limit.RPC] = limiter.NewConcurrencyLimiter(
- limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}),
- limit.MaxQueueSize,
- limit.MaxQueueWait.Duration(),
- limiter.NewPerRPCPromMonitor(
- "gitaly", limit.RPC,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
+func WithConcurrencyLimiters(cfg config.Cfg) (map[string]*limiter.AdaptiveLimit, SetupFunc) {
+ perRPCLimits := map[string]*limiter.AdaptiveLimit{}
+ for _, concurrency := range cfg.Concurrency {
+ perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit(
+ fmt.Sprintf("perRPC%s", concurrency.RPC), limiter.AdaptiveSetting{Initial: concurrency.MaxPerRepo},
)
}
-
- // Set default for ReplicateRepository.
- replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
- if _, ok := result[replicateRepositoryFullMethod]; !ok {
- result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
- limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
- 0,
- 0,
- limiter.NewPerRPCPromMonitor(
- "gitaly", replicateRepositoryFullMethod,
- queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
- ),
+ return perRPCLimits, func(cfg config.Cfg, middleware *LimiterMiddleware) {
+ acquiringSecondsMetric := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "acquiring_seconds",
+ Help: "Histogram of time calls are rate limited (in seconds)",
+ Buckets: cfg.Prometheus.GRPCLatencyBuckets,
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ inProgressMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "in_progress",
+ Help: "Gauge of number of concurrent in-progress calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
+ )
+ queuedMetric := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "concurrency_limiting",
+ Name: "queued",
+ Help: "Gauge of number of queued calls",
+ },
+ []string{"system", "grpc_service", "grpc_method"},
)
- }
- middleware.methodLimiters = result
+ middleware.collect = func(metrics chan<- prometheus.Metric) {
+ acquiringSecondsMetric.Collect(metrics)
+ inProgressMetric.Collect(metrics)
+ queuedMetric.Collect(metrics)
+ }
+
+ result := make(map[string]limiter.Limiter)
+ for _, concurrency := range cfg.Concurrency {
+ concurrency := concurrency
+
+ result[concurrency.RPC] = limiter.NewConcurrencyLimiter(
+ perRPCLimits[concurrency.RPC],
+ concurrency.MaxQueueSize,
+ concurrency.MaxQueueWait.Duration(),
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", concurrency.RPC,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
+
+ // Set default for ReplicateRepository.
+ replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository"
+ if _, ok := result[replicateRepositoryFullMethod]; !ok {
+ result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter(
+ limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}),
+ 0,
+ 0,
+ limiter.NewPerRPCPromMonitor(
+ "gitaly", replicateRepositoryFullMethod,
+ queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric,
+ ),
+ )
+ }
+
+ middleware.methodLimiters = result
+ }
}
// WithRateLimiters sets up a middleware with limiters that limit requests
diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go
index 35eaba735..7516116fb 100644
--- a/internal/grpc/middleware/limithandler/middleware_test.go
+++ b/internal/grpc/middleware/limithandler/middleware_test.go
@@ -31,6 +31,35 @@ func fixedLockKey(ctx context.Context) string {
return "fixed-id"
}
+func TestWithConcurrencyLimiters(t *testing.T) {
+ t.Parallel()
+
+ cfg := config.Cfg{
+ Concurrency: []config.Concurrency{
+ {
+ RPC: "/grpc.testing.TestService/UnaryCall",
+ MaxPerRepo: 1,
+ },
+ {
+ RPC: "/grpc.testing.TestService/FullDuplexCall",
+ MaxPerRepo: 99,
+ },
+ },
+ }
+ limits, _ := limithandler.WithConcurrencyLimiters(cfg)
+ require.Equal(t, 2, len(limits))
+
+ limit := limits["/grpc.testing.TestService/UnaryCall"]
+ require.Equal(t, "perRPC/grpc.testing.TestService/UnaryCall", limit.Name())
+ require.Equal(t, limiter.AdaptiveSetting{Initial: 1}, limit.Setting())
+ require.Equal(t, 1, limit.Current())
+
+ limit = limits["/grpc.testing.TestService/FullDuplexCall"]
+ require.Equal(t, "perRPC/grpc.testing.TestService/FullDuplexCall", limit.Name())
+ require.Equal(t, limiter.AdaptiveSetting{Initial: 99}, limit.Setting())
+ require.Equal(t, 99, limit.Current())
+}
+
func TestUnaryLimitHandler(t *testing.T) {
t.Parallel()
@@ -47,7 +76,8 @@ func TestUnaryLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
@@ -103,7 +133,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
ctx := testhelper.Context(t)
t.Run("simple timeout", func(t *testing.T) {
- lh := limithandler.New(config.Cfg{
+ cfg := config.Cfg{
Concurrency: []config.Concurrency{
{
RPC: "/grpc.testing.TestService/UnaryCall",
@@ -112,7 +142,9 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxQueueWait: duration.Duration(time.Millisecond),
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ }
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
s := &queueTestServer{
server: server{
@@ -155,7 +187,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
})
t.Run("unlimited queueing", func(t *testing.T) {
- lh := limithandler.New(config.Cfg{
+ cfg := config.Cfg{
Concurrency: []config.Concurrency{
// Due to a bug queueing wait times used to leak into subsequent
// concurrency configuration in case they didn't explicitly set up
@@ -173,7 +205,9 @@ func TestUnaryLimitHandler_queueing(t *testing.T) {
MaxPerRepo: 1,
},
},
- }, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ }
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
s := &queueTestServer{
server: server{
@@ -438,7 +472,8 @@ func TestStreamLimitHandler(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
@@ -488,7 +523,8 @@ func TestStreamLimitHandler_error(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
interceptor := lh.StreamInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor))
defer srv.Stop()
@@ -608,7 +644,8 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) {
},
}
- lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters)
interceptor := lh.UnaryInterceptor()
srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor))
defer srv.Stop()
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index e7b2659e7..770fdc74c 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -333,7 +333,8 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
}
if gsd.limitHandler == nil {
- gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
+ _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)
}
if gsd.repositoryCounter == nil {