Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-09-27 05:06:08 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-10-04 08:06:04 +0300
commitdee9cee076950261d9eebd054d3abfdd7e0ded1d (patch)
treed903f6f255960f98b4b8fd2cba9dc0116fec64c2
parent27b44739c0cc64c24f04697777d1d857d98dd6ae (diff)
limiter: Integrate adaptive limit to per-rpc limiter
This commit adds new adaptive configs to the `concurrency` settings, including `Adaptive`, `MinLimit`, `MaxLimit`, and `InitialLimit`. Since we configure distinct limits for each individual RPC, each limit will float separately. That said, they will share the same calculator and be adjusted at the same time during calibration event. As the limiters are configured at boot time, we cannot use a feature flag. Instead, we add the new settings to targeted nodes. We can always rollback to the use of static limits by disabling `gitaly_use_resizable_semaphore_in_concurrency_limiter` feature flag.
-rw-r--r--internal/cli/gitaly/serve.go9
-rw-r--r--internal/grpc/middleware/limithandler/middleware.go16
-rw-r--r--internal/grpc/middleware/limithandler/middleware_test.go14
3 files changed, 34 insertions, 5 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 2a57189d8..165bc0eaa 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -296,9 +296,16 @@ func run(cfg config.Cfg, logger log.Logger) error {
return fmt.Errorf("disk cache walkers: %w", err)
}
+ // List of tracking adaptive limits. They will be calibrated by the adaptive calculator
adaptiveLimits := []limiter.AdaptiveLimiter{}
- _, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ perRPCLimits, setupPerRPCConcurrencyLimiters := limithandler.WithConcurrencyLimiters(cfg)
+ for _, concurrency := range cfg.Concurrency {
+ // Connect adaptive limits to the adaptive calculator
+ if concurrency.Adaptive {
+ adaptiveLimits = append(adaptiveLimits, perRPCLimits[concurrency.RPC])
+ }
+ }
perRPCLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
diff --git a/internal/grpc/middleware/limithandler/middleware.go b/internal/grpc/middleware/limithandler/middleware.go
index c36c5b614..50f106aaf 100644
--- a/internal/grpc/middleware/limithandler/middleware.go
+++ b/internal/grpc/middleware/limithandler/middleware.go
@@ -169,9 +169,19 @@ func (w *wrappedStream) RecvMsg(m interface{}) error {
func WithConcurrencyLimiters(cfg config.Cfg) (map[string]*limiter.AdaptiveLimit, SetupFunc) {
perRPCLimits := map[string]*limiter.AdaptiveLimit{}
for _, concurrency := range cfg.Concurrency {
- perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit(
- fmt.Sprintf("perRPC%s", concurrency.RPC), limiter.AdaptiveSetting{Initial: concurrency.MaxPerRepo},
- )
+ limitName := fmt.Sprintf("perRPC%s", concurrency.RPC)
+ if concurrency.Adaptive {
+ perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit(limitName, limiter.AdaptiveSetting{
+ Initial: concurrency.InitialLimit,
+ Max: concurrency.MaxLimit,
+ Min: concurrency.MinLimit,
+ BackoffFactor: limiter.DefaultBackoffFactor,
+ })
+ } else {
+ perRPCLimits[concurrency.RPC] = limiter.NewAdaptiveLimit(limitName, limiter.AdaptiveSetting{
+ Initial: concurrency.MaxPerRepo,
+ })
+ }
}
return perRPCLimits, func(cfg config.Cfg, middleware *LimiterMiddleware) {
acquiringSecondsMetric := prometheus.NewHistogramVec(
diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go
index 69a720bcb..1d9c5cf34 100644
--- a/internal/grpc/middleware/limithandler/middleware_test.go
+++ b/internal/grpc/middleware/limithandler/middleware_test.go
@@ -44,10 +44,17 @@ func TestWithConcurrencyLimiters(t *testing.T) {
RPC: "/grpc.testing.TestService/FullDuplexCall",
MaxPerRepo: 99,
},
+ {
+ RPC: "/grpc.testing.TestService/AnotherUnaryCall",
+ Adaptive: true,
+ MinLimit: 5,
+ InitialLimit: 10,
+ MaxLimit: 15,
+ },
},
}
limits, _ := limithandler.WithConcurrencyLimiters(cfg)
- require.Equal(t, 2, len(limits))
+ require.Equal(t, 3, len(limits))
limit := limits["/grpc.testing.TestService/UnaryCall"]
require.Equal(t, "perRPC/grpc.testing.TestService/UnaryCall", limit.Name())
@@ -58,6 +65,11 @@ func TestWithConcurrencyLimiters(t *testing.T) {
require.Equal(t, "perRPC/grpc.testing.TestService/FullDuplexCall", limit.Name())
require.Equal(t, limiter.AdaptiveSetting{Initial: 99}, limit.Setting())
require.Equal(t, 99, limit.Current())
+
+ limit = limits["/grpc.testing.TestService/AnotherUnaryCall"]
+ require.Equal(t, "perRPC/grpc.testing.TestService/AnotherUnaryCall", limit.Name())
+ require.Equal(t, limiter.AdaptiveSetting{Initial: 10, Min: 5, Max: 15, BackoffFactor: limiter.DefaultBackoffFactor}, limit.Setting())
+ require.Equal(t, 10, limit.Current())
}
func TestUnaryLimitHandler(t *testing.T) {