diff options
author | James Liu <jliu@gitlab.com> | 2023-09-28 04:37:21 +0300 |
---|---|---|
committer | James Liu <jliu@gitlab.com> | 2023-09-28 04:37:21 +0300 |
commit | 185cab4d02640e2e312660273da5c071ae4d8f52 (patch) | |
tree | d318f703fdf83c63f5df4f6556946de9b98067bf | |
parent | c479c671b6918b4665e7735183781db187f7fd05 (diff) | |
parent | c5e2a7ce565aff254cb3d9a82240eb923626427c (diff) |
Merge branch 'qmnguyen0711/integrate-adaptive-calculator-v3' into 'master'
limiter: Integrate adaptive limit to pack-objects limiter
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6411
Merged-by: James Liu <jliu@gitlab.com>
Approved-by: James Liu <jliu@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: James Liu <jliu@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r-- | internal/cli/gitaly/serve.go | 47 | ||||
-rw-r--r-- | internal/gitaly/config/config.go | 17 | ||||
-rw-r--r-- | internal/gitaly/config/config_test.go | 54 | ||||
-rw-r--r-- | internal/limiter/adaptive_calculator.go | 5 | ||||
-rw-r--r-- | internal/limiter/adaptive_limit.go | 5 | ||||
-rw-r--r-- | internal/limiter/adaptive_limit_test.go | 7 | ||||
-rw-r--r-- | internal/limiter/concurrency_limiter.go | 21 |
7 files changed, 142 insertions, 14 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 050d382b9..66d8cbeb3 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -40,6 +40,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter/watchers" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v16/internal/tempdir" @@ -295,22 +296,36 @@ func run(cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("disk cache walkers: %w", err) } - // The pack-objects limit below is static at this stage. It's always equal to the initial limit, which uses - // MaxConcurrency config. - packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{ - Initial: cfg.PackObjectsLimiting.MaxConcurrency, - }) + adaptiveLimits := []limiter.AdaptiveLimiter{} + concurrencyLimitHandler := limithandler.New( cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters, ) + prometheus.MustRegister(concurrencyLimitHandler) rateLimitHandler := limithandler.New( cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithRateLimiters(ctx), ) + prometheus.MustRegister(rateLimitHandler) + + var packObjectLimit *limiter.AdaptiveLimit + if cfg.PackObjectsLimiting.Adaptive { + packObjectLimit = limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{ + Initial: cfg.PackObjectsLimiting.InitialLimit, + Max: cfg.PackObjectsLimiting.MaxLimit, + Min: cfg.PackObjectsLimiting.MinLimit, + BackoffFactor: limiter.DefaultBackoffFactor, + }) + adaptiveLimits = append(adaptiveLimits, packObjectLimit) + } else { + packObjectLimit = limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{ + Initial: cfg.PackObjectsLimiting.MaxConcurrency, + }) + } packObjectsMonitor := limiter.NewPackObjectsConcurrencyMonitor( cfg.Prometheus.GRPCLatencyBuckets, @@ -321,10 +336,28 @@ func run(cfg config.Cfg, logger log.Logger) error { cfg.PackObjectsLimiting.MaxQueueWait.Duration(), packObjectsMonitor, ) - - prometheus.MustRegister(concurrencyLimitHandler, rateLimitHandler) prometheus.MustRegister(packObjectsMonitor) + // Enable the adaptive calculator only if there is any limit needed to be adaptive. + if len(adaptiveLimits) > 0 { + adaptiveCalculator := limiter.NewAdaptiveCalculator( + limiter.DefaultCalibrateFrequency, + logger, + adaptiveLimits, + []limiter.ResourceWatcher{ + watchers.NewCgroupCPUWatcher(cgroupMgr), + watchers.NewCgroupMemoryWatcher(cgroupMgr), + }, + ) + prometheus.MustRegister(adaptiveCalculator) + + stop, err := adaptiveCalculator.Start(ctx) + if err != nil { + logger.WithError(err).Warn("error starting adaptive limiter calculator") + } + defer stop() + } + gitalyServerFactory := server.NewGitalyServerFactory( cfg, logger, diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index a938b7c0c..6ea9177bf 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -461,8 +461,18 @@ type RateLimiting struct { // Requests that come in after the maximum number of concurrent pack objects // processes have been reached will wait. type PackObjectsLimiting struct { - // MaxConcurrency is the maximum number of concurrent pack objects processes - // for a given key. + // Adaptive determines the behavior of the concurrency limit. If set to true, the concurrency limit is dynamic + // and starts at InitialLimit, then adjusts within the range [MinLimit, MaxLimit] based on current resource + // usage. If set to false, the concurrency limit is static and is set to MaxConcurrency. + Adaptive bool `toml:"adaptive,omitempty" json:"adaptive,omitempty"` + // InitialLimit is the concurrency limit to start with. + InitialLimit int `toml:"initial_limit,omitempty" json:"initial_limit,omitempty"` + // MaxLimit is the minimum adaptive concurrency limit. + MaxLimit int `toml:"max_limit,omitempty" json:"max_limit,omitempty"` + // MinLimit is the mini adaptive concurrency limit. + MinLimit int `toml:"min_limit,omitempty" json:"min_limit,omitempty"` + // MaxConcurrency is the static maximum number of concurrent pack objects processes for a given key. This config + // is used only if Adaptive is false. MaxConcurrency int `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"` // MaxQueueWait is the maximum time a request can remain in the concurrency queue // waiting to be picked up by Gitaly. @@ -477,6 +487,9 @@ func (pol PackObjectsLimiting) Validate() error { Append(cfgerror.Comparable(pol.MaxConcurrency).GreaterOrEqual(0), "max_concurrency"). Append(cfgerror.Comparable(pol.MaxQueueLength).GreaterOrEqual(0), "max_queue_length"). Append(cfgerror.Comparable(pol.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait"). + Append(cfgerror.Comparable(pol.MinLimit).GreaterOrEqual(0), "min_limit"). + Append(cfgerror.Comparable(pol.MaxLimit).GreaterOrEqual(pol.InitialLimit), "max_limit"). + Append(cfgerror.Comparable(pol.InitialLimit).GreaterOrEqual(pol.MinLimit), "initial_limit"). AsError() } diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index beccede6f..1ec917e64 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -1727,6 +1727,60 @@ func TestPackObjectsLimiting_Validate(t *testing.T) { PackObjectsLimiting{MaxConcurrency: -1}.Validate(), ) + require.NoError(t, PackObjectsLimiting{Adaptive: true, InitialLimit: 0, MinLimit: 0, MaxLimit: 100}.Validate()) + require.NoError(t, PackObjectsLimiting{Adaptive: true, InitialLimit: 10, MinLimit: 0, MaxLimit: 100}.Validate()) + require.NoError(t, PackObjectsLimiting{Adaptive: true, InitialLimit: 100, MinLimit: 0, MaxLimit: 100}.Validate()) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange), + "initial_limit", + ), + }, + PackObjectsLimiting{Adaptive: true, InitialLimit: -1, MinLimit: 0, MaxLimit: 100}.Validate(), + ) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: 10 is not greater than or equal to 11", cfgerror.ErrNotInRange), + "initial_limit", + ), + }, + PackObjectsLimiting{Adaptive: true, InitialLimit: 10, MinLimit: 11, MaxLimit: 100}.Validate(), + ) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: 3 is not greater than or equal to 10", cfgerror.ErrNotInRange), + "max_limit", + ), + }, + PackObjectsLimiting{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: 3}.Validate(), + ) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange), + "min_limit", + ), + }, + PackObjectsLimiting{Adaptive: true, InitialLimit: 5, MinLimit: -1, MaxLimit: 99}.Validate(), + ) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1 is not greater than or equal to 10", cfgerror.ErrNotInRange), + "max_limit", + ), + }, + PackObjectsLimiting{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: -1}.Validate(), + ) + require.NoError(t, PackObjectsLimiting{MaxQueueLength: 0}.Validate()) require.NoError(t, PackObjectsLimiting{MaxQueueLength: 1}.Validate()) require.NoError(t, PackObjectsLimiting{MaxQueueLength: 100}.Validate()) diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go index 8eff16829..f990a25b3 100644 --- a/internal/limiter/adaptive_calculator.go +++ b/internal/limiter/adaptive_calculator.go @@ -17,6 +17,11 @@ const ( // MaximumWatcherTimeout is the number of maximum allowed timeout when polling backoff events from watchers. // When this threshold is reached, a timeout polling is treated as a backoff event. MaximumWatcherTimeout = 5 + // DefaultCalibrateFrequency is the default time period between two calibrations. + DefaultCalibrateFrequency = 30 * time.Second + // DefaultBackoffFactor is the default recommended backoff factor when the concurrency decreases. By default, + // the factor is 0.5, meaning the limit is cut off by half when a backoff event occurs. + DefaultBackoffFactor = 0.5 ) // BackoffEvent is a signal that the current system is under pressure. It's returned by the watchers under the diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go index 71eddc389..320fff403 100644 --- a/internal/limiter/adaptive_limit.go +++ b/internal/limiter/adaptive_limit.go @@ -58,6 +58,11 @@ func (l *AdaptiveLimit) Current() int { return l.current } +// Initial returns the initial limit. +func (l *AdaptiveLimit) Initial() int { + return l.setting.Initial +} + // Update adjusts the current limit value and executes all registered update hooks. func (l *AdaptiveLimit) Update(val int) { l.Lock() diff --git a/internal/limiter/adaptive_limit_test.go b/internal/limiter/adaptive_limit_test.go index f6c67f092..9ed9d0ba0 100644 --- a/internal/limiter/adaptive_limit_test.go +++ b/internal/limiter/adaptive_limit_test.go @@ -19,6 +19,13 @@ func TestAdaptiveLimit_New(t *testing.T) { limit := NewAdaptiveLimit("testLimit", setting) require.Equal(t, limit.Name(), "testLimit") require.Equal(t, limit.Current(), 5) + require.Equal(t, limit.Initial(), 5) + require.Equal(t, limit.Setting(), setting) + + limit.Update(10) + require.Equal(t, limit.Name(), "testLimit") + require.Equal(t, limit.Current(), 10) + require.Equal(t, limit.Initial(), 5) require.Equal(t, limit.Setting(), setting) } diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go index 8cd0d641e..6e0b80007 100644 --- a/internal/limiter/concurrency_limiter.go +++ b/internal/limiter/concurrency_limiter.go @@ -247,7 +247,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li ) defer span.Finish() - if c.currentLimit() <= 0 { + if c.currentLimit(ctx) <= 0 { return f() } @@ -306,14 +306,14 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(ctx context.Context, limitingKe // function we add the concurrency tokens to the number of available token. var queueTokens semaphorer if c.maxQueueLength > 0 { - queueTokens = c.createSemaphore(ctx, uint(c.currentLimit()+c.maxQueueLength)) + queueTokens = c.createSemaphore(ctx, uint(c.currentLimit(ctx)+c.maxQueueLength)) } c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{ monitor: c.monitor, maxQueueWait: c.maxQueueWait, setWaitTimeoutContext: c.SetWaitTimeoutContext, - concurrencyTokens: c.createSemaphore(ctx, uint(c.currentLimit())), + concurrencyTokens: c.createSemaphore(ctx, uint(c.currentLimit(ctx))), queueTokens: queueTokens, } } @@ -352,8 +352,19 @@ func (c *ConcurrencyLimiter) countSemaphores() int { return len(c.limitsByKey) } -func (c *ConcurrencyLimiter) currentLimit() int { - return c.limit.Current() +func (c *ConcurrencyLimiter) currentLimit(ctx context.Context) int { + // When `gitaly_use_resizable_semaphore_in_concurrency_limiter` flag is enabled, the resizable semaphore should + // use the current value of the adaptive limit. This limit is constantly calibrated by the adaptive calculator + // if the adaptiveness is enabled. In contrast, when the flag is disabled, the static semaphore should use the + // initial limit instead of the floating current limit. + // This situation is temporary during the rollout phase where the adaptive limiting is experimental. The + // aforementioned feature flag is used as an escape hatch to fallback to use static limiting if something goes + // wrong. When the feature enters a mature state, this feature flag and the static semaphore will be removed. + // The resizable semaphore can handle both static and adaptive limiting. + if featureflag.UseResizableSemaphoreInConcurrencyLimiter.IsEnabled(ctx) { + return c.limit.Current() + } + return c.limit.Initial() } func (c *ConcurrencyLimiter) createSemaphore(ctx context.Context, size uint) semaphorer { |