diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2023-05-10 20:05:26 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2023-05-10 20:05:26 +0300 |
commit | 54e5afc843d6232c0097f20695de9e0029e1caa0 (patch) | |
tree | 40be67b434b963852cd27d819207e8a006f33c05 | |
parent | 63a97f734b8dddf651926cc5ce731fd07c4b0c65 (diff) | |
parent | e776aab22dec3a58aad5f267558ed2a9eac8432e (diff) |
Automatic merge of gitlab-org/gitaly master
-rw-r--r-- | internal/cli/gitaly/serve.go | 1 | ||||
-rw-r--r-- | internal/gitaly/config/config.go | 44 | ||||
-rw-r--r-- | internal/gitaly/config/config_test.go | 33 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects.go | 24 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 59 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_pack_objects_limiting_repo.go | 10 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_pack_objects_limiting_user.go | 10 | ||||
-rw-r--r-- | internal/middleware/limithandler/monitor.go | 24 | ||||
-rw-r--r-- | internal/middleware/limithandler/monitor_test.go | 33 |
9 files changed, 42 insertions, 196 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index bdccf031c..120f3fdb0 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -269,7 +269,6 @@ func run(cfg config.Cfg) error { ) packObjectsMonitor := limithandler.NewPackObjectsConcurrencyMonitor( - string(cfg.PackObjectsLimiting.Key), cfg.Prometheus.GRPCLatencyBuckets, ) newTickerFunc := func() helper.Ticker { diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 3cc905b84..c28654221 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -397,53 +397,10 @@ type RateLimiting struct { Burst int `toml:"burst" json:"burst"` } -// PackObjectsLimitingKey is the key for limiting pack objects concurrency -type PackObjectsLimitingKey string - -const ( - // PackObjectsLimitingKeyUser will limit the pack objects concurrency - // by user - PackObjectsLimitingKeyUser = PackObjectsLimitingKey("user") - // PackObjectsLimitingKeyRepository will limit the pack objects concurrency - // by repository - PackObjectsLimitingKeyRepository = PackObjectsLimitingKey("repository") - // PackObjectsLimitingKeyRemoteIP will limit the pack objects concurrency - // by RemoteIP - PackObjectsLimitingKeyRemoteIP = PackObjectsLimitingKey("remote_ip") -) - -// ParsePackObjectsLimitingKey checks if the key is a valid -// PackObjectsLimitingKey -func ParsePackObjectsLimitingKey(k string) (PackObjectsLimitingKey, error) { - switch PackObjectsLimitingKey(k) { - case PackObjectsLimitingKeyUser: - return PackObjectsLimitingKeyUser, nil - case PackObjectsLimitingKeyRepository: - return PackObjectsLimitingKeyRepository, nil - case PackObjectsLimitingKeyRemoteIP: - return PackObjectsLimitingKeyRemoteIP, nil - default: - return "", fmt.Errorf("unsupported pack objects limiting key: %q", k) - } -} - -// UnmarshalText unmarshals a key into a ParsePackObjectsLimitingKey -func (p *PackObjectsLimitingKey) UnmarshalText(text []byte) error { - v, err := ParsePackObjectsLimitingKey(string(text)) - if err != nil { - return err - } - *p = v - return nil -} - // PackObjectsLimiting allows the concurrency of pack objects processes to be limited // Requests that come in after the maximum number of concurrent pack objects // processes have been reached will wait. type PackObjectsLimiting struct { - // Key is the key by which concurrency will be limited. Supported keys - // are: user_id, repository. - Key PackObjectsLimitingKey `toml:"key,omitempty" json:"key,omitempty"` // MaxConcurrency is the maximum number of concurrent pack objects processes // for a given key. MaxConcurrency int `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"` @@ -515,7 +472,6 @@ func defaultPackObjectsLimiting() PackObjectsLimiting { } return PackObjectsLimiting{ - Key: "", MaxConcurrency: maxConcurrency, MaxQueueLength: maxQueueLength, MaxQueueWait: 0, // Do not enforce queue wait at this point diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index 727922b04..e2039297c 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -1617,60 +1617,31 @@ func TestPackObjectsLimiting(t *testing.T) { expectedCfg PackObjectsLimiting }{ { - desc: "using repo as key", + desc: "max_queue_wait is set to 10s", rawCfg: `[pack_objects_limiting] - key = "repository" max_concurrency = 20 max_queue_length = 100 max_queue_wait = "10s" `, expectedCfg: PackObjectsLimiting{ - Key: PackObjectsLimitingKeyRepository, MaxConcurrency: 20, MaxQueueLength: 100, MaxQueueWait: duration.Duration(10 * time.Second), }, }, { - desc: "using user as key", + desc: "max_queue_wait is set to 1m", rawCfg: `[pack_objects_limiting] - key = "user" max_concurrency = 10 max_queue_length = 100 max_queue_wait = "1m" `, expectedCfg: PackObjectsLimiting{ - Key: PackObjectsLimitingKeyUser, MaxConcurrency: 10, MaxQueueLength: 100, MaxQueueWait: duration.Duration(1 * time.Minute), }, }, - { - desc: "using remote_ip as key", - rawCfg: `[pack_objects_limiting] - key = "remote_ip" - max_concurrency = 10 - max_queue_length = 100 - max_queue_wait = "1m" - `, - expectedCfg: PackObjectsLimiting{ - Key: PackObjectsLimitingKeyRemoteIP, - MaxConcurrency: 10, - MaxQueueLength: 100, - MaxQueueWait: duration.Duration(1 * time.Minute), - }, - }, - { - desc: "invalid key", - rawCfg: `[pack_objects_limiting] - key = "project" - max_concurrency = 10 - max_queue_length = 100 - max_queue_wait = "1m" - `, - expectedErrString: `unsupported pack objects limiting key: "project"`, - }, } for _, tc := range testCases { diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 8e7e6f448..c5a904bd7 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -69,30 +69,6 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH // and pick the most effective strategy. `PackObjectsLimiting.Key` config is ignored // at this point. We will either checking it properly, or remove it when there is // a conclusion on production. - if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) { - return s.runPackObjectsLimited( - ctx, - w, - req.GetRepository().GetStorageName()+":"+req.GetRepository().GetRelativePath(), - req, - args, - stdin, - cacheKey, - ) - } - - if featureflag.PackObjectsLimitingUser.IsEnabled(ctx) && req.GetGlId() != "" { - return s.runPackObjectsLimited( - ctx, - w, - req.GetGlId(), - req, - args, - stdin, - cacheKey, - ) - } - if featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx) && req.GetRemoteIp() != "" { ipAddr := net.ParseIP(req.GetRemoteIp()) if ipAddr == nil { diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 92e4e0164..4c2de55ab 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -89,8 +89,6 @@ func TestServer_PackObjectsHook_separateContext(t *testing.T) { t.Parallel() testhelper.NewFeatureSets( - featureflag.PackObjectsLimitingUser, - featureflag.PackObjectsLimitingRepo, featureflag.PackObjectsLimitingRemoteIP, ).Run(t, runTestsWithRuntimeDir( t, @@ -222,8 +220,6 @@ func TestServer_PackObjectsHook_usesCache(t *testing.T) { t.Parallel() testhelper.NewFeatureSets( - featureflag.PackObjectsLimitingUser, - featureflag.PackObjectsLimitingRepo, featureflag.PackObjectsLimitingRemoteIP, ).Run(t, runTestsWithRuntimeDir( t, @@ -444,8 +440,6 @@ func TestServer_PackObjectsHookWithSidechannel(t *testing.T) { t.Parallel() testhelper.NewFeatureSets( - featureflag.PackObjectsLimitingUser, - featureflag.PackObjectsLimitingRepo, featureflag.PackObjectsLimitingRemoteIP, ).Run(t, runTestsWithRuntimeDir( t, @@ -701,8 +695,6 @@ func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) { t.Parallel() testhelper.NewFeatureSets( - featureflag.PackObjectsLimitingUser, - featureflag.PackObjectsLimitingRepo, featureflag.PackObjectsLimitingRemoteIP, ).Run(t, runTestsWithRuntimeDir( t, @@ -780,11 +772,7 @@ func setupSidechannel(t *testing.T, ctx context.Context, oid string) (context.Co func TestPackObjects_concurrencyLimit(t *testing.T) { t.Parallel() - testhelper.NewFeatureSets( - featureflag.PackObjectsLimitingUser, - featureflag.PackObjectsLimitingRepo, - featureflag.PackObjectsLimitingRemoteIP, - ).Run(t, testPackObjectsConcurrency) + testhelper.NewFeatureSets(featureflag.PackObjectsLimitingRemoteIP).Run(t, testPackObjectsConcurrency) } func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { @@ -792,16 +780,6 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { cfg := cfgWithCache(t, 0) - var keyType string - - if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) { - keyType = "repo" - } else if featureflag.PackObjectsLimitingUser.IsEnabled(ctx) { - keyType = "user" - } else if featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx) { - keyType = "remote_ip" - } - args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} for _, tc := range []struct { @@ -855,9 +833,7 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { Args: args, }, }, - shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || - featureflag.PackObjectsLimitingUser.IsEnabled(ctx) || - featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx), + shouldLimit: featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx), }, { desc: "IP addresses including source port", @@ -881,9 +857,7 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { Args: args, }, }, - shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || - featureflag.PackObjectsLimitingUser.IsEnabled(ctx) || - featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx), + shouldLimit: featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx), }, { desc: "IPv4 loopback addresses", @@ -907,8 +881,7 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { Args: args, }, }, - shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || - featureflag.PackObjectsLimitingUser.IsEnabled(ctx), + shouldLimit: false, }, { desc: "IPv6 loopback addresses", @@ -932,8 +905,7 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { Args: args, }, }, - shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || - featureflag.PackObjectsLimitingUser.IsEnabled(ctx), + shouldLimit: false, }, { desc: "invalid IP addresses", @@ -957,8 +929,7 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { Args: args, }, }, - shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || - featureflag.PackObjectsLimitingUser.IsEnabled(ctx), + shouldLimit: false, }, { desc: "empty IP addresses", @@ -982,14 +953,12 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { Args: args, }, }, - shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || - featureflag.PackObjectsLimitingUser.IsEnabled(ctx), + shouldLimit: false, }, } { t.Run(tc.desc, func(t *testing.T) { ticker := helper.NewManualTicker() monitor := limithandler.NewPackObjectsConcurrencyMonitor( - keyType, cfg.Prometheus.GRPCLatencyBuckets, ) limiter := limithandler.NewConcurrencyLimiter( @@ -1067,10 +1036,10 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { require.NoError(t, testutil.GatherAndCompare(registry, - bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls + bytes.NewBufferString(`# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls # TYPE gitaly_pack_objects_in_progress gauge -gitaly_pack_objects_in_progress{type=%q} 1 -`, keyType)), "gitaly_pack_objects_in_progress")) +gitaly_pack_objects_in_progress 1 +`), "gitaly_pack_objects_in_progress")) ticker.Tick() @@ -1083,13 +1052,13 @@ gitaly_pack_objects_in_progress{type=%q} 1 close(blockCh) - expectedMetrics := bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue + expectedMetrics := bytes.NewBufferString(`# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue # TYPE gitaly_pack_objects_dropped_total counter -gitaly_pack_objects_dropped_total{reason="max_time", type=%q} 1 +gitaly_pack_objects_dropped_total{reason="max_time"} 1 # HELP gitaly_pack_objects_queued Gauge of number of queued calls # TYPE gitaly_pack_objects_queued gauge -gitaly_pack_objects_queued{type=%q} 0 -`, keyType, keyType)) +gitaly_pack_objects_queued 0 +`) require.NoError(t, testutil.GatherAndCompare(registry, expectedMetrics, diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go deleted file mode 100644 index a07874c4c..000000000 --- a/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go +++ /dev/null @@ -1,10 +0,0 @@ -package featureflag - -// PackObjectsLimitingRepo will enable a concurrency limiter for pack objects -// based off of the repository. -var PackObjectsLimitingRepo = NewFeatureFlag( - "pack_objects_limiting_repo", - "v15.11.0", - "https://gitlab.com/gitlab-org/gitaly/-/issues/4413", - false, -) diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_user.go b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go deleted file mode 100644 index 1e8319ad1..000000000 --- a/internal/metadata/featureflag/ff_pack_objects_limiting_user.go +++ /dev/null @@ -1,10 +0,0 @@ -package featureflag - -// PackObjectsLimitingUser will enable a concurrency limiter for pack objects -// based off of the user -var PackObjectsLimitingUser = NewFeatureFlag( - "pack_objects_limiting_user", - "v15.11.0", - "https://gitlab.com/gitlab-org/gitaly/-/issues/4413", - false, -) diff --git a/internal/middleware/limithandler/monitor.go b/internal/middleware/limithandler/monitor.go index aa44d3e20..da295c563 100644 --- a/internal/middleware/limithandler/monitor.go +++ b/internal/middleware/limithandler/monitor.go @@ -110,16 +110,15 @@ func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, reaso func newPromMonitor( limitingType string, - keyType string, - queuedVec, inProgressVec *prometheus.GaugeVec, + queuedVec, inProgressVec prometheus.Gauge, acquiringSecondsVec *prometheus.HistogramVec, requestsDroppedVec *prometheus.CounterVec, ) *PromMonitor { return &PromMonitor{ limitingType: limitingType, - queuedMetric: queuedVec.WithLabelValues(keyType), - inProgressMetric: inProgressVec.WithLabelValues(keyType), - acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(keyType), + queuedMetric: queuedVec, + inProgressMetric: inProgressVec, + acquiringSecondsMetric: acquiringSecondsVec.WithLabelValues(), requestsDroppedMetric: requestsDroppedVec, acquiringSecondsHistogramVec: acquiringSecondsVec, } @@ -149,30 +148,28 @@ func splitMethodName(fullMethodName string) (string, string) { // NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use // with limiting pack objects processes. -func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) *PromMonitor { +func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor { acquiringSecondsVec := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "gitaly_pack_objects_acquiring_seconds", Help: "Histogram of time calls are rate limited (in seconds)", Buckets: latencyBuckets, }, - []string{"type"}, + nil, ) - inProgressVec := prometheus.NewGaugeVec( + inProgressVec := prometheus.NewGauge( prometheus.GaugeOpts{ Name: "gitaly_pack_objects_in_progress", Help: "Gauge of number of concurrent in-progress calls", }, - []string{"type"}, ) - queuedVec := prometheus.NewGaugeVec( + queuedVec := prometheus.NewGauge( prometheus.GaugeOpts{ Name: "gitaly_pack_objects_queued", Help: "Gauge of number of queued calls", }, - []string{"type"}, ) requestsDroppedVec := prometheus.NewCounterVec( @@ -180,12 +177,11 @@ func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) Name: "gitaly_pack_objects_dropped_total", Help: "Number of requests dropped from the queue", }, - []string{"type", "reason"}, - ).MustCurryWith(prometheus.Labels{"type": keyType}) + []string{"reason"}, + ) return newPromMonitor( TypePackObjects, - keyType, queuedVec, inProgressVec, acquiringSecondsVec, diff --git a/internal/middleware/limithandler/monitor_test.go b/internal/middleware/limithandler/monitor_test.go index 4571515a7..1c5eb9dbf 100644 --- a/internal/middleware/limithandler/monitor_test.go +++ b/internal/middleware/limithandler/monitor_test.go @@ -117,7 +117,6 @@ func TestNewPackObjectsConcurrencyMonitor(t *testing.T) { ctx := InitLimitStats(testhelper.Context(t)) m := NewPackObjectsConcurrencyMonitor( - "user", promconfig.DefaultConfig().GRPCLatencyBuckets, ) @@ -127,26 +126,26 @@ func TestNewPackObjectsConcurrencyMonitor(t *testing.T) { expectedMetrics := `# HELP gitaly_pack_objects_acquiring_seconds Histogram of time calls are rate limited (in seconds) # TYPE gitaly_pack_objects_acquiring_seconds histogram -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="0.001"} 0 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="0.005"} 0 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="0.025"} 0 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="0.1"} 0 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="0.5"} 0 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="1"} 1 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="10"} 1 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="30"} 1 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="60"} 1 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="300"} 1 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="1500"} 1 -gitaly_pack_objects_acquiring_seconds_bucket{type="user",le="+Inf"} 1 -gitaly_pack_objects_acquiring_seconds_sum{type="user"} 1 -gitaly_pack_objects_acquiring_seconds_count{type="user"} 1 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.001"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.005"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.025"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.1"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="0.5"} 0 +gitaly_pack_objects_acquiring_seconds_bucket{le="1"} 1 +gitaly_pack_objects_acquiring_seconds_bucket{le="10"} 1 +gitaly_pack_objects_acquiring_seconds_bucket{le="30"} 1 +gitaly_pack_objects_acquiring_seconds_bucket{le="60"} 1 +gitaly_pack_objects_acquiring_seconds_bucket{le="300"} 1 +gitaly_pack_objects_acquiring_seconds_bucket{le="1500"} 1 +gitaly_pack_objects_acquiring_seconds_bucket{le="+Inf"} 1 +gitaly_pack_objects_acquiring_seconds_sum 1 +gitaly_pack_objects_acquiring_seconds_count 1 # HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue # TYPE gitaly_pack_objects_dropped_total counter -gitaly_pack_objects_dropped_total{reason="load",type="user"} 1 +gitaly_pack_objects_dropped_total{reason="load"} 1 # HELP gitaly_pack_objects_queued Gauge of number of queued calls # TYPE gitaly_pack_objects_queued gauge -gitaly_pack_objects_queued{type="user"} 1 +gitaly_pack_objects_queued 1 ` require.NoError(t, testutil.CollectAndCompare( |