diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-09-27 05:06:08 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-10-04 08:06:04 +0300 |
commit | dee9cee076950261d9eebd054d3abfdd7e0ded1d (patch) | |
tree | d903f6f255960f98b4b8fd2cba9dc0116fec64c2 | |
parent | 27b44739c0cc64c24f04697777d1d857d98dd6ae (diff) |
limiter: Integrate adaptive limit to per-rpc limiter
This commit adds new adaptive configs to the `concurrency` settings,
including `Adaptive`, `MinLimit`, `MaxLimit`, and `InitialLimit`. Since
we configure distinct limits for each individual RPC, each limit will
float separately. That said, they will share the same calculator and be
adjusted at the same time during calibration event.
As the limiters are configured at boot time, we cannot use a feature
flag. Instead, we add the new settings to targeted nodes. We can always
rollback to the use of static limits by disabling
`gitaly_use_resizable_semaphore_in_concurrency_limiter` feature flag.
-rw-r--r-- | internal/cli/gitaly/serve.go | 9 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware.go | 16 | ||||
-rw-r--r-- | internal/grpc/middleware/limithandler/middleware_test.go | 14 |
3 files changed, 34 insertions, 5 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 2a57189d8..165bc0eaa 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -296,9 +296,16 @@ func run(cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("disk cache walkers: %w", err) } + // List of tracking adaptive limits. They will be calibrated by the adaptive calculator adaptiveLimits := []limiter.AdaptiveLimiter{} - _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + perRPCLimits, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg) + for _, concurrency := range cfg.Concurrency { + // Connect adaptive limits to the adaptive calculator + if concurrency.Adaptive { + adaptiveLimits = append(adaptiveLimits, perRPCLimits[concurrency.RPC]) + } + } perRPCLimitHandler := limithandler.New( cfg, limithandler.LimitConcurrencyByRepo, diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go index c36c5b614..50f106aaf 100644 --- a/internal/grpc/middleware/limithandler/middleware.go +++ b/internal/grpc/middleware/limithandler/middleware.go @@ -169,9 +169,19 @@ func (w *wrappedStream) RecvMsg(m interface{}) error { func WithConcurrencyLimiters(cfg config.Cfg) (map[string]*limiter.AdaptiveLimit, SetupFunc) { perRPCLimits := map[string]*limiter.AdaptiveLimit{} for _, concurrency := range cfg.Concurrency { - perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit( - fmt.Sprintf("perRPC%s", concurrency.RPC), limiter.AdaptiveSetting{Initial: concurrency.MaxPerRepo}, - ) + limitName := fmt.Sprintf("perRPC%s", concurrency.RPC) + if concurrency.Adaptive { + perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit(limitName, limiter.AdaptiveSetting{ + Initial: concurrency.InitialLimit, + Max: concurrency.MaxLimit, + Min: concurrency.MinLimit, + BackoffFactor: limiter.DefaultBackoffFactor, + }) + } else { + perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit(limitName, limiter.AdaptiveSetting{ + Initial: concurrency.MaxPerRepo, + }) + } } return perRPCLimits, func(cfg config.Cfg, middleware *LimiterMiddleware) { acquiringSecondsMetric := prometheus.NewHistogramVec( diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index 69a720bcb..1d9c5cf34 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -44,10 +44,17 @@ func TestWithConcurrencyLimiters(t *testing.T) { RPC: "/grpc.testing.TestService/FullDuplexCall", MaxPerRepo: 99, }, + { + RPC: "/grpc.testing.TestService/AnotherUnaryCall", + Adaptive: true, + MinLimit: 5, + InitialLimit: 10, + MaxLimit: 15, + }, }, } limits, _ := limithandler.WithConcurrencyLimiters(cfg) - require.Equal(t, 2, len(limits)) + require.Equal(t, 3, len(limits)) limit := limits["/grpc.testing.TestService/UnaryCall"] require.Equal(t, "perRPC/grpc.testing.TestService/UnaryCall", limit.Name()) @@ -58,6 +65,11 @@ func TestWithConcurrencyLimiters(t *testing.T) { require.Equal(t, "perRPC/grpc.testing.TestService/FullDuplexCall", limit.Name()) require.Equal(t, limiter.AdaptiveSetting{Initial: 99}, limit.Setting()) require.Equal(t, 99, limit.Current()) + + limit = limits["/grpc.testing.TestService/AnotherUnaryCall"] + require.Equal(t, "perRPC/grpc.testing.TestService/AnotherUnaryCall", limit.Name()) + require.Equal(t, limiter.AdaptiveSetting{Initial: 10, Min: 5, Max: 15, BackoffFactor: limiter.DefaultBackoffFactor}, limit.Setting()) + require.Equal(t, 10, limit.Current()) } func TestUnaryLimitHandler(t *testing.T) { |