Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-03-29 10:38:11 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-04-11 11:06:14 +0300
commitff93231c648cd9511267220d656f27e735148d88 (patch)
treef3b4e0eff3ffacf42714ab55532bf630123e7b5d
parent13f08896b308c617c4a4ca585212069808422367 (diff)
Refator PackObjectsLimiting configs
This commit adds MaxQueueLength field to make it consistent with other limiters. As the concurrency limiter is new, we don't want to add a support for such configs to Omnibus/K8s helm chart too soon. As a result, the configs can be fed from the environment variables. Besides, this commit moves the default PackObjectsLimiting configurations to `internal/config/config.go`. That file is the centralized location to store all Gitaly configurations as well as default setting.
-rw-r--r--internal/cli/gitaly/serve.go11
-rw-r--r--internal/gitaly/config/config.go44
-rw-r--r--internal/gitaly/config/config_test.go144
3 files changed, 183 insertions, 16 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 9f68f722c..37997859d 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -271,16 +271,9 @@ func run(cfg config.Cfg) error {
string(cfg.PackObjectsLimiting.Key),
cfg.Prometheus.GRPCLatencyBuckets,
)
- packObjectsConcurrencyLimit := cfg.PackObjectsLimiting.MaxConcurrency
- if packObjectsConcurrencyLimit == 0 {
- // TODO: remove this default setting when we remove the feature
- // flags PackObjectsLimitingRepo and PackObjectsLimitingUser
- // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413
- packObjectsConcurrencyLimit = 200
- }
packObjectsLimiter := limithandler.NewConcurrencyLimiter(
- int(packObjectsConcurrencyLimit),
- 0,
+ cfg.PackObjectsLimiting.MaxConcurrency,
+ cfg.PackObjectsLimiting.MaxQueueLength,
func() helper.Ticker {
return helper.NewTimerTicker(cfg.PackObjectsLimiting.MaxQueueWait.Duration())
},
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index 8d8f5a33e..c920edf81 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -26,6 +26,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper/duration"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper/env"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm"
)
@@ -407,6 +408,9 @@ const (
// PackObjectsLimitingKeyRepository will limit the pack objects concurrency
// by repository
PackObjectsLimitingKeyRepository = PackObjectsLimitingKey("repository")
+ // PackObjectsLimitingKeyRemoteIP will limit the pack objects concurrency
+ // by RemoteIP
+ PackObjectsLimitingKeyRemoteIP = PackObjectsLimitingKey("remote_ip")
)
// ParsePackObjectsLimitingKey checks if the key is a valid
@@ -417,6 +421,8 @@ func ParsePackObjectsLimitingKey(k string) (PackObjectsLimitingKey, error) {
return PackObjectsLimitingKeyUser, nil
case PackObjectsLimitingKeyRepository:
return PackObjectsLimitingKeyRepository, nil
+ case PackObjectsLimitingKeyRemoteIP:
+ return PackObjectsLimitingKeyRemoteIP, nil
default:
return "", fmt.Errorf("unsupported pack objects limiting key: %q", k)
}
@@ -441,15 +447,19 @@ type PackObjectsLimiting struct {
Key PackObjectsLimitingKey `toml:"key,omitempty" json:"key,omitempty"`
// MaxConcurrency is the maximum number of concurrent pack objects processes
// for a given key.
- MaxConcurrency uint `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"`
+ MaxConcurrency int `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"`
// MaxQueueWait is the maximum time a request can remain in the concurrency queue
// waiting to be picked up by Gitaly.
MaxQueueWait duration.Duration `toml:"max_queue_wait,omitempty" json:"max_queue_wait,omitempty"`
+ // MaxQueueLength is the maximum length of the request queue
+ MaxQueueLength int `toml:"max_queue_length,omitempty" json:"max_queue_length,omitempty"`
}
// Validate runs validation on all fields and compose all found errors.
func (pol PackObjectsLimiting) Validate() error {
return cfgerror.New().
+ Append(cfgerror.IsPositive(pol.MaxConcurrency), "max_concurrency").
+ Append(cfgerror.IsPositive(pol.MaxQueueLength), "max_queue_length").
Append(cfgerror.IsPositive(pol.MaxQueueWait.Duration()), "max_queue_wait").
AsError()
}
@@ -486,12 +496,40 @@ func defaultPackObjectsCacheConfig() StreamCacheConfig {
}
}
+func defaultPackObjectsLimiting() PackObjectsLimiting {
+ var maxConcurrency, maxQueueLength int
+
+ // TODO: Injecting environment variables here boosts the time to try out new concurrency
+ // limiting. When this new limiter is in-place, they should go through official
+ // configure methods (Omnibus, K8s helm chart, etc.) instead.
+ if maxConcurrencyFromEnv, err := env.GetInt("GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY", 200); err == nil {
+ maxConcurrency = maxConcurrencyFromEnv
+ }
+ if maxConcurrency == 0 {
+ // TODO: remove this default setting when we remove the feature
+ // flags PackObjectsLimitingRepo, PackObjectsLimitingUser, and PackObjectsLimitingRemoteIP
+ // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413
+ maxConcurrency = 200
+ }
+ if maxQueueLengthFromEnv, err := env.GetInt("GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH", 0); err == nil {
+ maxQueueLength = maxQueueLengthFromEnv
+ }
+
+ return PackObjectsLimiting{
+ Key: "",
+ MaxConcurrency: maxConcurrency,
+ MaxQueueLength: maxQueueLength,
+ MaxQueueWait: 0, // Do not enforce queue wait at this point
+ }
+}
+
// Load initializes the Config variable from file and the environment.
// Environment variables take precedence over the file.
func Load(file io.Reader) (Cfg, error) {
cfg := Cfg{
- Prometheus: prometheus.DefaultConfig(),
- PackObjectsCache: defaultPackObjectsCacheConfig(),
+ Prometheus: prometheus.DefaultConfig(),
+ PackObjectsCache: defaultPackObjectsCacheConfig(),
+ PackObjectsLimiting: defaultPackObjectsLimiting(),
}
if err := toml.NewDecoder(file).Decode(&cfg); err != nil {
diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go
index 9baa2c35a..f130c0d00 100644
--- a/internal/gitaly/config/config_test.go
+++ b/internal/gitaly/config/config_test.go
@@ -46,8 +46,9 @@ func TestLoadEmptyConfig(t *testing.T) {
require.NoError(t, err)
expectedCfg := Cfg{
- Prometheus: prometheus.DefaultConfig(),
- PackObjectsCache: defaultPackObjectsCacheConfig(),
+ Prometheus: prometheus.DefaultConfig(),
+ PackObjectsCache: defaultPackObjectsCacheConfig(),
+ PackObjectsLimiting: defaultPackObjectsLimiting(),
}
require.NoError(t, expectedCfg.setDefaults())
@@ -186,8 +187,9 @@ func TestLoadConfigCommand(t *testing.T) {
modifyDefaultConfig := func(modify func(cfg *Cfg)) Cfg {
cfg := &Cfg{
- Prometheus: prometheus.DefaultConfig(),
- PackObjectsCache: defaultPackObjectsCacheConfig(),
+ Prometheus: prometheus.DefaultConfig(),
+ PackObjectsCache: defaultPackObjectsCacheConfig(),
+ PackObjectsLimiting: defaultPackObjectsLimiting(),
}
require.NoError(t, cfg.setDefaults())
modify(cfg)
@@ -1619,11 +1621,13 @@ func TestPackObjectsLimiting(t *testing.T) {
rawCfg: `[pack_objects_limiting]
key = "repository"
max_concurrency = 20
+ max_queue_length = 100
max_queue_wait = "10s"
`,
expectedCfg: PackObjectsLimiting{
Key: PackObjectsLimitingKeyRepository,
MaxConcurrency: 20,
+ MaxQueueLength: 100,
MaxQueueWait: duration.Duration(10 * time.Second),
},
},
@@ -1632,11 +1636,28 @@ func TestPackObjectsLimiting(t *testing.T) {
rawCfg: `[pack_objects_limiting]
key = "user"
max_concurrency = 10
+ max_queue_length = 100
max_queue_wait = "1m"
`,
expectedCfg: PackObjectsLimiting{
Key: PackObjectsLimitingKeyUser,
MaxConcurrency: 10,
+ MaxQueueLength: 100,
+ MaxQueueWait: duration.Duration(1 * time.Minute),
+ },
+ },
+ {
+ desc: "using remote_ip as key",
+ rawCfg: `[pack_objects_limiting]
+ key = "remote_ip"
+ max_concurrency = 10
+ max_queue_length = 100
+ max_queue_wait = "1m"
+ `,
+ expectedCfg: PackObjectsLimiting{
+ Key: PackObjectsLimitingKeyRemoteIP,
+ MaxConcurrency: 10,
+ MaxQueueLength: 100,
MaxQueueWait: duration.Duration(1 * time.Minute),
},
},
@@ -1645,6 +1666,7 @@ func TestPackObjectsLimiting(t *testing.T) {
rawCfg: `[pack_objects_limiting]
key = "project"
max_concurrency = 10
+ max_queue_length = 100
max_queue_wait = "1m"
`,
expectedErrString: `unsupported pack objects limiting key: "project"`,
@@ -1667,8 +1689,122 @@ func TestPackObjectsLimiting(t *testing.T) {
}
}
+// This test uses (*testing.T).Setenv. Thus, it should not run in parallel.
+func TestPackObjectsLimiting_defaultPackObjectsLimiting(t *testing.T) {
+ testCases := []struct {
+ desc string
+ envs map[string]string
+ expectedCfg PackObjectsLimiting
+ }{
+ {
+ desc: "not relative envs are set",
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 200,
+ MaxQueueWait: 0,
+ MaxQueueLength: 0,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY is set",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "100",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 100,
+ MaxQueueWait: 0,
+ MaxQueueLength: 0,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY is invalid",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "hello",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 200,
+ MaxQueueWait: 0,
+ MaxQueueLength: 0,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH is set",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "100",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 200,
+ MaxQueueWait: 0,
+ MaxQueueLength: 100,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH is invalid",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "hello",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 200,
+ MaxQueueWait: 0,
+ MaxQueueLength: 0,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY and GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH are bot set",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "1",
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "2",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 2,
+ MaxQueueWait: 0,
+ MaxQueueLength: 1,
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ for key, value := range tc.envs {
+ t.Setenv(key, value)
+ }
+
+ cfg := defaultPackObjectsLimiting()
+ require.Equal(t, tc.expectedCfg, cfg)
+ })
+ }
+}
+
func TestPackObjectsLimiting_Validate(t *testing.T) {
t.Parallel()
+
+ require.NoError(t, PackObjectsLimiting{MaxConcurrency: 0}.Validate())
+ require.NoError(t, PackObjectsLimiting{MaxConcurrency: 1}.Validate())
+ require.NoError(t, PackObjectsLimiting{MaxConcurrency: 100}.Validate())
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1", cfgerror.ErrIsNegative),
+ "max_concurrency",
+ ),
+ },
+ PackObjectsLimiting{MaxConcurrency: -1}.Validate(),
+ )
+
+ require.NoError(t, PackObjectsLimiting{MaxQueueLength: 0}.Validate())
+ require.NoError(t, PackObjectsLimiting{MaxQueueLength: 1}.Validate())
+ require.NoError(t, PackObjectsLimiting{MaxQueueLength: 100}.Validate())
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1", cfgerror.ErrIsNegative),
+ "max_queue_length",
+ ),
+ },
+ PackObjectsLimiting{MaxQueueLength: -1}.Validate(),
+ )
+
require.NoError(t, PackObjectsLimiting{MaxQueueWait: duration.Duration(1)}.Validate())
require.Equal(
t,