diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-09-28 06:40:56 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-09-28 06:40:56 +0300 |
commit | b87e16084b871680a4338530e6efa315b18de94a (patch) | |
tree | c94293d8eae2e67aa37b10856642bfa5c908e95b /internal | |
parent | e71de95f649e3f260aa3b775365a2abf9250d05f (diff) |
limiter: Let limithandler.WithConcurrencyLimiters return created limits
limithandler.WithConcurrencyLimiters returns a function to setup per-RPC
concurrency limiter. This function is called when Gitaly process creates
a limiter interceptor. It creates a lot of things and notably adaptive
limit structs. Currenty, those limits are not connected to the adaptive
calculator. In some later commits, we need to pull those limits out and
plug into the calculator so that the calculator can adjust those limits.
This commit adjusts the signature of that function and let it return the
configured limits.
Diffstat (limited to 'internal')
-rw-r--r-- | internal/cli/gitaly/serve.go | 9 | ||||
-rw-r--r-- | internal/gitaly/server/auth_test.go | 6 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware.go | 133 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware_test.go | 53 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 3 |
5 files changed, 127 insertions, 77 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 66d8cbeb3..e436c7d48 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -298,12 +298,13 @@ func run(cfg config.Cfg, logger log.Logger) error { adaptiveLimits := []limiter.AdaptiveLimiter{} - concurrencyLimitHandler := limithandler.New( + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + perRPCLimitHandler := limithandler.New( cfg, limithandler.LimitConcurrencyByRepo, - limithandler.WithConcurrencyLimiters, + setupPerRPCConcurrencyLimiters, ) - prometheus.MustRegister(concurrencyLimitHandler) + prometheus.MustRegister(perRPCLimitHandler) rateLimitHandler := limithandler.New( cfg, @@ -363,7 +364,7 @@ func run(cfg config.Cfg, logger log.Logger) error { logger, registry, diskCache, - []*limithandler.LimiterMiddleware{concurrencyLimitHandler, rateLimitHandler}, + []*limithandler.LimiterMiddleware{perRPCLimitHandler, rateLimitHandler}, ) defer gitalyServerFactory.Stop() diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 7ab5aad0e..61f289b73 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -199,7 +199,8 @@ func runServer(t *testing.T, cfg config.Cfg) string { catfileCache := catfile.NewCache(cfg) t.Cleanup(catfileCache.Stop) diskCache := cache.New(cfg, locator, logger) - limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters) updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, catfileCache) srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}).New(false) @@ -237,12 +238,13 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { conns := client.NewPool() t.Cleanup(func() { testhelper.MustClose(t, conns) }) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) srv, err := NewGitalyServerFactory( cfg, testhelper.SharedLogger(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), - []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)}, + []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)}, ).New(true) require.NoError(t, err) diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go index efc622eb6..c36c5b614 100644 --- a/internal/grpc/middleware/limithandler/middleware.go +++ b/internal/grpc/middleware/limithandler/middleware.go @@ -2,6 +2,7 @@ package limithandler import ( "context" + "fmt" "strings" "time" @@ -165,72 +166,80 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { // WithConcurrencyLimiters sets up middleware to limit the concurrency of // requests based on RPC and repository -func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware) { - acquiringSecondsMetric := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "acquiring_seconds", - Help: "Histogram of time calls are rate limited (in seconds)", - Buckets: cfg.Prometheus.GRPCLatencyBuckets, - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - inProgressMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "in_progress", - Help: "Gauge of number of concurrent in-progress calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - queuedMetric := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "gitaly", - Subsystem: "concurrency_limiting", - Name: "queued", - Help: "Gauge of number of queued calls", - }, - []string{"system", "grpc_service", "grpc_method"}, - ) - - middleware.collect = func(metrics chan<- prometheus.Metric) { - acquiringSecondsMetric.Collect(metrics) - inProgressMetric.Collect(metrics) - queuedMetric.Collect(metrics) - } - - result := make(map[string]limiter.Limiter) - for _, limit := range cfg.Concurrency { - limit := limit - - result[limit.RPC] = limiter.NewConcurrencyLimiter( - limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: limit.MaxPerRepo}), - limit.MaxQueueSize, - limit.MaxQueueWait.Duration(), - limiter.NewPerRPCPromMonitor( - "gitaly", limit.RPC, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), +func WithConcurrencyLimiters(cfg config.Cfg) (map[string]*limiter.AdaptiveLimit, SetupFunc) { + perRPCLimits := map[string]*limiter.AdaptiveLimit{} + for _, concurrency := range cfg.Concurrency { + perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit( + fmt.Sprintf("perRPC%s", concurrency.RPC), limiter.AdaptiveSetting{Initial: concurrency.MaxPerRepo}, ) } - - // Set default for ReplicateRepository. - replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" - if _, ok := result[replicateRepositoryFullMethod]; !ok { - result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( - limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}), - 0, - 0, - limiter.NewPerRPCPromMonitor( - "gitaly", replicateRepositoryFullMethod, - queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, - ), + return perRPCLimits, func(cfg config.Cfg, middleware *LimiterMiddleware) { + acquiringSecondsMetric := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: cfg.Prometheus.GRPCLatencyBuckets, + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + inProgressMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + []string{"system", "grpc_service", "grpc_method"}, + ) + queuedMetric := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "concurrency_limiting", + Name: "queued", + Help: "Gauge of number of queued calls", + }, + []string{"system", "grpc_service", "grpc_method"}, ) - } - middleware.methodLimiters = result + middleware.collect = func(metrics chan<- prometheus.Metric) { + acquiringSecondsMetric.Collect(metrics) + inProgressMetric.Collect(metrics) + queuedMetric.Collect(metrics) + } + + result := make(map[string]limiter.Limiter) + for _, concurrency := range cfg.Concurrency { + concurrency := concurrency + + result[concurrency.RPC] = limiter.NewConcurrencyLimiter( + perRPCLimits[concurrency.RPC], + concurrency.MaxQueueSize, + concurrency.MaxQueueWait.Duration(), + limiter.NewPerRPCPromMonitor( + "gitaly", concurrency.RPC, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } + + // Set default for ReplicateRepository. + replicateRepositoryFullMethod := "/gitaly.RepositoryService/ReplicateRepository" + if _, ok := result[replicateRepositoryFullMethod]; !ok { + result[replicateRepositoryFullMethod] = limiter.NewConcurrencyLimiter( + limiter.NewAdaptiveLimit("staticLimit", limiter.AdaptiveSetting{Initial: 1}), + 0, + 0, + limiter.NewPerRPCPromMonitor( + "gitaly", replicateRepositoryFullMethod, + queuedMetric, inProgressMetric, acquiringSecondsMetric, middleware.requestsDroppedMetric, + ), + ) + } + + middleware.methodLimiters = result + } } // WithRateLimiters sets up a middleware with limiters that limit requests diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index 35eaba735..7516116fb 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -31,6 +31,35 @@ func fixedLockKey(ctx context.Context) string { return "fixed-id" } +func TestWithConcurrencyLimiters(t *testing.T) { + t.Parallel() + + cfg := config.Cfg{ + Concurrency: []config.Concurrency{ + { + RPC: "/grpc.testing.TestService/UnaryCall", + MaxPerRepo: 1, + }, + { + RPC: "/grpc.testing.TestService/FullDuplexCall", + MaxPerRepo: 99, + }, + }, + } + limits, _ := limithandler.WithConcurrencyLimiters(cfg) + require.Equal(t, 2, len(limits)) + + limit := limits["/grpc.testing.TestService/UnaryCall"] + require.Equal(t, "perRPC/grpc.testing.TestService/UnaryCall", limit.Name()) + require.Equal(t, limiter.AdaptiveSetting{Initial: 1}, limit.Setting()) + require.Equal(t, 1, limit.Current()) + + limit = limits["/grpc.testing.TestService/FullDuplexCall"] + require.Equal(t, "perRPC/grpc.testing.TestService/FullDuplexCall", limit.Name()) + require.Equal(t, limiter.AdaptiveSetting{Initial: 99}, limit.Setting()) + require.Equal(t, 99, limit.Current()) +} + func TestUnaryLimitHandler(t *testing.T) { t.Parallel() @@ -47,7 +76,8 @@ func TestUnaryLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() @@ -103,7 +133,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { ctx := testhelper.Context(t) t.Run("simple timeout", func(t *testing.T) { - lh := limithandler.New(config.Cfg{ + cfg := config.Cfg{ Concurrency: []config.Concurrency{ { RPC: "/grpc.testing.TestService/UnaryCall", @@ -112,7 +142,9 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxQueueWait: duration.Duration(time.Millisecond), }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters) + } + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) s := &queueTestServer{ server: server{ @@ -155,7 +187,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { }) t.Run("unlimited queueing", func(t *testing.T) { - lh := limithandler.New(config.Cfg{ + cfg := config.Cfg{ Concurrency: []config.Concurrency{ // Due to a bug queueing wait times used to leak into subsequent // concurrency configuration in case they didn't explicitly set up @@ -173,7 +205,9 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { MaxPerRepo: 1, }, }, - }, fixedLockKey, limithandler.WithConcurrencyLimiters) + } + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) s := &queueTestServer{ server: server{ @@ -438,7 +472,8 @@ func TestStreamLimitHandler(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() @@ -488,7 +523,8 @@ func TestStreamLimitHandler_error(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) interceptor := lh.StreamInterceptor() srv, serverSocketPath := runServer(t, s, grpc.StreamInterceptor(interceptor)) defer srv.Stop() @@ -608,7 +644,8 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { }, } - lh := limithandler.New(cfg, fixedLockKey, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + lh := limithandler.New(cfg, fixedLockKey, setupPerRPCConcurrencyLimiters) interceptor := lh.UnaryInterceptor() srv, serverSocketPath := runServer(t, s, grpc.UnaryInterceptor(interceptor)) defer srv.Stop() diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index e7b2659e7..770fdc74c 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -333,7 +333,8 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * } if gsd.limitHandler == nil { - gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) + _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters) } if gsd.repositoryCounter == nil { |