Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Liu <jliu@gitlab.com>2023-09-28 04:37:21 +0300
committerJames Liu <jliu@gitlab.com>2023-09-28 04:37:21 +0300
commit185cab4d02640e2e312660273da5c071ae4d8f52 (patch)
treed318f703fdf83c63f5df4f6556946de9b98067bf
parentc479c671b6918b4665e7735183781db187f7fd05 (diff)
parentc5e2a7ce565aff254cb3d9a82240eb923626427c (diff)
Merge branch 'qmnguyen0711/integrate-adaptive-calculator-v3' into 'master'
limiter: Integrate adaptive limit to pack-objects limiter See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6411 Merged-by: James Liu <jliu@gitlab.com> Approved-by: James Liu <jliu@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: James Liu <jliu@gitlab.com> Reviewed-by: karthik nayak <knayak@gitlab.com> Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--internal/cli/gitaly/serve.go47
-rw-r--r--internal/gitaly/config/config.go17
-rw-r--r--internal/gitaly/config/config_test.go54
-rw-r--r--internal/limiter/adaptive_calculator.go5
-rw-r--r--internal/limiter/adaptive_limit.go5
-rw-r--r--internal/limiter/adaptive_limit_test.go7
-rw-r--r--internal/limiter/concurrency_limiter.go21
7 files changed, 142 insertions, 14 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 050d382b9..66d8cbeb3 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -40,6 +40,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/env"
"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/limiter/watchers"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v16/internal/tempdir"
@@ -295,22 +296,36 @@ func run(cfg config.Cfg, logger log.Logger) error {
return fmt.Errorf("disk cache walkers: %w", err)
}
- // The pack-objects limit below is static at this stage. It's always equal to the initial limit, which uses
- // MaxConcurrency config.
- packObjectLimit := limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
- Initial: cfg.PackObjectsLimiting.MaxConcurrency,
- })
+ adaptiveLimits := []limiter.AdaptiveLimiter{}
+
concurrencyLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
limithandler.WithConcurrencyLimiters,
)
+ prometheus.MustRegister(concurrencyLimitHandler)
rateLimitHandler := limithandler.New(
cfg,
limithandler.LimitConcurrencyByRepo,
limithandler.WithRateLimiters(ctx),
)
+ prometheus.MustRegister(rateLimitHandler)
+
+ var packObjectLimit *limiter.AdaptiveLimit
+ if cfg.PackObjectsLimiting.Adaptive {
+ packObjectLimit = limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
+ Initial: cfg.PackObjectsLimiting.InitialLimit,
+ Max: cfg.PackObjectsLimiting.MaxLimit,
+ Min: cfg.PackObjectsLimiting.MinLimit,
+ BackoffFactor: limiter.DefaultBackoffFactor,
+ })
+ adaptiveLimits = append(adaptiveLimits, packObjectLimit)
+ } else {
+ packObjectLimit = limiter.NewAdaptiveLimit("packObjects", limiter.AdaptiveSetting{
+ Initial: cfg.PackObjectsLimiting.MaxConcurrency,
+ })
+ }
packObjectsMonitor := limiter.NewPackObjectsConcurrencyMonitor(
cfg.Prometheus.GRPCLatencyBuckets,
@@ -321,10 +336,28 @@ func run(cfg config.Cfg, logger log.Logger) error {
cfg.PackObjectsLimiting.MaxQueueWait.Duration(),
packObjectsMonitor,
)
-
- prometheus.MustRegister(concurrencyLimitHandler, rateLimitHandler)
prometheus.MustRegister(packObjectsMonitor)
+ // Enable the adaptive calculator only if there is any limit needed to be adaptive.
+ if len(adaptiveLimits) > 0 {
+ adaptiveCalculator := limiter.NewAdaptiveCalculator(
+ limiter.DefaultCalibrateFrequency,
+ logger,
+ adaptiveLimits,
+ []limiter.ResourceWatcher{
+ watchers.NewCgroupCPUWatcher(cgroupMgr),
+ watchers.NewCgroupMemoryWatcher(cgroupMgr),
+ },
+ )
+ prometheus.MustRegister(adaptiveCalculator)
+
+ stop, err := adaptiveCalculator.Start(ctx)
+ if err != nil {
+ logger.WithError(err).Warn("error starting adaptive limiter calculator")
+ }
+ defer stop()
+ }
+
gitalyServerFactory := server.NewGitalyServerFactory(
cfg,
logger,
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index a938b7c0c..6ea9177bf 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -461,8 +461,18 @@ type RateLimiting struct {
// Requests that come in after the maximum number of concurrent pack objects
// processes have been reached will wait.
type PackObjectsLimiting struct {
- // MaxConcurrency is the maximum number of concurrent pack objects processes
- // for a given key.
+ // Adaptive determines the behavior of the concurrency limit. If set to true, the concurrency limit is dynamic
+ // and starts at InitialLimit, then adjusts within the range [MinLimit, MaxLimit] based on current resource
+ // usage. If set to false, the concurrency limit is static and is set to MaxConcurrency.
+ Adaptive bool `toml:"adaptive,omitempty" json:"adaptive,omitempty"`
+ // InitialLimit is the concurrency limit to start with.
+ InitialLimit int `toml:"initial_limit,omitempty" json:"initial_limit,omitempty"`
+ // MaxLimit is the minimum adaptive concurrency limit.
+ MaxLimit int `toml:"max_limit,omitempty" json:"max_limit,omitempty"`
+ // MinLimit is the mini adaptive concurrency limit.
+ MinLimit int `toml:"min_limit,omitempty" json:"min_limit,omitempty"`
+ // MaxConcurrency is the static maximum number of concurrent pack objects processes for a given key. This config
+ // is used only if Adaptive is false.
MaxConcurrency int `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"`
// MaxQueueWait is the maximum time a request can remain in the concurrency queue
// waiting to be picked up by Gitaly.
@@ -477,6 +487,9 @@ func (pol PackObjectsLimiting) Validate() error {
Append(cfgerror.Comparable(pol.MaxConcurrency).GreaterOrEqual(0), "max_concurrency").
Append(cfgerror.Comparable(pol.MaxQueueLength).GreaterOrEqual(0), "max_queue_length").
Append(cfgerror.Comparable(pol.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait").
+ Append(cfgerror.Comparable(pol.MinLimit).GreaterOrEqual(0), "min_limit").
+ Append(cfgerror.Comparable(pol.MaxLimit).GreaterOrEqual(pol.InitialLimit), "max_limit").
+ Append(cfgerror.Comparable(pol.InitialLimit).GreaterOrEqual(pol.MinLimit), "initial_limit").
AsError()
}
diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go
index beccede6f..1ec917e64 100644
--- a/internal/gitaly/config/config_test.go
+++ b/internal/gitaly/config/config_test.go
@@ -1727,6 +1727,60 @@ func TestPackObjectsLimiting_Validate(t *testing.T) {
PackObjectsLimiting{MaxConcurrency: -1}.Validate(),
)
+ require.NoError(t, PackObjectsLimiting{Adaptive: true, InitialLimit: 0, MinLimit: 0, MaxLimit: 100}.Validate())
+ require.NoError(t, PackObjectsLimiting{Adaptive: true, InitialLimit: 10, MinLimit: 0, MaxLimit: 100}.Validate())
+ require.NoError(t, PackObjectsLimiting{Adaptive: true, InitialLimit: 100, MinLimit: 0, MaxLimit: 100}.Validate())
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange),
+ "initial_limit",
+ ),
+ },
+ PackObjectsLimiting{Adaptive: true, InitialLimit: -1, MinLimit: 0, MaxLimit: 100}.Validate(),
+ )
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: 10 is not greater than or equal to 11", cfgerror.ErrNotInRange),
+ "initial_limit",
+ ),
+ },
+ PackObjectsLimiting{Adaptive: true, InitialLimit: 10, MinLimit: 11, MaxLimit: 100}.Validate(),
+ )
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: 3 is not greater than or equal to 10", cfgerror.ErrNotInRange),
+ "max_limit",
+ ),
+ },
+ PackObjectsLimiting{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: 3}.Validate(),
+ )
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange),
+ "min_limit",
+ ),
+ },
+ PackObjectsLimiting{Adaptive: true, InitialLimit: 5, MinLimit: -1, MaxLimit: 99}.Validate(),
+ )
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1 is not greater than or equal to 10", cfgerror.ErrNotInRange),
+ "max_limit",
+ ),
+ },
+ PackObjectsLimiting{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: -1}.Validate(),
+ )
+
require.NoError(t, PackObjectsLimiting{MaxQueueLength: 0}.Validate())
require.NoError(t, PackObjectsLimiting{MaxQueueLength: 1}.Validate())
require.NoError(t, PackObjectsLimiting{MaxQueueLength: 100}.Validate())
diff --git a/internal/limiter/adaptive_calculator.go b/internal/limiter/adaptive_calculator.go
index 8eff16829..f990a25b3 100644
--- a/internal/limiter/adaptive_calculator.go
+++ b/internal/limiter/adaptive_calculator.go
@@ -17,6 +17,11 @@ const (
// MaximumWatcherTimeout is the number of maximum allowed timeout when polling backoff events from watchers.
// When this threshold is reached, a timeout polling is treated as a backoff event.
MaximumWatcherTimeout = 5
+ // DefaultCalibrateFrequency is the default time period between two calibrations.
+ DefaultCalibrateFrequency = 30 * time.Second
+ // DefaultBackoffFactor is the default recommended backoff factor when the concurrency decreases. By default,
+ // the factor is 0.5, meaning the limit is cut off by half when a backoff event occurs.
+ DefaultBackoffFactor = 0.5
)
// BackoffEvent is a signal that the current system is under pressure. It's returned by the watchers under the
diff --git a/internal/limiter/adaptive_limit.go b/internal/limiter/adaptive_limit.go
index 71eddc389..320fff403 100644
--- a/internal/limiter/adaptive_limit.go
+++ b/internal/limiter/adaptive_limit.go
@@ -58,6 +58,11 @@ func (l *AdaptiveLimit) Current() int {
return l.current
}
+// Initial returns the initial limit.
+func (l *AdaptiveLimit) Initial() int {
+ return l.setting.Initial
+}
+
// Update adjusts the current limit value and executes all registered update hooks.
func (l *AdaptiveLimit) Update(val int) {
l.Lock()
diff --git a/internal/limiter/adaptive_limit_test.go b/internal/limiter/adaptive_limit_test.go
index f6c67f092..9ed9d0ba0 100644
--- a/internal/limiter/adaptive_limit_test.go
+++ b/internal/limiter/adaptive_limit_test.go
@@ -19,6 +19,13 @@ func TestAdaptiveLimit_New(t *testing.T) {
limit := NewAdaptiveLimit("testLimit", setting)
require.Equal(t, limit.Name(), "testLimit")
require.Equal(t, limit.Current(), 5)
+ require.Equal(t, limit.Initial(), 5)
+ require.Equal(t, limit.Setting(), setting)
+
+ limit.Update(10)
+ require.Equal(t, limit.Name(), "testLimit")
+ require.Equal(t, limit.Current(), 10)
+ require.Equal(t, limit.Initial(), 5)
require.Equal(t, limit.Setting(), setting)
}
diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go
index 8cd0d641e..6e0b80007 100644
--- a/internal/limiter/concurrency_limiter.go
+++ b/internal/limiter/concurrency_limiter.go
@@ -247,7 +247,7 @@ func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f Li
)
defer span.Finish()
- if c.currentLimit() <= 0 {
+ if c.currentLimit(ctx) <= 0 {
return f()
}
@@ -306,14 +306,14 @@ func (c *ConcurrencyLimiter) getConcurrencyLimit(ctx context.Context, limitingKe
// function we add the concurrency tokens to the number of available token.
var queueTokens semaphorer
if c.maxQueueLength > 0 {
- queueTokens = c.createSemaphore(ctx, uint(c.currentLimit()+c.maxQueueLength))
+ queueTokens = c.createSemaphore(ctx, uint(c.currentLimit(ctx)+c.maxQueueLength))
}
c.limitsByKey[limitingKey] = &keyedConcurrencyLimiter{
monitor: c.monitor,
maxQueueWait: c.maxQueueWait,
setWaitTimeoutContext: c.SetWaitTimeoutContext,
- concurrencyTokens: c.createSemaphore(ctx, uint(c.currentLimit())),
+ concurrencyTokens: c.createSemaphore(ctx, uint(c.currentLimit(ctx))),
queueTokens: queueTokens,
}
}
@@ -352,8 +352,19 @@ func (c *ConcurrencyLimiter) countSemaphores() int {
return len(c.limitsByKey)
}
-func (c *ConcurrencyLimiter) currentLimit() int {
- return c.limit.Current()
+func (c *ConcurrencyLimiter) currentLimit(ctx context.Context) int {
+ // When `gitaly_use_resizable_semaphore_in_concurrency_limiter` flag is enabled, the resizable semaphore should
+ // use the current value of the adaptive limit. This limit is constantly calibrated by the adaptive calculator
+ // if the adaptiveness is enabled. In contrast, when the flag is disabled, the static semaphore should use the
+ // initial limit instead of the floating current limit.
+ // This situation is temporary during the rollout phase where the adaptive limiting is experimental. The
+ // aforementioned feature flag is used as an escape hatch to fallback to use static limiting if something goes
+ // wrong. When the feature enters a mature state, this feature flag and the static semaphore will be removed.
+ // The resizable semaphore can handle both static and adaptive limiting.
+ if featureflag.UseResizableSemaphoreInConcurrencyLimiter.IsEnabled(ctx) {
+ return c.limit.Current()
+ }
+ return c.limit.Initial()
}
func (c *ConcurrencyLimiter) createSemaphore(ctx context.Context, size uint) semaphorer {