diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-03-29 10:38:11 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-04-11 11:06:14 +0300 |
commit | ff93231c648cd9511267220d656f27e735148d88 (patch) | |
tree | f3b4e0eff3ffacf42714ab55532bf630123e7b5d | |
parent | 13f08896b308c617c4a4ca585212069808422367 (diff) |
Refator PackObjectsLimiting configs
This commit adds MaxQueueLength field to make it consistent with other
limiters. As the concurrency limiter is new, we don't want to add a
support for such configs to Omnibus/K8s helm chart too soon. As a
result, the configs can be fed from the environment variables.
Besides, this commit moves the default PackObjectsLimiting
configurations to `internal/config/config.go`. That file is the
centralized location to store all Gitaly configurations as well as
default setting.
-rw-r--r-- | internal/cli/gitaly/serve.go | 11 | ||||
-rw-r--r-- | internal/gitaly/config/config.go | 44 | ||||
-rw-r--r-- | internal/gitaly/config/config_test.go | 144 |
3 files changed, 183 insertions, 16 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 9f68f722c..37997859d 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -271,16 +271,9 @@ func run(cfg config.Cfg) error { string(cfg.PackObjectsLimiting.Key), cfg.Prometheus.GRPCLatencyBuckets, ) - packObjectsConcurrencyLimit := cfg.PackObjectsLimiting.MaxConcurrency - if packObjectsConcurrencyLimit == 0 { - // TODO: remove this default setting when we remove the feature - // flags PackObjectsLimitingRepo and PackObjectsLimitingUser - // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413 - packObjectsConcurrencyLimit = 200 - } packObjectsLimiter := limithandler.NewConcurrencyLimiter( - int(packObjectsConcurrencyLimit), - 0, + cfg.PackObjectsLimiting.MaxConcurrency, + cfg.PackObjectsLimiting.MaxQueueLength, func() helper.Ticker { return helper.NewTimerTicker(cfg.PackObjectsLimiting.MaxQueueWait.Duration()) }, diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 8d8f5a33e..c920edf81 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -26,6 +26,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/duration" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm" ) @@ -407,6 +408,9 @@ const ( // PackObjectsLimitingKeyRepository will limit the pack objects concurrency // by repository PackObjectsLimitingKeyRepository = PackObjectsLimitingKey("repository") + // PackObjectsLimitingKeyRemoteIP will limit the pack objects concurrency + // by RemoteIP + PackObjectsLimitingKeyRemoteIP = PackObjectsLimitingKey("remote_ip") ) // ParsePackObjectsLimitingKey checks if the key is a valid @@ -417,6 +421,8 @@ func ParsePackObjectsLimitingKey(k string) (PackObjectsLimitingKey, error) { return PackObjectsLimitingKeyUser, nil case PackObjectsLimitingKeyRepository: return PackObjectsLimitingKeyRepository, nil + case PackObjectsLimitingKeyRemoteIP: + return PackObjectsLimitingKeyRemoteIP, nil default: return "", fmt.Errorf("unsupported pack objects limiting key: %q", k) } @@ -441,15 +447,19 @@ type PackObjectsLimiting struct { Key PackObjectsLimitingKey `toml:"key,omitempty" json:"key,omitempty"` // MaxConcurrency is the maximum number of concurrent pack objects processes // for a given key. - MaxConcurrency uint `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"` + MaxConcurrency int `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"` // MaxQueueWait is the maximum time a request can remain in the concurrency queue // waiting to be picked up by Gitaly. MaxQueueWait duration.Duration `toml:"max_queue_wait,omitempty" json:"max_queue_wait,omitempty"` + // MaxQueueLength is the maximum length of the request queue + MaxQueueLength int `toml:"max_queue_length,omitempty" json:"max_queue_length,omitempty"` } // Validate runs validation on all fields and compose all found errors. func (pol PackObjectsLimiting) Validate() error { return cfgerror.New(). + Append(cfgerror.IsPositive(pol.MaxConcurrency), "max_concurrency"). + Append(cfgerror.IsPositive(pol.MaxQueueLength), "max_queue_length"). Append(cfgerror.IsPositive(pol.MaxQueueWait.Duration()), "max_queue_wait"). AsError() } @@ -486,12 +496,40 @@ func defaultPackObjectsCacheConfig() StreamCacheConfig { } } +func defaultPackObjectsLimiting() PackObjectsLimiting { + var maxConcurrency, maxQueueLength int + + // TODO: Injecting environment variables here boosts the time to try out new concurrency + // limiting. When this new limiter is in-place, they should go through official + // configure methods (Omnibus, K8s helm chart, etc.) instead. + if maxConcurrencyFromEnv, err := env.GetInt("GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY", 200); err == nil { + maxConcurrency = maxConcurrencyFromEnv + } + if maxConcurrency == 0 { + // TODO: remove this default setting when we remove the feature + // flags PackObjectsLimitingRepo, PackObjectsLimitingUser, and PackObjectsLimitingRemoteIP + // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413 + maxConcurrency = 200 + } + if maxQueueLengthFromEnv, err := env.GetInt("GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH", 0); err == nil { + maxQueueLength = maxQueueLengthFromEnv + } + + return PackObjectsLimiting{ + Key: "", + MaxConcurrency: maxConcurrency, + MaxQueueLength: maxQueueLength, + MaxQueueWait: 0, // Do not enforce queue wait at this point + } +} + // Load initializes the Config variable from file and the environment. // Environment variables take precedence over the file. func Load(file io.Reader) (Cfg, error) { cfg := Cfg{ - Prometheus: prometheus.DefaultConfig(), - PackObjectsCache: defaultPackObjectsCacheConfig(), + Prometheus: prometheus.DefaultConfig(), + PackObjectsCache: defaultPackObjectsCacheConfig(), + PackObjectsLimiting: defaultPackObjectsLimiting(), } if err := toml.NewDecoder(file).Decode(&cfg); err != nil { diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index 9baa2c35a..f130c0d00 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -46,8 +46,9 @@ func TestLoadEmptyConfig(t *testing.T) { require.NoError(t, err) expectedCfg := Cfg{ - Prometheus: prometheus.DefaultConfig(), - PackObjectsCache: defaultPackObjectsCacheConfig(), + Prometheus: prometheus.DefaultConfig(), + PackObjectsCache: defaultPackObjectsCacheConfig(), + PackObjectsLimiting: defaultPackObjectsLimiting(), } require.NoError(t, expectedCfg.setDefaults()) @@ -186,8 +187,9 @@ func TestLoadConfigCommand(t *testing.T) { modifyDefaultConfig := func(modify func(cfg *Cfg)) Cfg { cfg := &Cfg{ - Prometheus: prometheus.DefaultConfig(), - PackObjectsCache: defaultPackObjectsCacheConfig(), + Prometheus: prometheus.DefaultConfig(), + PackObjectsCache: defaultPackObjectsCacheConfig(), + PackObjectsLimiting: defaultPackObjectsLimiting(), } require.NoError(t, cfg.setDefaults()) modify(cfg) @@ -1619,11 +1621,13 @@ func TestPackObjectsLimiting(t *testing.T) { rawCfg: `[pack_objects_limiting] key = "repository" max_concurrency = 20 + max_queue_length = 100 max_queue_wait = "10s" `, expectedCfg: PackObjectsLimiting{ Key: PackObjectsLimitingKeyRepository, MaxConcurrency: 20, + MaxQueueLength: 100, MaxQueueWait: duration.Duration(10 * time.Second), }, }, @@ -1632,11 +1636,28 @@ func TestPackObjectsLimiting(t *testing.T) { rawCfg: `[pack_objects_limiting] key = "user" max_concurrency = 10 + max_queue_length = 100 max_queue_wait = "1m" `, expectedCfg: PackObjectsLimiting{ Key: PackObjectsLimitingKeyUser, MaxConcurrency: 10, + MaxQueueLength: 100, + MaxQueueWait: duration.Duration(1 * time.Minute), + }, + }, + { + desc: "using remote_ip as key", + rawCfg: `[pack_objects_limiting] + key = "remote_ip" + max_concurrency = 10 + max_queue_length = 100 + max_queue_wait = "1m" + `, + expectedCfg: PackObjectsLimiting{ + Key: PackObjectsLimitingKeyRemoteIP, + MaxConcurrency: 10, + MaxQueueLength: 100, MaxQueueWait: duration.Duration(1 * time.Minute), }, }, @@ -1645,6 +1666,7 @@ func TestPackObjectsLimiting(t *testing.T) { rawCfg: `[pack_objects_limiting] key = "project" max_concurrency = 10 + max_queue_length = 100 max_queue_wait = "1m" `, expectedErrString: `unsupported pack objects limiting key: "project"`, @@ -1667,8 +1689,122 @@ func TestPackObjectsLimiting(t *testing.T) { } } +// This test uses (*testing.T).Setenv. Thus, it should not run in parallel. +func TestPackObjectsLimiting_defaultPackObjectsLimiting(t *testing.T) { + testCases := []struct { + desc string + envs map[string]string + expectedCfg PackObjectsLimiting + }{ + { + desc: "not relative envs are set", + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 200, + MaxQueueWait: 0, + MaxQueueLength: 0, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY is set", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "100", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 100, + MaxQueueWait: 0, + MaxQueueLength: 0, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY is invalid", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "hello", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 200, + MaxQueueWait: 0, + MaxQueueLength: 0, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH is set", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "100", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 200, + MaxQueueWait: 0, + MaxQueueLength: 100, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH is invalid", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "hello", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 200, + MaxQueueWait: 0, + MaxQueueLength: 0, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY and GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH are bot set", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "1", + "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "2", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 2, + MaxQueueWait: 0, + MaxQueueLength: 1, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + for key, value := range tc.envs { + t.Setenv(key, value) + } + + cfg := defaultPackObjectsLimiting() + require.Equal(t, tc.expectedCfg, cfg) + }) + } +} + func TestPackObjectsLimiting_Validate(t *testing.T) { t.Parallel() + + require.NoError(t, PackObjectsLimiting{MaxConcurrency: 0}.Validate()) + require.NoError(t, PackObjectsLimiting{MaxConcurrency: 1}.Validate()) + require.NoError(t, PackObjectsLimiting{MaxConcurrency: 100}.Validate()) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1", cfgerror.ErrIsNegative), + "max_concurrency", + ), + }, + PackObjectsLimiting{MaxConcurrency: -1}.Validate(), + ) + + require.NoError(t, PackObjectsLimiting{MaxQueueLength: 0}.Validate()) + require.NoError(t, PackObjectsLimiting{MaxQueueLength: 1}.Validate()) + require.NoError(t, PackObjectsLimiting{MaxQueueLength: 100}.Validate()) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1", cfgerror.ErrIsNegative), + "max_queue_length", + ), + }, + PackObjectsLimiting{MaxQueueLength: -1}.Validate(), + ) + require.NoError(t, PackObjectsLimiting{MaxQueueWait: duration.Duration(1)}.Validate()) require.Equal( t, |