From e993794650cbbef756df1e3a721bb7a34f7e63fb Mon Sep 17 00:00:00 2001 From: John Cai Date: Mon, 8 Aug 2022 15:41:44 -0400 Subject: Add pack objects limiter to hook server Add a limiter to the hook server, and also add it to the dependencies everywhere, including test helpers. --- cmd/gitaly-hooks/hooks_test.go | 2 +- cmd/gitaly/main.go | 23 ++ internal/git/localrepo/remote_extra_test.go | 2 +- internal/git/updateref/update_with_hooks_test.go | 2 +- internal/gitaly/service/cleanup/testhelper_test.go | 2 +- .../gitaly/service/conflicts/testhelper_test.go | 2 +- internal/gitaly/service/dependencies.go | 6 + internal/gitaly/service/hook/pack_objects.go | 64 ++++++ internal/gitaly/service/hook/pack_objects_test.go | 237 ++++++++++++++++++++- internal/gitaly/service/hook/server.go | 4 + internal/gitaly/service/hook/testhelper_test.go | 1 + .../gitaly/service/objectpool/testhelper_test.go | 2 +- .../gitaly/service/operations/branches_test.go | 2 +- .../gitaly/service/operations/testhelper_test.go | 2 +- internal/gitaly/service/ref/delete_refs_test.go | 2 +- internal/gitaly/service/ref/testhelper_test.go | 2 +- .../gitaly/service/repository/testhelper_test.go | 2 +- internal/gitaly/service/setup/register.go | 1 + .../gitaly/service/smarthttp/testhelper_test.go | 2 +- internal/gitaly/service/ssh/testhelper_test.go | 4 +- internal/gitaly/service/wiki/testhelper_test.go | 2 +- .../featureflag/ff_pack_objects_limiting_repo.go | 10 + .../featureflag/ff_pack_objects_limiting_user.go | 10 + internal/testhelper/testserver/gitaly.go | 20 ++ 24 files changed, 383 insertions(+), 23 deletions(-) create mode 100644 internal/metadata/featureflag/ff_pack_objects_limiting_repo.go create mode 100644 internal/metadata/featureflag/ff_pack_objects_limiting_user.go diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go index b5479123c..49a99d5c3 100644 --- a/cmd/gitaly-hooks/hooks_test.go +++ b/cmd/gitaly-hooks/hooks_test.go @@ -534,7 +534,7 @@ func (svc featureFlagAsserter) PackObjectsHookWithSidechannel(ctx context.Contex func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client, serverOpts ...testserver.GitalyServerOpt) { testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, featureFlagAsserter{ - t: t, wrapped: hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()), + t: t, wrapped: hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()), }) }, append(serverOpts, testserver.WithGitLabClient(gitlabClient))...) } diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index 805ac336a..9aa2289ac 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -275,7 +275,29 @@ func run(cfg config.Cfg) error { limithandler.LimitConcurrencyByRepo, limithandler.WithRateLimiters(ctx), ) + + packObjectsMonitor := limithandler.NewPackObjectsConcurrencyMonitor( + string(cfg.PackObjectsLimiting.Key), + cfg.Prometheus.GRPCLatencyBuckets, + ) + packObjectsConcurrencyLimit := cfg.PackObjectsLimiting.MaxConcurrency + if packObjectsConcurrencyLimit == 0 { + // TODO: remove this default setting when we remove the feature + // flags PackObjectsLimitingRepo and PackObjectsLimitingUser + // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413 + packObjectsConcurrencyLimit = 200 + } + packObjectsLimiter := limithandler.NewConcurrencyLimiter( + packObjectsConcurrencyLimit, + 0, + func() helper.Ticker { + return helper.NewTimerTicker(cfg.PackObjectsLimiting.MaxQueueWait.Duration()) + }, + packObjectsMonitor, + ) + prometheus.MustRegister(concurrencyLimitHandler, rateLimitHandler) + prometheus.MustRegister(packObjectsMonitor) gitalyServerFactory := server.NewGitalyServerFactory( cfg, @@ -335,6 +357,7 @@ func run(cfg config.Cfg) error { DiskCache: diskCache, PackObjectsCache: streamCache, PackObjectsConcurrencyTracker: concurrencyTracker, + PackObjectsLimiter: packObjectsLimiter, Git2goExecutor: git2goExecutor, UpdaterWithHooks: updaterWithHooks, HousekeepingManager: housekeepingManager, diff --git a/internal/git/localrepo/remote_extra_test.go b/internal/git/localrepo/remote_extra_test.go index dafd7e8ae..08b2b2a43 100644 --- a/internal/git/localrepo/remote_extra_test.go +++ b/internal/git/localrepo/remote_extra_test.go @@ -38,7 +38,7 @@ func TestRepo_FetchInternal(t *testing.T) { gitalypb.RegisterHookServiceServer(srv, hook.NewServer( deps.GetHookManager(), deps.GetGitCmdFactory(), - deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/git/updateref/update_with_hooks_test.go b/internal/git/updateref/update_with_hooks_test.go index b47db59fe..3766e2ce1 100644 --- a/internal/git/updateref/update_with_hooks_test.go +++ b/internal/git/updateref/update_with_hooks_test.go @@ -98,7 +98,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { // We need to set up a separate "real" hook service here, as it will be used in // git-update-ref(1) spawned by `updateRefWithHooks()` testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { - gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) }) repo, repoPath := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{ diff --git a/internal/gitaly/service/cleanup/testhelper_test.go b/internal/gitaly/service/cleanup/testhelper_test.go index 6909d67da..31c380452 100644 --- a/internal/gitaly/service/cleanup/testhelper_test.go +++ b/internal/gitaly/service/cleanup/testhelper_test.go @@ -45,7 +45,7 @@ func runCleanupServiceServer(t *testing.T, cfg config.Cfg) string { deps.GetGitCmdFactory(), deps.GetCatfileCache(), )) - gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/conflicts/testhelper_test.go b/internal/gitaly/service/conflicts/testhelper_test.go index d7add6e0b..3240263a4 100644 --- a/internal/gitaly/service/conflicts/testhelper_test.go +++ b/internal/gitaly/service/conflicts/testhelper_test.go @@ -71,7 +71,7 @@ func runConflictsServer(tb testing.TB, cfg config.Cfg, hookManager hook.Manager) deps.GetGitCmdFactory(), deps.GetTxManager(), )) - gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterCommitServiceServer(srv, commit.NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 15448ac2b..ef52af062 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -34,6 +34,7 @@ type Dependencies struct { DiskCache cache.Cache PackObjectsCache streamcache.Cache PackObjectsConcurrencyTracker *gitalyhook.ConcurrencyTracker + PackObjectsLimiter limithandler.Limiter LimitHandler *limithandler.LimiterMiddleware Git2goExecutor *git2go.Executor UpdaterWithHooks *updateref.UpdaterWithHooks @@ -124,3 +125,8 @@ func (dc *Dependencies) GetUpdaterWithHooks() *updateref.UpdaterWithHooks { func (dc *Dependencies) GetHousekeepingManager() housekeeping.Manager { return dc.HousekeepingManager } + +// GetPackObjectsLimiter returns the pack-objects limiter. +func (dc *Dependencies) GetPackObjectsLimiter() limithandler.Limiter { + return dc.PackObjectsLimiter +} diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 758f8f36c..80a3f1ef2 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -22,6 +22,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline" gitalyhook "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/stream" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "google.golang.org/protobuf/encoding/protojson" @@ -73,6 +74,30 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH key := hex.EncodeToString(h.Sum(nil)) r, created, err := s.packObjectsCache.FindOrCreate(key, func(w io.Writer) error { + if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) { + return s.runPackObjectsLimited( + ctx, + w, + req.GetRepository().GetStorageName()+":"+req.GetRepository().GetRelativePath(), + req, + args, + stdin, + key, + ) + } + + if featureflag.PackObjectsLimitingUser.IsEnabled(ctx) && req.GetGlId() != "" { + return s.runPackObjectsLimited( + ctx, + w, + req.GetGlId(), + req, + args, + stdin, + key, + ) + } + return s.runPackObjects(ctx, w, req, args, stdin, key) }) if err != nil { @@ -133,6 +158,45 @@ func (s *server) runPackObjects( return s.runPackObjectsFn(ctx, s.gitCmdFactory, w, req, args, stdin, key, s.concurrencyTracker) } +func (s *server) runPackObjectsLimited( + ctx context.Context, + w io.Writer, + limitkey string, + req *gitalypb.PackObjectsHookWithSidechannelRequest, + args *packObjectsArgs, + stdin io.ReadCloser, + key string, +) error { + ctx = helper.SuppressCancellation(ctx) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + defer stdin.Close() + + if _, err := s.packObjectsLimiter.Limit( + ctx, + limitkey, + func() (interface{}, error) { + return nil, + s.runPackObjectsFn( + ctx, + s.gitCmdFactory, + w, + req, + args, + stdin, + key, + s.concurrencyTracker, + ) + }, + ); err != nil { + return err + } + + return nil +} + func runPackObjects( ctx context.Context, gitCmdFactory git.CommandFactory, diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 403f3fe77..b300849ce 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -5,12 +5,14 @@ package hook import ( "bytes" "context" + "fmt" "io" "net" "strings" "sync" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" @@ -20,6 +22,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" hookPkg "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v15/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" @@ -79,10 +84,14 @@ func TestParsePackObjectsArgs(t *testing.T) { func TestServer_PackObjectsHook_separateContext(t *testing.T) { t.Parallel() - runTestsWithRuntimeDir( + + testhelper.NewFeatureSets( + featureflag.PackObjectsLimitingUser, + featureflag.PackObjectsLimitingRepo, + ).Run(t, runTestsWithRuntimeDir( t, testServerPackObjectsHookSeparateContextWithRuntimeDir, - )(t, testhelper.Context(t)) + )) } func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) { @@ -197,10 +206,14 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, ctx co func TestServer_PackObjectsHook_usesCache(t *testing.T) { t.Parallel() - runTestsWithRuntimeDir( + + testhelper.NewFeatureSets( + featureflag.PackObjectsLimitingUser, + featureflag.PackObjectsLimitingRepo, + ).Run(t, runTestsWithRuntimeDir( t, testServerPackObjectsHookUsesCache, - )(t, testhelper.Context(t)) + )) } func testServerPackObjectsHookUsesCache(t *testing.T, ctx context.Context, runtimeDir string) { @@ -282,10 +295,13 @@ func testServerPackObjectsHookUsesCache(t *testing.T, ctx context.Context, runti func TestServer_PackObjectsHookWithSidechannel(t *testing.T) { t.Parallel() - runTestsWithRuntimeDir( + testhelper.NewFeatureSets( + featureflag.PackObjectsLimitingUser, + featureflag.PackObjectsLimitingRepo, + ).Run(t, runTestsWithRuntimeDir( t, testServerPackObjectsHookWithSidechannelWithRuntimeDir, - )(t, testhelper.Context(t)) + )) } func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) { @@ -517,10 +533,13 @@ func TestServer_PackObjectsHookWithSidechannel_invalidArgument(t *testing.T) { func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) { t.Parallel() - runTestsWithRuntimeDir( + testhelper.NewFeatureSets( + featureflag.PackObjectsLimitingUser, + featureflag.PackObjectsLimitingRepo, + ).Run(t, runTestsWithRuntimeDir( t, testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir, - )(t, testhelper.Context(t)) + )) } func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) { @@ -557,3 +576,205 @@ func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T require.NoError(t, wt.Wait()) } + +func withRunPackObjectsFn( + f func( + context.Context, + git.CommandFactory, + io.Writer, + *gitalypb.PackObjectsHookWithSidechannelRequest, + *packObjectsArgs, + io.Reader, + string, + *hookPkg.ConcurrencyTracker, + ) error, +) serverOption { + return func(s *server) { + s.runPackObjectsFn = f + } +} + +func setupSidechannel(t *testing.T, ctx context.Context, oid string) (context.Context, *hookPkg.SidechannelWaiter, error) { + return hookPkg.SetupSidechannel( + ctx, + git.HooksPayload{ + RuntimeDir: testhelper.TempDir(t), + }, + func(c *net.UnixConn) error { + // Simulate a client that successfully initiates a request, but hangs up + // before fully consuming the response. + _, err := io.WriteString(c, fmt.Sprintf("%s\n--not\n\n", oid)) + return err + }, + ) +} + +func TestPackObjects_concurrencyLimit(t *testing.T) { + t.Parallel() + + testhelper.NewFeatureSets( + featureflag.PackObjectsLimitingUser, + featureflag.PackObjectsLimitingRepo, + ).Run(t, testPackObjectsConcurrency) +} + +func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { + t.Parallel() + + cfg := cfgWithCache(t) + + var keyType string + + if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) { + keyType = "repo" + } else if featureflag.PackObjectsLimitingUser.IsEnabled(ctx) { + keyType = "user" + } + + ticker := helper.NewManualTicker() + monitor := limithandler.NewPackObjectsConcurrencyMonitor( + keyType, + cfg.Prometheus.GRPCLatencyBuckets, + ) + limiter := limithandler.NewConcurrencyLimiter( + 1, + 0, + func() helper.Ticker { return ticker }, + monitor, + ) + + registry := prometheus.NewRegistry() + registry.MustRegister(monitor) + + receivedCh, blockCh := make(chan struct{}), make(chan struct{}) + cfg.SocketPath = runHooksServer(t, cfg, []serverOption{ + withRunPackObjectsFn(func( + context.Context, + git.CommandFactory, + io.Writer, + *gitalypb.PackObjectsHookWithSidechannelRequest, + *packObjectsArgs, + io.Reader, + string, + *hookPkg.ConcurrencyTracker, + ) error { + receivedCh <- struct{}{} + <-blockCh + return nil + }), + }, + testserver.WithPackObjectsLimiter(limiter), + ) + + ctx1, wt1, err := setupSidechannel(t, ctx, "1dd08961455abf80ef9115f4afdc1c6f968b503c") + require.NoError(t, err) + + ctx2, wt2, err := setupSidechannel(t, ctx, "2dd08961455abf80ef9115f4afdc1c6f968b503") + require.NoError(t, err) + + userID := "user-123" + req1 := &gitalypb.PackObjectsHookWithSidechannelRequest{ + GlId: userID, + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, + } + + req2 := &gitalypb.PackObjectsHookWithSidechannelRequest{ + GlId: userID, + Repository: &gitalypb.Repository{ + StorageName: "storage-0", + RelativePath: "a/b/c", + }, + Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, + } + + client, conn := newHooksClient(t, cfg.SocketPath) + defer testhelper.MustClose(t, conn) + + var wg sync.WaitGroup + wg.Add(2) + + errChan := make(chan error) + + // We fire off two requests. Since the concurrency limit is set at 1, + // the first request will make it through the concurrency limiter and + // get blocked in the function provide to withRunPackObjectsFn() above. + // The second request will get concurrency limited and will be waiting + // in the queue. + // When we call Tick() on the max queue wait ticker, the second request + // will return with an error. + + type call struct { + ctx context.Context + req *gitalypb.PackObjectsHookWithSidechannelRequest + } + + for _, c := range []call{ + {ctx: ctx1, req: req1}, + {ctx: ctx2, req: req2}, + } { + go func(c call) { + defer wg.Done() + _, err := client.PackObjectsHookWithSidechannel(c.ctx, c.req) + if err != nil { + errChan <- err + } + }(c) + } + + if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || featureflag.PackObjectsLimitingUser.IsEnabled(ctx) { + <-receivedCh + + require.NoError(t, + testutil.GatherAndCompare(registry, + bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls +# TYPE gitaly_pack_objects_in_progress gauge +gitaly_pack_objects_in_progress{type="%s"} 1 +`, keyType)), "gitaly_pack_objects_in_progress")) + + ticker.Tick() + + err := <-errChan + testhelper.RequireGrpcCode( + t, + err, + codes.Unavailable, + ) + + close(blockCh) + + expectedMetrics := bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue +# TYPE gitaly_pack_objects_dropped_total counter +gitaly_pack_objects_dropped_total{reason="max_time", type="%s"} 1 +# HELP gitaly_pack_objects_queued Gauge of number of queued calls +# TYPE gitaly_pack_objects_queued gauge +gitaly_pack_objects_queued{type="%s"} 0 +`, keyType, keyType)) + + require.NoError(t, + testutil.GatherAndCompare(registry, expectedMetrics, + "gitaly_pack_objects_dropped_total", + "gitaly_pack_objects_queued", + )) + + acquiringSecondsCount, err := testutil.GatherAndCount(registry, + "gitaly_pack_objects_acquiring_seconds") + require.NoError(t, err) + + require.Equal(t, + 1, + acquiringSecondsCount, + ) + } else { + close(blockCh) + <-receivedCh + <-receivedCh + } + + wg.Wait() + require.NoError(t, wt1.Wait()) + require.NoError(t, wt2.Wait()) +} diff --git a/internal/gitaly/service/hook/server.go b/internal/gitaly/service/hook/server.go index 5922f0b41..d25279721 100644 --- a/internal/gitaly/service/hook/server.go +++ b/internal/gitaly/service/hook/server.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git" gitalyhook "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v15/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" ) @@ -16,6 +17,7 @@ type server struct { gitCmdFactory git.CommandFactory packObjectsCache streamcache.Cache concurrencyTracker *gitalyhook.ConcurrencyTracker + packObjectsLimiter limithandler.Limiter runPackObjectsFn func( context.Context, git.CommandFactory, @@ -34,11 +36,13 @@ func NewServer( gitCmdFactory git.CommandFactory, packObjectsCache streamcache.Cache, concurrencyTracker *gitalyhook.ConcurrencyTracker, + packObjectsLimiter limithandler.Limiter, ) gitalypb.HookServiceServer { srv := &server{ manager: manager, gitCmdFactory: gitCmdFactory, packObjectsCache: packObjectsCache, + packObjectsLimiter: packObjectsLimiter, concurrencyTracker: concurrencyTracker, runPackObjectsFn: runPackObjects, } diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go index 926d16ca4..91e207176 100644 --- a/internal/gitaly/service/hook/testhelper_test.go +++ b/internal/gitaly/service/hook/testhelper_test.go @@ -65,6 +65,7 @@ func runHooksServer(tb testing.TB, cfg config.Cfg, opts []serverOption, serverOp deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), + deps.GetPackObjectsLimiter(), ) for _, opt := range opts { opt(hookServer.(*server)) diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go index 0d37c3f23..f9667aedd 100644 --- a/internal/gitaly/service/objectpool/testhelper_test.go +++ b/internal/gitaly/service/objectpool/testhelper_test.go @@ -86,7 +86,7 @@ func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer( deps.GetHookManager(), deps.GetGitCmdFactory(), - deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( cfg, deps.GetRubyServer(), diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index 04a91d486..bc6f126b2 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -138,7 +138,7 @@ func TestUserCreateBranch_Transactions(t *testing.T) { deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), )) - gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) // Praefect proxy execution disabled as praefect runs only on the UNIX socket, but // the test requires a TCP listening address. }, testserver.WithDisablePraefect()) diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index 13639d586..84563d30a 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -104,7 +104,7 @@ func runOperationServiceServer(tb testing.TB, cfg config.Cfg, options ...testser ) gitalypb.RegisterOperationServiceServer(srv, operationServer) - gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), nil, diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go index a821999f7..7757e6c79 100644 --- a/internal/gitaly/service/ref/delete_refs_test.go +++ b/internal/gitaly/service/ref/delete_refs_test.go @@ -125,7 +125,7 @@ func testDeleteRefsTransaction(t *testing.T, ctx context.Context) { deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), )) - gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) }, testserver.WithTransactionManager(txManager)) cfg.SocketPath = addr diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go index cd5e4ee1c..628b4fe57 100644 --- a/internal/gitaly/service/ref/testhelper_test.go +++ b/internal/gitaly/service/ref/testhelper_test.go @@ -67,7 +67,7 @@ func runRefServiceServer(tb testing.TB, cfg config.Cfg) string { deps.GetTxManager(), deps.GetCatfileCache(), )) - gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go index 566ee73a7..5c12f8269 100644 --- a/internal/gitaly/service/repository/testhelper_test.go +++ b/internal/gitaly/service/repository/testhelper_test.go @@ -126,7 +126,7 @@ func runRepositoryService(tb testing.TB, cfg config.Cfg, rubySrv *rubyserver.Ser deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), )) - gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterRemoteServiceServer(srv, remote.NewServer( deps.GetLocator(), deps.GetGitCmdFactory(), diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index 69dc03c8c..8e203665d 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -145,6 +145,7 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), + deps.GetPackObjectsLimiter(), )) gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(deps.GetCfg().Storages)) diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go index db0d9a1b3..c841d6883 100644 --- a/internal/gitaly/service/smarthttp/testhelper_test.go +++ b/internal/gitaly/service/smarthttp/testhelper_test.go @@ -49,7 +49,7 @@ func startSmartHTTPServerWithOptions(t *testing.T, cfg config.Cfg, opts []Server deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), )) - gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) }, serverOpts...) } diff --git a/internal/gitaly/service/ssh/testhelper_test.go b/internal/gitaly/service/ssh/testhelper_test.go index 194fddeb8..f3fe393e1 100644 --- a/internal/gitaly/service/ssh/testhelper_test.go +++ b/internal/gitaly/service/ssh/testhelper_test.go @@ -41,8 +41,8 @@ func startSSHServerWithOptions(t *testing.T, cfg config.Cfg, opts []ServerOpt, s deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), - deps.GetPackObjectsConcurrencyTracker(), - )) + deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()), + ) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( cfg, deps.GetRubyServer(), diff --git a/internal/gitaly/service/wiki/testhelper_test.go b/internal/gitaly/service/wiki/testhelper_test.go index 0dda852a7..bff5a5478 100644 --- a/internal/gitaly/service/wiki/testhelper_test.go +++ b/internal/gitaly/service/wiki/testhelper_test.go @@ -78,7 +78,7 @@ func setupWikiService(tb testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server) gitalypb.RegisterHookServiceServer(srv, hook.NewServer( deps.GetHookManager(), deps.GetGitCmdFactory(), - deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker())) + deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterWikiServiceServer(srv, NewServer(deps.GetRubyServer(), deps.GetLocator())) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( cfg, diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go new file mode 100644 index 000000000..c7945828a --- /dev/null +++ b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go @@ -0,0 +1,10 @@ +package featureflag + +// PackObjectsLimitingRepo will enable a concurrency limiter for pack objects +// based off of the repository. +var PackObjectsLimitingRepo = NewFeatureFlag( + "pack_objects_limiting_repo", + "v15.5.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/4413", + false, +) diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_user.go b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go new file mode 100644 index 000000000..ed0040334 --- /dev/null +++ b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go @@ -0,0 +1,10 @@ +package featureflag + +// PackObjectsLimitingUser will enable a concurrency limiter for pack objects +// based off of the user +var PackObjectsLimitingUser = NewFeatureFlag( + "pack_objects_limiting_user", + "v15.5.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/4413", + false, +) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 39fa17097..7fb745b42 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -244,6 +244,7 @@ type gitalyServerDeps struct { diskCache cache.Cache packObjectsCache streamcache.Cache packObjectsConcurrencyTracker *hook.ConcurrencyTracker + packObjectsLimiter limithandler.Limiter limitHandler *limithandler.LimiterMiddleware git2goExecutor *git2go.Executor updaterWithHooks *updateref.UpdaterWithHooks @@ -304,6 +305,15 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg, r gsd.packObjectsConcurrencyTracker = hook.NewConcurrencyTracker() } + if gsd.packObjectsLimiter == nil { + gsd.packObjectsLimiter = limithandler.NewConcurrencyLimiter( + 0, + 0, + nil, + limithandler.NewNoopConcurrencyMonitor(), + ) + } + if gsd.limitHandler == nil { gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters) } @@ -333,6 +343,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg, r CatfileCache: gsd.catfileCache, DiskCache: gsd.diskCache, PackObjectsCache: gsd.packObjectsCache, + PackObjectsLimiter: gsd.packObjectsLimiter, PackObjectsConcurrencyTracker: gsd.packObjectsConcurrencyTracker, LimitHandler: gsd.limitHandler, Git2goExecutor: gsd.git2goExecutor, @@ -425,3 +436,12 @@ func WithConcurrencyTracker(tracker *hook.ConcurrencyTracker) GitalyServerOpt { return deps } } + +// WithPackObjectsLimiter sets the PackObjectsLimiter that will be +// used for gitaly services initialization. +func WithPackObjectsLimiter(limiter *limithandler.ConcurrencyLimiter) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.packObjectsLimiter = limiter + return deps + } +} -- cgit v1.2.3