Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-08-08 22:41:44 +0300
committerJohn Cai <jcai@gitlab.com>2022-10-12 21:53:11 +0300
commite993794650cbbef756df1e3a721bb7a34f7e63fb (patch)
tree5181df36869677a0de8e9cc76c40072862c08ee4
parentde9194831c315fc89cc5f505b133c574cbee3070 (diff)
Add pack objects limiter to hook serverjc-pack-objects-limit
Add a limiter to the hook server, and also add it to the dependencies everywhere, including test helpers.
-rw-r--r--cmd/gitaly-hooks/hooks_test.go2
-rw-r--r--cmd/gitaly/main.go23
-rw-r--r--internal/git/localrepo/remote_extra_test.go2
-rw-r--r--internal/git/updateref/update_with_hooks_test.go2
-rw-r--r--internal/gitaly/service/cleanup/testhelper_test.go2
-rw-r--r--internal/gitaly/service/conflicts/testhelper_test.go2
-rw-r--r--internal/gitaly/service/dependencies.go6
-rw-r--r--internal/gitaly/service/hook/pack_objects.go64
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go237
-rw-r--r--internal/gitaly/service/hook/server.go4
-rw-r--r--internal/gitaly/service/hook/testhelper_test.go1
-rw-r--r--internal/gitaly/service/objectpool/testhelper_test.go2
-rw-r--r--internal/gitaly/service/operations/branches_test.go2
-rw-r--r--internal/gitaly/service/operations/testhelper_test.go2
-rw-r--r--internal/gitaly/service/ref/delete_refs_test.go2
-rw-r--r--internal/gitaly/service/ref/testhelper_test.go2
-rw-r--r--internal/gitaly/service/repository/testhelper_test.go2
-rw-r--r--internal/gitaly/service/setup/register.go1
-rw-r--r--internal/gitaly/service/smarthttp/testhelper_test.go2
-rw-r--r--internal/gitaly/service/ssh/testhelper_test.go4
-rw-r--r--internal/gitaly/service/wiki/testhelper_test.go2
-rw-r--r--internal/metadata/featureflag/ff_pack_objects_limiting_repo.go10
-rw-r--r--internal/metadata/featureflag/ff_pack_objects_limiting_user.go10
-rw-r--r--internal/testhelper/testserver/gitaly.go20
24 files changed, 383 insertions, 23 deletions
diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go
index b5479123c..49a99d5c3 100644
--- a/cmd/gitaly-hooks/hooks_test.go
+++ b/cmd/gitaly-hooks/hooks_test.go
@@ -534,7 +534,7 @@ func (svc featureFlagAsserter) PackObjectsHookWithSidechannel(ctx context.Contex
func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client, serverOpts ...testserver.GitalyServerOpt) {
testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
gitalypb.RegisterHookServiceServer(srv, featureFlagAsserter{
- t: t, wrapped: hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()),
+ t: t, wrapped: hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()),
})
}, append(serverOpts, testserver.WithGitLabClient(gitlabClient))...)
}
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 805ac336a..9aa2289ac 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -275,7 +275,29 @@ func run(cfg config.Cfg) error {
limithandler.LimitConcurrencyByRepo,
limithandler.WithRateLimiters(ctx),
)
+
+ packObjectsMonitor := limithandler.NewPackObjectsConcurrencyMonitor(
+ string(cfg.PackObjectsLimiting.Key),
+ cfg.Prometheus.GRPCLatencyBuckets,
+ )
+ packObjectsConcurrencyLimit := cfg.PackObjectsLimiting.MaxConcurrency
+ if packObjectsConcurrencyLimit == 0 {
+ // TODO: remove this default setting when we remove the feature
+ // flags PackObjectsLimitingRepo and PackObjectsLimitingUser
+ // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413
+ packObjectsConcurrencyLimit = 200
+ }
+ packObjectsLimiter := limithandler.NewConcurrencyLimiter(
+ packObjectsConcurrencyLimit,
+ 0,
+ func() helper.Ticker {
+ return helper.NewTimerTicker(cfg.PackObjectsLimiting.MaxQueueWait.Duration())
+ },
+ packObjectsMonitor,
+ )
+
prometheus.MustRegister(concurrencyLimitHandler, rateLimitHandler)
+ prometheus.MustRegister(packObjectsMonitor)
gitalyServerFactory := server.NewGitalyServerFactory(
cfg,
@@ -335,6 +357,7 @@ func run(cfg config.Cfg) error {
DiskCache: diskCache,
PackObjectsCache: streamCache,
PackObjectsConcurrencyTracker: concurrencyTracker,
+ PackObjectsLimiter: packObjectsLimiter,
Git2goExecutor: git2goExecutor,
UpdaterWithHooks: updaterWithHooks,
HousekeepingManager: housekeepingManager,
diff --git a/internal/git/localrepo/remote_extra_test.go b/internal/git/localrepo/remote_extra_test.go
index dafd7e8ae..08b2b2a43 100644
--- a/internal/git/localrepo/remote_extra_test.go
+++ b/internal/git/localrepo/remote_extra_test.go
@@ -38,7 +38,7 @@ func TestRepo_FetchInternal(t *testing.T) {
gitalypb.RegisterHookServiceServer(srv, hook.NewServer(
deps.GetHookManager(),
deps.GetGitCmdFactory(),
- deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(
deps.GetCfg(),
deps.GetRubyServer(),
diff --git a/internal/git/updateref/update_with_hooks_test.go b/internal/git/updateref/update_with_hooks_test.go
index b47db59fe..3766e2ce1 100644
--- a/internal/git/updateref/update_with_hooks_test.go
+++ b/internal/git/updateref/update_with_hooks_test.go
@@ -98,7 +98,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) {
// We need to set up a separate "real" hook service here, as it will be used in
// git-update-ref(1) spawned by `updateRefWithHooks()`
testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
- gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
})
repo, repoPath := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{
diff --git a/internal/gitaly/service/cleanup/testhelper_test.go b/internal/gitaly/service/cleanup/testhelper_test.go
index 6909d67da..31c380452 100644
--- a/internal/gitaly/service/cleanup/testhelper_test.go
+++ b/internal/gitaly/service/cleanup/testhelper_test.go
@@ -45,7 +45,7 @@ func runCleanupServiceServer(t *testing.T, cfg config.Cfg) string {
deps.GetGitCmdFactory(),
deps.GetCatfileCache(),
))
- gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(
deps.GetCfg(),
deps.GetRubyServer(),
diff --git a/internal/gitaly/service/conflicts/testhelper_test.go b/internal/gitaly/service/conflicts/testhelper_test.go
index d7add6e0b..3240263a4 100644
--- a/internal/gitaly/service/conflicts/testhelper_test.go
+++ b/internal/gitaly/service/conflicts/testhelper_test.go
@@ -71,7 +71,7 @@ func runConflictsServer(tb testing.TB, cfg config.Cfg, hookManager hook.Manager)
deps.GetGitCmdFactory(),
deps.GetTxManager(),
))
- gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(
deps.GetCfg(),
deps.GetLocator(),
diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go
index 15448ac2b..ef52af062 100644
--- a/internal/gitaly/service/dependencies.go
+++ b/internal/gitaly/service/dependencies.go
@@ -34,6 +34,7 @@ type Dependencies struct {
DiskCache cache.Cache
PackObjectsCache streamcache.Cache
PackObjectsConcurrencyTracker *gitalyhook.ConcurrencyTracker
+ PackObjectsLimiter limithandler.Limiter
LimitHandler *limithandler.LimiterMiddleware
Git2goExecutor *git2go.Executor
UpdaterWithHooks *updateref.UpdaterWithHooks
@@ -124,3 +125,8 @@ func (dc *Dependencies) GetUpdaterWithHooks() *updateref.UpdaterWithHooks {
func (dc *Dependencies) GetHousekeepingManager() housekeeping.Manager {
return dc.HousekeepingManager
}
+
+// GetPackObjectsLimiter returns the pack-objects limiter.
+func (dc *Dependencies) GetPackObjectsLimiter() limithandler.Limiter {
+ return dc.PackObjectsLimiter
+}
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index 758f8f36c..80a3f1ef2 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -22,6 +22,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline"
gitalyhook "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/stream"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/protobuf/encoding/protojson"
@@ -73,6 +74,30 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
key := hex.EncodeToString(h.Sum(nil))
r, created, err := s.packObjectsCache.FindOrCreate(key, func(w io.Writer) error {
+ if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) {
+ return s.runPackObjectsLimited(
+ ctx,
+ w,
+ req.GetRepository().GetStorageName()+":"+req.GetRepository().GetRelativePath(),
+ req,
+ args,
+ stdin,
+ key,
+ )
+ }
+
+ if featureflag.PackObjectsLimitingUser.IsEnabled(ctx) && req.GetGlId() != "" {
+ return s.runPackObjectsLimited(
+ ctx,
+ w,
+ req.GetGlId(),
+ req,
+ args,
+ stdin,
+ key,
+ )
+ }
+
return s.runPackObjects(ctx, w, req, args, stdin, key)
})
if err != nil {
@@ -133,6 +158,45 @@ func (s *server) runPackObjects(
return s.runPackObjectsFn(ctx, s.gitCmdFactory, w, req, args, stdin, key, s.concurrencyTracker)
}
+func (s *server) runPackObjectsLimited(
+ ctx context.Context,
+ w io.Writer,
+ limitkey string,
+ req *gitalypb.PackObjectsHookWithSidechannelRequest,
+ args *packObjectsArgs,
+ stdin io.ReadCloser,
+ key string,
+) error {
+ ctx = helper.SuppressCancellation(ctx)
+
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ defer stdin.Close()
+
+ if _, err := s.packObjectsLimiter.Limit(
+ ctx,
+ limitkey,
+ func() (interface{}, error) {
+ return nil,
+ s.runPackObjectsFn(
+ ctx,
+ s.gitCmdFactory,
+ w,
+ req,
+ args,
+ stdin,
+ key,
+ s.concurrencyTracker,
+ )
+ },
+ ); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func runPackObjects(
ctx context.Context,
gitCmdFactory git.CommandFactory,
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 403f3fe77..b300849ce 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -5,12 +5,14 @@ package hook
import (
"bytes"
"context"
+ "fmt"
"io"
"net"
"strings"
"sync"
"testing"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
@@ -20,6 +22,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
hookPkg "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v15/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
@@ -79,10 +84,14 @@ func TestParsePackObjectsArgs(t *testing.T) {
func TestServer_PackObjectsHook_separateContext(t *testing.T) {
t.Parallel()
- runTestsWithRuntimeDir(
+
+ testhelper.NewFeatureSets(
+ featureflag.PackObjectsLimitingUser,
+ featureflag.PackObjectsLimitingRepo,
+ ).Run(t, runTestsWithRuntimeDir(
t,
testServerPackObjectsHookSeparateContextWithRuntimeDir,
- )(t, testhelper.Context(t))
+ ))
}
func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) {
@@ -197,10 +206,14 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, ctx co
func TestServer_PackObjectsHook_usesCache(t *testing.T) {
t.Parallel()
- runTestsWithRuntimeDir(
+
+ testhelper.NewFeatureSets(
+ featureflag.PackObjectsLimitingUser,
+ featureflag.PackObjectsLimitingRepo,
+ ).Run(t, runTestsWithRuntimeDir(
t,
testServerPackObjectsHookUsesCache,
- )(t, testhelper.Context(t))
+ ))
}
func testServerPackObjectsHookUsesCache(t *testing.T, ctx context.Context, runtimeDir string) {
@@ -282,10 +295,13 @@ func testServerPackObjectsHookUsesCache(t *testing.T, ctx context.Context, runti
func TestServer_PackObjectsHookWithSidechannel(t *testing.T) {
t.Parallel()
- runTestsWithRuntimeDir(
+ testhelper.NewFeatureSets(
+ featureflag.PackObjectsLimitingUser,
+ featureflag.PackObjectsLimitingRepo,
+ ).Run(t, runTestsWithRuntimeDir(
t,
testServerPackObjectsHookWithSidechannelWithRuntimeDir,
- )(t, testhelper.Context(t))
+ ))
}
func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) {
@@ -517,10 +533,13 @@ func TestServer_PackObjectsHookWithSidechannel_invalidArgument(t *testing.T) {
func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) {
t.Parallel()
- runTestsWithRuntimeDir(
+ testhelper.NewFeatureSets(
+ featureflag.PackObjectsLimitingUser,
+ featureflag.PackObjectsLimitingRepo,
+ ).Run(t, runTestsWithRuntimeDir(
t,
testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir,
- )(t, testhelper.Context(t))
+ ))
}
func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) {
@@ -557,3 +576,205 @@ func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T
require.NoError(t, wt.Wait())
}
+
+func withRunPackObjectsFn(
+ f func(
+ context.Context,
+ git.CommandFactory,
+ io.Writer,
+ *gitalypb.PackObjectsHookWithSidechannelRequest,
+ *packObjectsArgs,
+ io.Reader,
+ string,
+ *hookPkg.ConcurrencyTracker,
+ ) error,
+) serverOption {
+ return func(s *server) {
+ s.runPackObjectsFn = f
+ }
+}
+
+func setupSidechannel(t *testing.T, ctx context.Context, oid string) (context.Context, *hookPkg.SidechannelWaiter, error) {
+ return hookPkg.SetupSidechannel(
+ ctx,
+ git.HooksPayload{
+ RuntimeDir: testhelper.TempDir(t),
+ },
+ func(c *net.UnixConn) error {
+ // Simulate a client that successfully initiates a request, but hangs up
+ // before fully consuming the response.
+ _, err := io.WriteString(c, fmt.Sprintf("%s\n--not\n\n", oid))
+ return err
+ },
+ )
+}
+
+func TestPackObjects_concurrencyLimit(t *testing.T) {
+ t.Parallel()
+
+ testhelper.NewFeatureSets(
+ featureflag.PackObjectsLimitingUser,
+ featureflag.PackObjectsLimitingRepo,
+ ).Run(t, testPackObjectsConcurrency)
+}
+
+func testPackObjectsConcurrency(t *testing.T, ctx context.Context) {
+ t.Parallel()
+
+ cfg := cfgWithCache(t)
+
+ var keyType string
+
+ if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) {
+ keyType = "repo"
+ } else if featureflag.PackObjectsLimitingUser.IsEnabled(ctx) {
+ keyType = "user"
+ }
+
+ ticker := helper.NewManualTicker()
+ monitor := limithandler.NewPackObjectsConcurrencyMonitor(
+ keyType,
+ cfg.Prometheus.GRPCLatencyBuckets,
+ )
+ limiter := limithandler.NewConcurrencyLimiter(
+ 1,
+ 0,
+ func() helper.Ticker { return ticker },
+ monitor,
+ )
+
+ registry := prometheus.NewRegistry()
+ registry.MustRegister(monitor)
+
+ receivedCh, blockCh := make(chan struct{}), make(chan struct{})
+ cfg.SocketPath = runHooksServer(t, cfg, []serverOption{
+ withRunPackObjectsFn(func(
+ context.Context,
+ git.CommandFactory,
+ io.Writer,
+ *gitalypb.PackObjectsHookWithSidechannelRequest,
+ *packObjectsArgs,
+ io.Reader,
+ string,
+ *hookPkg.ConcurrencyTracker,
+ ) error {
+ receivedCh <- struct{}{}
+ <-blockCh
+ return nil
+ }),
+ },
+ testserver.WithPackObjectsLimiter(limiter),
+ )
+
+ ctx1, wt1, err := setupSidechannel(t, ctx, "1dd08961455abf80ef9115f4afdc1c6f968b503c")
+ require.NoError(t, err)
+
+ ctx2, wt2, err := setupSidechannel(t, ctx, "2dd08961455abf80ef9115f4afdc1c6f968b503")
+ require.NoError(t, err)
+
+ userID := "user-123"
+ req1 := &gitalypb.PackObjectsHookWithSidechannelRequest{
+ GlId: userID,
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"},
+ }
+
+ req2 := &gitalypb.PackObjectsHookWithSidechannelRequest{
+ GlId: userID,
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"},
+ }
+
+ client, conn := newHooksClient(t, cfg.SocketPath)
+ defer testhelper.MustClose(t, conn)
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ errChan := make(chan error)
+
+ // We fire off two requests. Since the concurrency limit is set at 1,
+ // the first request will make it through the concurrency limiter and
+ // get blocked in the function provide to withRunPackObjectsFn() above.
+ // The second request will get concurrency limited and will be waiting
+ // in the queue.
+ // When we call Tick() on the max queue wait ticker, the second request
+ // will return with an error.
+
+ type call struct {
+ ctx context.Context
+ req *gitalypb.PackObjectsHookWithSidechannelRequest
+ }
+
+ for _, c := range []call{
+ {ctx: ctx1, req: req1},
+ {ctx: ctx2, req: req2},
+ } {
+ go func(c call) {
+ defer wg.Done()
+ _, err := client.PackObjectsHookWithSidechannel(c.ctx, c.req)
+ if err != nil {
+ errChan <- err
+ }
+ }(c)
+ }
+
+ if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || featureflag.PackObjectsLimitingUser.IsEnabled(ctx) {
+ <-receivedCh
+
+ require.NoError(t,
+ testutil.GatherAndCompare(registry,
+ bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls
+# TYPE gitaly_pack_objects_in_progress gauge
+gitaly_pack_objects_in_progress{type="%s"} 1
+`, keyType)), "gitaly_pack_objects_in_progress"))
+
+ ticker.Tick()
+
+ err := <-errChan
+ testhelper.RequireGrpcCode(
+ t,
+ err,
+ codes.Unavailable,
+ )
+
+ close(blockCh)
+
+ expectedMetrics := bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue
+# TYPE gitaly_pack_objects_dropped_total counter
+gitaly_pack_objects_dropped_total{reason="max_time", type="%s"} 1
+# HELP gitaly_pack_objects_queued Gauge of number of queued calls
+# TYPE gitaly_pack_objects_queued gauge
+gitaly_pack_objects_queued{type="%s"} 0
+`, keyType, keyType))
+
+ require.NoError(t,
+ testutil.GatherAndCompare(registry, expectedMetrics,
+ "gitaly_pack_objects_dropped_total",
+ "gitaly_pack_objects_queued",
+ ))
+
+ acquiringSecondsCount, err := testutil.GatherAndCount(registry,
+ "gitaly_pack_objects_acquiring_seconds")
+ require.NoError(t, err)
+
+ require.Equal(t,
+ 1,
+ acquiringSecondsCount,
+ )
+ } else {
+ close(blockCh)
+ <-receivedCh
+ <-receivedCh
+ }
+
+ wg.Wait()
+ require.NoError(t, wt1.Wait())
+ require.NoError(t, wt2.Wait())
+}
diff --git a/internal/gitaly/service/hook/server.go b/internal/gitaly/service/hook/server.go
index 5922f0b41..d25279721 100644
--- a/internal/gitaly/service/hook/server.go
+++ b/internal/gitaly/service/hook/server.go
@@ -6,6 +6,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
gitalyhook "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v15/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
)
@@ -16,6 +17,7 @@ type server struct {
gitCmdFactory git.CommandFactory
packObjectsCache streamcache.Cache
concurrencyTracker *gitalyhook.ConcurrencyTracker
+ packObjectsLimiter limithandler.Limiter
runPackObjectsFn func(
context.Context,
git.CommandFactory,
@@ -34,11 +36,13 @@ func NewServer(
gitCmdFactory git.CommandFactory,
packObjectsCache streamcache.Cache,
concurrencyTracker *gitalyhook.ConcurrencyTracker,
+ packObjectsLimiter limithandler.Limiter,
) gitalypb.HookServiceServer {
srv := &server{
manager: manager,
gitCmdFactory: gitCmdFactory,
packObjectsCache: packObjectsCache,
+ packObjectsLimiter: packObjectsLimiter,
concurrencyTracker: concurrencyTracker,
runPackObjectsFn: runPackObjects,
}
diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go
index 926d16ca4..91e207176 100644
--- a/internal/gitaly/service/hook/testhelper_test.go
+++ b/internal/gitaly/service/hook/testhelper_test.go
@@ -65,6 +65,7 @@ func runHooksServer(tb testing.TB, cfg config.Cfg, opts []serverOption, serverOp
deps.GetGitCmdFactory(),
deps.GetPackObjectsCache(),
deps.GetPackObjectsConcurrencyTracker(),
+ deps.GetPackObjectsLimiter(),
)
for _, opt := range opts {
opt(hookServer.(*server))
diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go
index 0d37c3f23..f9667aedd 100644
--- a/internal/gitaly/service/objectpool/testhelper_test.go
+++ b/internal/gitaly/service/objectpool/testhelper_test.go
@@ -86,7 +86,7 @@ func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator,
gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(
deps.GetHookManager(),
deps.GetGitCmdFactory(),
- deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(
cfg,
deps.GetRubyServer(),
diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go
index 04a91d486..bc6f126b2 100644
--- a/internal/gitaly/service/operations/branches_test.go
+++ b/internal/gitaly/service/operations/branches_test.go
@@ -138,7 +138,7 @@ func TestUserCreateBranch_Transactions(t *testing.T) {
deps.GetCatfileCache(),
deps.GetUpdaterWithHooks(),
))
- gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
// Praefect proxy execution disabled as praefect runs only on the UNIX socket, but
// the test requires a TCP listening address.
}, testserver.WithDisablePraefect())
diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go
index 13639d586..84563d30a 100644
--- a/internal/gitaly/service/operations/testhelper_test.go
+++ b/internal/gitaly/service/operations/testhelper_test.go
@@ -104,7 +104,7 @@ func runOperationServiceServer(tb testing.TB, cfg config.Cfg, options ...testser
)
gitalypb.RegisterOperationServiceServer(srv, operationServer)
- gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(
deps.GetCfg(),
nil,
diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go
index a821999f7..7757e6c79 100644
--- a/internal/gitaly/service/ref/delete_refs_test.go
+++ b/internal/gitaly/service/ref/delete_refs_test.go
@@ -125,7 +125,7 @@ func testDeleteRefsTransaction(t *testing.T, ctx context.Context) {
deps.GetGit2goExecutor(),
deps.GetHousekeepingManager(),
))
- gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
}, testserver.WithTransactionManager(txManager))
cfg.SocketPath = addr
diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go
index cd5e4ee1c..628b4fe57 100644
--- a/internal/gitaly/service/ref/testhelper_test.go
+++ b/internal/gitaly/service/ref/testhelper_test.go
@@ -67,7 +67,7 @@ func runRefServiceServer(tb testing.TB, cfg config.Cfg) string {
deps.GetTxManager(),
deps.GetCatfileCache(),
))
- gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(
deps.GetCfg(),
deps.GetRubyServer(),
diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go
index 566ee73a7..5c12f8269 100644
--- a/internal/gitaly/service/repository/testhelper_test.go
+++ b/internal/gitaly/service/repository/testhelper_test.go
@@ -126,7 +126,7 @@ func runRepositoryService(tb testing.TB, cfg config.Cfg, rubySrv *rubyserver.Ser
deps.GetGit2goExecutor(),
deps.GetHousekeepingManager(),
))
- gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
gitalypb.RegisterRemoteServiceServer(srv, remote.NewServer(
deps.GetLocator(),
deps.GetGitCmdFactory(),
diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go
index 69dc03c8c..8e203665d 100644
--- a/internal/gitaly/service/setup/register.go
+++ b/internal/gitaly/service/setup/register.go
@@ -145,6 +145,7 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) {
deps.GetGitCmdFactory(),
deps.GetPackObjectsCache(),
deps.GetPackObjectsConcurrencyTracker(),
+ deps.GetPackObjectsLimiter(),
))
gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(deps.GetCfg().Storages))
diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go
index db0d9a1b3..c841d6883 100644
--- a/internal/gitaly/service/smarthttp/testhelper_test.go
+++ b/internal/gitaly/service/smarthttp/testhelper_test.go
@@ -49,7 +49,7 @@ func startSmartHTTPServerWithOptions(t *testing.T, cfg config.Cfg, opts []Server
deps.GetGit2goExecutor(),
deps.GetHousekeepingManager(),
))
- gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
}, serverOpts...)
}
diff --git a/internal/gitaly/service/ssh/testhelper_test.go b/internal/gitaly/service/ssh/testhelper_test.go
index 194fddeb8..f3fe393e1 100644
--- a/internal/gitaly/service/ssh/testhelper_test.go
+++ b/internal/gitaly/service/ssh/testhelper_test.go
@@ -41,8 +41,8 @@ func startSSHServerWithOptions(t *testing.T, cfg config.Cfg, opts []ServerOpt, s
deps.GetHookManager(),
deps.GetGitCmdFactory(),
deps.GetPackObjectsCache(),
- deps.GetPackObjectsConcurrencyTracker(),
- ))
+ deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()),
+ )
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(
cfg,
deps.GetRubyServer(),
diff --git a/internal/gitaly/service/wiki/testhelper_test.go b/internal/gitaly/service/wiki/testhelper_test.go
index 0dda852a7..bff5a5478 100644
--- a/internal/gitaly/service/wiki/testhelper_test.go
+++ b/internal/gitaly/service/wiki/testhelper_test.go
@@ -78,7 +78,7 @@ func setupWikiService(tb testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server)
gitalypb.RegisterHookServiceServer(srv, hook.NewServer(
deps.GetHookManager(),
deps.GetGitCmdFactory(),
- deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker()))
+ deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter()))
gitalypb.RegisterWikiServiceServer(srv, NewServer(deps.GetRubyServer(), deps.GetLocator()))
gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(
cfg,
diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go
new file mode 100644
index 000000000..c7945828a
--- /dev/null
+++ b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go
@@ -0,0 +1,10 @@
+package featureflag
+
+// PackObjectsLimitingRepo will enable a concurrency limiter for pack objects
+// based off of the repository.
+var PackObjectsLimitingRepo = NewFeatureFlag(
+ "pack_objects_limiting_repo",
+ "v15.5.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/4413",
+ false,
+)
diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_user.go b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go
new file mode 100644
index 000000000..ed0040334
--- /dev/null
+++ b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go
@@ -0,0 +1,10 @@
+package featureflag
+
+// PackObjectsLimitingUser will enable a concurrency limiter for pack objects
+// based off of the user
+var PackObjectsLimitingUser = NewFeatureFlag(
+ "pack_objects_limiting_user",
+ "v15.5.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/4413",
+ false,
+)
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 39fa17097..7fb745b42 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -244,6 +244,7 @@ type gitalyServerDeps struct {
diskCache cache.Cache
packObjectsCache streamcache.Cache
packObjectsConcurrencyTracker *hook.ConcurrencyTracker
+ packObjectsLimiter limithandler.Limiter
limitHandler *limithandler.LimiterMiddleware
git2goExecutor *git2go.Executor
updaterWithHooks *updateref.UpdaterWithHooks
@@ -304,6 +305,15 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg, r
gsd.packObjectsConcurrencyTracker = hook.NewConcurrencyTracker()
}
+ if gsd.packObjectsLimiter == nil {
+ gsd.packObjectsLimiter = limithandler.NewConcurrencyLimiter(
+ 0,
+ 0,
+ nil,
+ limithandler.NewNoopConcurrencyMonitor(),
+ )
+ }
+
if gsd.limitHandler == nil {
gsd.limitHandler = limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)
}
@@ -333,6 +343,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg, r
CatfileCache: gsd.catfileCache,
DiskCache: gsd.diskCache,
PackObjectsCache: gsd.packObjectsCache,
+ PackObjectsLimiter: gsd.packObjectsLimiter,
PackObjectsConcurrencyTracker: gsd.packObjectsConcurrencyTracker,
LimitHandler: gsd.limitHandler,
Git2goExecutor: gsd.git2goExecutor,
@@ -425,3 +436,12 @@ func WithConcurrencyTracker(tracker *hook.ConcurrencyTracker) GitalyServerOpt {
return deps
}
}
+
+// WithPackObjectsLimiter sets the PackObjectsLimiter that will be
+// used for gitaly services initialization.
+func WithPackObjectsLimiter(limiter *limithandler.ConcurrencyLimiter) GitalyServerOpt {
+ return func(deps gitalyServerDeps) gitalyServerDeps {
+ deps.packObjectsLimiter = limiter
+ return deps
+ }
+}