diff options
author | Justin Tobler <jtobler@gitlab.com> | 2023-04-12 05:28:34 +0300 |
---|---|---|
committer | Justin Tobler <jtobler@gitlab.com> | 2023-04-12 05:28:34 +0300 |
commit | b8190668d147784e8be4a379b33f691363e08a0f (patch) | |
tree | cc7a6a6fb814b2af5c304f995f7156fb134a4aa2 | |
parent | a6614c36a3db5e7dded00a912ce71a8c6c07e5a2 (diff) | |
parent | c4397165c73e9ad4a4640cbf771ba2155c6e7ab1 (diff) |
Merge branch 'qmnguyen0711/concurrency-limit-by-remote-ip' into 'master'
Add pack-objects concurrency limiter based on RemoteIP
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5583
Merged-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: John Cai <jcai@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r-- | internal/cli/gitaly/serve.go | 11 | ||||
-rw-r--r-- | internal/gitaly/config/config.go | 44 | ||||
-rw-r--r-- | internal/gitaly/config/config_test.go | 144 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects.go | 22 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 327 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_pack_objects_limiting_remote_ip.go | 10 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_pack_objects_limiting_repo.go | 2 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_pack_objects_limiting_user.go | 2 |
8 files changed, 430 insertions, 132 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 9f68f722c..37997859d 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -271,16 +271,9 @@ func run(cfg config.Cfg) error { string(cfg.PackObjectsLimiting.Key), cfg.Prometheus.GRPCLatencyBuckets, ) - packObjectsConcurrencyLimit := cfg.PackObjectsLimiting.MaxConcurrency - if packObjectsConcurrencyLimit == 0 { - // TODO: remove this default setting when we remove the feature - // flags PackObjectsLimitingRepo and PackObjectsLimitingUser - // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413 - packObjectsConcurrencyLimit = 200 - } packObjectsLimiter := limithandler.NewConcurrencyLimiter( - int(packObjectsConcurrencyLimit), - 0, + cfg.PackObjectsLimiting.MaxConcurrency, + cfg.PackObjectsLimiting.MaxQueueLength, func() helper.Ticker { return helper.NewTimerTicker(cfg.PackObjectsLimiting.MaxQueueWait.Duration()) }, diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 8d8f5a33e..c920edf81 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -26,6 +26,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/duration" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm" ) @@ -407,6 +408,9 @@ const ( // PackObjectsLimitingKeyRepository will limit the pack objects concurrency // by repository PackObjectsLimitingKeyRepository = PackObjectsLimitingKey("repository") + // PackObjectsLimitingKeyRemoteIP will limit the pack objects concurrency + // by RemoteIP + PackObjectsLimitingKeyRemoteIP = PackObjectsLimitingKey("remote_ip") ) // ParsePackObjectsLimitingKey checks if the key is a valid @@ -417,6 +421,8 @@ func ParsePackObjectsLimitingKey(k string) (PackObjectsLimitingKey, error) { return PackObjectsLimitingKeyUser, nil case PackObjectsLimitingKeyRepository: return PackObjectsLimitingKeyRepository, nil + case PackObjectsLimitingKeyRemoteIP: + return PackObjectsLimitingKeyRemoteIP, nil default: return "", fmt.Errorf("unsupported pack objects limiting key: %q", k) } @@ -441,15 +447,19 @@ type PackObjectsLimiting struct { Key PackObjectsLimitingKey `toml:"key,omitempty" json:"key,omitempty"` // MaxConcurrency is the maximum number of concurrent pack objects processes // for a given key. - MaxConcurrency uint `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"` + MaxConcurrency int `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"` // MaxQueueWait is the maximum time a request can remain in the concurrency queue // waiting to be picked up by Gitaly. MaxQueueWait duration.Duration `toml:"max_queue_wait,omitempty" json:"max_queue_wait,omitempty"` + // MaxQueueLength is the maximum length of the request queue + MaxQueueLength int `toml:"max_queue_length,omitempty" json:"max_queue_length,omitempty"` } // Validate runs validation on all fields and compose all found errors. func (pol PackObjectsLimiting) Validate() error { return cfgerror.New(). + Append(cfgerror.IsPositive(pol.MaxConcurrency), "max_concurrency"). + Append(cfgerror.IsPositive(pol.MaxQueueLength), "max_queue_length"). Append(cfgerror.IsPositive(pol.MaxQueueWait.Duration()), "max_queue_wait"). AsError() } @@ -486,12 +496,40 @@ func defaultPackObjectsCacheConfig() StreamCacheConfig { } } +func defaultPackObjectsLimiting() PackObjectsLimiting { + var maxConcurrency, maxQueueLength int + + // TODO: Injecting environment variables here boosts the time to try out new concurrency + // limiting. When this new limiter is in-place, they should go through official + // configure methods (Omnibus, K8s helm chart, etc.) instead. + if maxConcurrencyFromEnv, err := env.GetInt("GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY", 200); err == nil { + maxConcurrency = maxConcurrencyFromEnv + } + if maxConcurrency == 0 { + // TODO: remove this default setting when we remove the feature + // flags PackObjectsLimitingRepo, PackObjectsLimitingUser, and PackObjectsLimitingRemoteIP + // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413 + maxConcurrency = 200 + } + if maxQueueLengthFromEnv, err := env.GetInt("GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH", 0); err == nil { + maxQueueLength = maxQueueLengthFromEnv + } + + return PackObjectsLimiting{ + Key: "", + MaxConcurrency: maxConcurrency, + MaxQueueLength: maxQueueLength, + MaxQueueWait: 0, // Do not enforce queue wait at this point + } +} + // Load initializes the Config variable from file and the environment. // Environment variables take precedence over the file. func Load(file io.Reader) (Cfg, error) { cfg := Cfg{ - Prometheus: prometheus.DefaultConfig(), - PackObjectsCache: defaultPackObjectsCacheConfig(), + Prometheus: prometheus.DefaultConfig(), + PackObjectsCache: defaultPackObjectsCacheConfig(), + PackObjectsLimiting: defaultPackObjectsLimiting(), } if err := toml.NewDecoder(file).Decode(&cfg); err != nil { diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index 9baa2c35a..f130c0d00 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -46,8 +46,9 @@ func TestLoadEmptyConfig(t *testing.T) { require.NoError(t, err) expectedCfg := Cfg{ - Prometheus: prometheus.DefaultConfig(), - PackObjectsCache: defaultPackObjectsCacheConfig(), + Prometheus: prometheus.DefaultConfig(), + PackObjectsCache: defaultPackObjectsCacheConfig(), + PackObjectsLimiting: defaultPackObjectsLimiting(), } require.NoError(t, expectedCfg.setDefaults()) @@ -186,8 +187,9 @@ func TestLoadConfigCommand(t *testing.T) { modifyDefaultConfig := func(modify func(cfg *Cfg)) Cfg { cfg := &Cfg{ - Prometheus: prometheus.DefaultConfig(), - PackObjectsCache: defaultPackObjectsCacheConfig(), + Prometheus: prometheus.DefaultConfig(), + PackObjectsCache: defaultPackObjectsCacheConfig(), + PackObjectsLimiting: defaultPackObjectsLimiting(), } require.NoError(t, cfg.setDefaults()) modify(cfg) @@ -1619,11 +1621,13 @@ func TestPackObjectsLimiting(t *testing.T) { rawCfg: `[pack_objects_limiting] key = "repository" max_concurrency = 20 + max_queue_length = 100 max_queue_wait = "10s" `, expectedCfg: PackObjectsLimiting{ Key: PackObjectsLimitingKeyRepository, MaxConcurrency: 20, + MaxQueueLength: 100, MaxQueueWait: duration.Duration(10 * time.Second), }, }, @@ -1632,11 +1636,28 @@ func TestPackObjectsLimiting(t *testing.T) { rawCfg: `[pack_objects_limiting] key = "user" max_concurrency = 10 + max_queue_length = 100 max_queue_wait = "1m" `, expectedCfg: PackObjectsLimiting{ Key: PackObjectsLimitingKeyUser, MaxConcurrency: 10, + MaxQueueLength: 100, + MaxQueueWait: duration.Duration(1 * time.Minute), + }, + }, + { + desc: "using remote_ip as key", + rawCfg: `[pack_objects_limiting] + key = "remote_ip" + max_concurrency = 10 + max_queue_length = 100 + max_queue_wait = "1m" + `, + expectedCfg: PackObjectsLimiting{ + Key: PackObjectsLimitingKeyRemoteIP, + MaxConcurrency: 10, + MaxQueueLength: 100, MaxQueueWait: duration.Duration(1 * time.Minute), }, }, @@ -1645,6 +1666,7 @@ func TestPackObjectsLimiting(t *testing.T) { rawCfg: `[pack_objects_limiting] key = "project" max_concurrency = 10 + max_queue_length = 100 max_queue_wait = "1m" `, expectedErrString: `unsupported pack objects limiting key: "project"`, @@ -1667,8 +1689,122 @@ func TestPackObjectsLimiting(t *testing.T) { } } +// This test uses (*testing.T).Setenv. Thus, it should not run in parallel. +func TestPackObjectsLimiting_defaultPackObjectsLimiting(t *testing.T) { + testCases := []struct { + desc string + envs map[string]string + expectedCfg PackObjectsLimiting + }{ + { + desc: "not relative envs are set", + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 200, + MaxQueueWait: 0, + MaxQueueLength: 0, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY is set", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "100", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 100, + MaxQueueWait: 0, + MaxQueueLength: 0, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY is invalid", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "hello", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 200, + MaxQueueWait: 0, + MaxQueueLength: 0, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH is set", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "100", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 200, + MaxQueueWait: 0, + MaxQueueLength: 100, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH is invalid", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "hello", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 200, + MaxQueueWait: 0, + MaxQueueLength: 0, + }, + }, + { + desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY and GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH are bot set", + envs: map[string]string{ + "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "1", + "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "2", + }, + expectedCfg: PackObjectsLimiting{ + MaxConcurrency: 2, + MaxQueueWait: 0, + MaxQueueLength: 1, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + for key, value := range tc.envs { + t.Setenv(key, value) + } + + cfg := defaultPackObjectsLimiting() + require.Equal(t, tc.expectedCfg, cfg) + }) + } +} + func TestPackObjectsLimiting_Validate(t *testing.T) { t.Parallel() + + require.NoError(t, PackObjectsLimiting{MaxConcurrency: 0}.Validate()) + require.NoError(t, PackObjectsLimiting{MaxConcurrency: 1}.Validate()) + require.NoError(t, PackObjectsLimiting{MaxConcurrency: 100}.Validate()) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1", cfgerror.ErrIsNegative), + "max_concurrency", + ), + }, + PackObjectsLimiting{MaxConcurrency: -1}.Validate(), + ) + + require.NoError(t, PackObjectsLimiting{MaxQueueLength: 0}.Validate()) + require.NoError(t, PackObjectsLimiting{MaxQueueLength: 1}.Validate()) + require.NoError(t, PackObjectsLimiting{MaxQueueLength: 100}.Validate()) + require.Equal( + t, + cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1", cfgerror.ErrIsNegative), + "max_queue_length", + ), + }, + PackObjectsLimiting{MaxQueueLength: -1}.Validate(), + ) + require.NoError(t, PackObjectsLimiting{MaxQueueWait: duration.Duration(1)}.Validate()) require.Equal( t, diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 5e37da542..27bfc39b8 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -10,6 +10,7 @@ import ( "fmt" "hash" "io" + "net" "os" "strings" "syscall" @@ -76,6 +77,11 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH key := hex.EncodeToString(h.Sum(nil)) servedBytes, created, err := s.packObjectsCache.Fetch(ctx, key, output, func(w io.Writer) error { + // TODO: We now have three different concurrency strategies for pack-objects. All of + // them are guarded behind feature flags. This situation is because we wanted to verify + // and pick the most effective strategy. `PackObjectsLimiting.Key` config is ignored + // at this point. We will either checking it properly, or remove it when there is + // a conclusion on production. if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) { return s.runPackObjectsLimited( ctx, @@ -100,6 +106,22 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH ) } + if featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx) && req.GetRemoteIp() != "" { + ipAddr := net.ParseIP(req.GetRemoteIp()) + // Ignore loop-back IPs + if ipAddr != nil && !ipAddr.IsLoopback() { + return s.runPackObjectsLimited( + ctx, + w, + req.GetRemoteIp(), + req, + args, + stdin, + key, + ) + } + } + return s.runPackObjects(ctx, w, req, args, stdin, key) }) if err != nil { diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 775bd289e..d05de7458 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -91,6 +91,7 @@ func TestServer_PackObjectsHook_separateContext(t *testing.T) { testhelper.NewFeatureSets( featureflag.PackObjectsLimitingUser, featureflag.PackObjectsLimitingRepo, + featureflag.PackObjectsLimitingRemoteIP, ).Run(t, runTestsWithRuntimeDir( t, testServerPackObjectsHookSeparateContextWithRuntimeDir, @@ -223,6 +224,7 @@ func TestServer_PackObjectsHook_usesCache(t *testing.T) { testhelper.NewFeatureSets( featureflag.PackObjectsLimitingUser, featureflag.PackObjectsLimitingRepo, + featureflag.PackObjectsLimitingRemoteIP, ).Run(t, runTestsWithRuntimeDir( t, testServerPackObjectsHookUsesCache, @@ -311,6 +313,7 @@ func TestServer_PackObjectsHookWithSidechannel(t *testing.T) { testhelper.NewFeatureSets( featureflag.PackObjectsLimitingUser, featureflag.PackObjectsLimitingRepo, + featureflag.PackObjectsLimitingRemoteIP, ).Run(t, runTestsWithRuntimeDir( t, testServerPackObjectsHookWithSidechannelWithRuntimeDir, @@ -583,6 +586,7 @@ func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) { testhelper.NewFeatureSets( featureflag.PackObjectsLimitingUser, featureflag.PackObjectsLimitingRepo, + featureflag.PackObjectsLimitingRemoteIP, ).Run(t, runTestsWithRuntimeDir( t, testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir, @@ -662,6 +666,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) { testhelper.NewFeatureSets( featureflag.PackObjectsLimitingUser, featureflag.PackObjectsLimitingRepo, + featureflag.PackObjectsLimitingRemoteIP, ).Run(t, testPackObjectsConcurrency) } @@ -676,124 +681,216 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { keyType = "repo" } else if featureflag.PackObjectsLimitingUser.IsEnabled(ctx) { keyType = "user" + } else if featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx) { + keyType = "remote_ip" } - ticker := helper.NewManualTicker() - monitor := limithandler.NewPackObjectsConcurrencyMonitor( - keyType, - cfg.Prometheus.GRPCLatencyBuckets, - ) - limiter := limithandler.NewConcurrencyLimiter( - 1, - 0, - func() helper.Ticker { return ticker }, - monitor, - ) - - registry := prometheus.NewRegistry() - registry.MustRegister(monitor) - - receivedCh, blockCh := make(chan struct{}), make(chan struct{}) - cfg.SocketPath = runHooksServer(t, cfg, []serverOption{ - withRunPackObjectsFn(func( - context.Context, - git.CommandFactory, - io.Writer, - *gitalypb.PackObjectsHookWithSidechannelRequest, - *packObjectsArgs, - io.Reader, - string, - *hookPkg.ConcurrencyTracker, - ) error { - receivedCh <- struct{}{} - <-blockCh - return nil - }), - }, - testserver.WithPackObjectsLimiter(limiter), - ) + args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} - ctx1, wt1, err := setupSidechannel(t, ctx, "1dd08961455abf80ef9115f4afdc1c6f968b503c") - require.NoError(t, err) + for _, tc := range []struct { + desc string + requests [2]*gitalypb.PackObjectsHookWithSidechannelRequest + shouldLimit bool + }{ + { + desc: "never reach limit", + requests: [2]*gitalypb.PackObjectsHookWithSidechannelRequest{ + { + GlId: "user-123", + RemoteIp: "1.2.3.4", + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: args, + }, + { + GlId: "user-456", + RemoteIp: "1.2.3.5", + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "d/e/f", + }, + Args: args, + }, + }, + shouldLimit: false, + }, + { + desc: "normal IP address", + requests: [2]*gitalypb.PackObjectsHookWithSidechannelRequest{ + { + GlId: "user-123", + RemoteIp: "1.2.3.4", + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: args, + }, + { + GlId: "user-123", + RemoteIp: "1.2.3.4", + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: args, + }, + }, + shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || + featureflag.PackObjectsLimitingUser.IsEnabled(ctx) || + featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx), + }, + { + desc: "IPv4 loopback addresses", + requests: [2]*gitalypb.PackObjectsHookWithSidechannelRequest{ + { + GlId: "user-123", + RemoteIp: "127.0.0.1", + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: args, + }, + { + GlId: "user-123", + RemoteIp: "127.0.0.1", + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: args, + }, + }, + shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || + featureflag.PackObjectsLimitingUser.IsEnabled(ctx), + }, + { + desc: "IPv6 loopback addresses", + requests: [2]*gitalypb.PackObjectsHookWithSidechannelRequest{ + { + GlId: "user-123", + RemoteIp: net.IPv6loopback.String(), + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: args, + }, + { + GlId: "user-123", + RemoteIp: net.IPv6loopback.String(), + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: args, + }, + }, + shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || + featureflag.PackObjectsLimitingUser.IsEnabled(ctx), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ticker := helper.NewManualTicker() + monitor := limithandler.NewPackObjectsConcurrencyMonitor( + keyType, + cfg.Prometheus.GRPCLatencyBuckets, + ) + limiter := limithandler.NewConcurrencyLimiter( + 1, + 0, + func() helper.Ticker { return ticker }, + monitor, + ) - ctx2, wt2, err := setupSidechannel(t, ctx, "2dd08961455abf80ef9115f4afdc1c6f968b503") - require.NoError(t, err) + registry := prometheus.NewRegistry() + registry.MustRegister(monitor) + + receivedCh, blockCh := make(chan struct{}), make(chan struct{}) + cfg.SocketPath = runHooksServer(t, cfg, []serverOption{ + withRunPackObjectsFn(func( + context.Context, + git.CommandFactory, + io.Writer, + *gitalypb.PackObjectsHookWithSidechannelRequest, + *packObjectsArgs, + io.Reader, + string, + *hookPkg.ConcurrencyTracker, + ) error { + receivedCh <- struct{}{} + <-blockCh + return nil + }), + }, + testserver.WithPackObjectsLimiter(limiter), + ) - userID := "user-123" - req1 := &gitalypb.PackObjectsHookWithSidechannelRequest{ - GlId: userID, - Repository: &gitalypb.Repository{ - StorageName: "storage-0", - RelativePath: "a/b/c", - }, - Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, - } + ctx1, wt1, err := setupSidechannel(t, ctx, "1dd08961455abf80ef9115f4afdc1c6f968b503c") + require.NoError(t, err) - req2 := &gitalypb.PackObjectsHookWithSidechannelRequest{ - GlId: userID, - Repository: &gitalypb.Repository{ - StorageName: "storage-0", - RelativePath: "a/b/c", - }, - Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, - } + ctx2, wt2, err := setupSidechannel(t, ctx, "2dd08961455abf80ef9115f4afdc1c6f968b503") + require.NoError(t, err) - client, conn := newHooksClient(t, cfg.SocketPath) - defer testhelper.MustClose(t, conn) + client, conn := newHooksClient(t, cfg.SocketPath) + defer testhelper.MustClose(t, conn) - var wg sync.WaitGroup - wg.Add(2) + var wg sync.WaitGroup + wg.Add(2) - errChan := make(chan error) + errChan := make(chan error) - // We fire off two requests. Since the concurrency limit is set at 1, - // the first request will make it through the concurrency limiter and - // get blocked in the function provide to withRunPackObjectsFn() above. - // The second request will get concurrency limited and will be waiting - // in the queue. - // When we call Tick() on the max queue wait ticker, the second request - // will return with an error. + // We fire off two requests. Since the concurrency limit is set at 1, + // the first request will make it through the concurrency limiter and + // get blocked in the function provide to withRunPackObjectsFn() above. + // The second request will get concurrency limited and will be waiting + // in the queue. + // When we call Tick() on the max queue wait ticker, the second request + // will return with an error. - type call struct { - ctx context.Context - req *gitalypb.PackObjectsHookWithSidechannelRequest - } + type call struct { + ctx context.Context + req *gitalypb.PackObjectsHookWithSidechannelRequest + } - for _, c := range []call{ - {ctx: ctx1, req: req1}, - {ctx: ctx2, req: req2}, - } { - go func(c call) { - defer wg.Done() - _, err := client.PackObjectsHookWithSidechannel(c.ctx, c.req) - if err != nil { - errChan <- err + for _, c := range []call{ + {ctx: ctx1, req: tc.requests[0]}, + {ctx: ctx2, req: tc.requests[1]}, + } { + go func(c call) { + defer wg.Done() + _, err := client.PackObjectsHookWithSidechannel(c.ctx, c.req) + if err != nil { + errChan <- err + } + }(c) } - }(c) - } - if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || featureflag.PackObjectsLimitingUser.IsEnabled(ctx) { - <-receivedCh + if tc.shouldLimit { + <-receivedCh - require.NoError(t, - testutil.GatherAndCompare(registry, - bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls + require.NoError(t, + testutil.GatherAndCompare(registry, + bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls # TYPE gitaly_pack_objects_in_progress gauge gitaly_pack_objects_in_progress{type=%q} 1 `, keyType)), "gitaly_pack_objects_in_progress")) - ticker.Tick() + ticker.Tick() - err := <-errChan - testhelper.RequireGrpcCode( - t, - err, - codes.ResourceExhausted, - ) + err := <-errChan + testhelper.RequireGrpcCode( + t, + err, + codes.ResourceExhausted, + ) - close(blockCh) + close(blockCh) - expectedMetrics := bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue + expectedMetrics := bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue # TYPE gitaly_pack_objects_dropped_total counter gitaly_pack_objects_dropped_total{reason="max_time", type=%q} 1 # HELP gitaly_pack_objects_queued Gauge of number of queued calls @@ -801,24 +898,26 @@ gitaly_pack_objects_dropped_total{reason="max_time", type=%q} 1 gitaly_pack_objects_queued{type=%q} 0 `, keyType, keyType)) - require.NoError(t, - testutil.GatherAndCompare(registry, expectedMetrics, - "gitaly_pack_objects_dropped_total", - "gitaly_pack_objects_queued", - )) - - acquiringSecondsCount, err := testutil.GatherAndCount(registry, - "gitaly_pack_objects_acquiring_seconds") - require.NoError(t, err) + require.NoError(t, + testutil.GatherAndCompare(registry, expectedMetrics, + "gitaly_pack_objects_dropped_total", + "gitaly_pack_objects_queued", + )) + + acquiringSecondsCount, err := testutil.GatherAndCount(registry, + "gitaly_pack_objects_acquiring_seconds") + require.NoError(t, err) + + require.Equal(t, 1, acquiringSecondsCount) + } else { + close(blockCh) + <-receivedCh + <-receivedCh + } - require.Equal(t, 1, acquiringSecondsCount) - } else { - close(blockCh) - <-receivedCh - <-receivedCh + wg.Wait() + require.NoError(t, wt1.Wait()) + require.NoError(t, wt2.Wait()) + }) } - - wg.Wait() - require.NoError(t, wt1.Wait()) - require.NoError(t, wt2.Wait()) } diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_remote_ip.go b/internal/metadata/featureflag/ff_pack_objects_limiting_remote_ip.go new file mode 100644 index 000000000..e4d9a5c1e --- /dev/null +++ b/internal/metadata/featureflag/ff_pack_objects_limiting_remote_ip.go @@ -0,0 +1,10 @@ +package featureflag + +// PackObjectsLimitingRemoteIP will enable a concurrency limiter for pack objects +// based off of the remote IP +var PackObjectsLimitingRemoteIP = NewFeatureFlag( + "pack_objects_limiting_remote_ip", + "v15.11.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/4413", + false, +) diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go index 559c184e4..a07874c4c 100644 --- a/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go +++ b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go @@ -4,7 +4,7 @@ package featureflag // based off of the repository. var PackObjectsLimitingRepo = NewFeatureFlag( "pack_objects_limiting_repo", - "v15.6.0", + "v15.11.0", "https://gitlab.com/gitlab-org/gitaly/-/issues/4413", false, ) diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_user.go b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go index cc7535d5f..1e8319ad1 100644 --- a/internal/metadata/featureflag/ff_pack_objects_limiting_user.go +++ b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go @@ -4,7 +4,7 @@ package featureflag // based off of the user var PackObjectsLimitingUser = NewFeatureFlag( "pack_objects_limiting_user", - "v15.6.0", + "v15.11.0", "https://gitlab.com/gitlab-org/gitaly/-/issues/4413", false, ) |