Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2023-04-12 05:28:34 +0300
committerJustin Tobler <jtobler@gitlab.com>2023-04-12 05:28:34 +0300
commitb8190668d147784e8be4a379b33f691363e08a0f (patch)
treecc7a6a6fb814b2af5c304f995f7156fb134a4aa2
parenta6614c36a3db5e7dded00a912ce71a8c6c07e5a2 (diff)
parentc4397165c73e9ad4a4640cbf771ba2155c6e7ab1 (diff)
Merge branch 'qmnguyen0711/concurrency-limit-by-remote-ip' into 'master'
Add pack-objects concurrency limiter based on RemoteIP See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5583 Merged-by: Justin Tobler <jtobler@gitlab.com> Approved-by: John Cai <jcai@gitlab.com> Approved-by: Justin Tobler <jtobler@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r--internal/cli/gitaly/serve.go11
-rw-r--r--internal/gitaly/config/config.go44
-rw-r--r--internal/gitaly/config/config_test.go144
-rw-r--r--internal/gitaly/service/hook/pack_objects.go22
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go327
-rw-r--r--internal/metadata/featureflag/ff_pack_objects_limiting_remote_ip.go10
-rw-r--r--internal/metadata/featureflag/ff_pack_objects_limiting_repo.go2
-rw-r--r--internal/metadata/featureflag/ff_pack_objects_limiting_user.go2
8 files changed, 430 insertions, 132 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index 9f68f722c..37997859d 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -271,16 +271,9 @@ func run(cfg config.Cfg) error {
string(cfg.PackObjectsLimiting.Key),
cfg.Prometheus.GRPCLatencyBuckets,
)
- packObjectsConcurrencyLimit := cfg.PackObjectsLimiting.MaxConcurrency
- if packObjectsConcurrencyLimit == 0 {
- // TODO: remove this default setting when we remove the feature
- // flags PackObjectsLimitingRepo and PackObjectsLimitingUser
- // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413
- packObjectsConcurrencyLimit = 200
- }
packObjectsLimiter := limithandler.NewConcurrencyLimiter(
- int(packObjectsConcurrencyLimit),
- 0,
+ cfg.PackObjectsLimiting.MaxConcurrency,
+ cfg.PackObjectsLimiting.MaxQueueLength,
func() helper.Ticker {
return helper.NewTimerTicker(cfg.PackObjectsLimiting.MaxQueueWait.Duration())
},
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index 8d8f5a33e..c920edf81 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -26,6 +26,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper/duration"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper/env"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm"
)
@@ -407,6 +408,9 @@ const (
// PackObjectsLimitingKeyRepository will limit the pack objects concurrency
// by repository
PackObjectsLimitingKeyRepository = PackObjectsLimitingKey("repository")
+ // PackObjectsLimitingKeyRemoteIP will limit the pack objects concurrency
+ // by RemoteIP
+ PackObjectsLimitingKeyRemoteIP = PackObjectsLimitingKey("remote_ip")
)
// ParsePackObjectsLimitingKey checks if the key is a valid
@@ -417,6 +421,8 @@ func ParsePackObjectsLimitingKey(k string) (PackObjectsLimitingKey, error) {
return PackObjectsLimitingKeyUser, nil
case PackObjectsLimitingKeyRepository:
return PackObjectsLimitingKeyRepository, nil
+ case PackObjectsLimitingKeyRemoteIP:
+ return PackObjectsLimitingKeyRemoteIP, nil
default:
return "", fmt.Errorf("unsupported pack objects limiting key: %q", k)
}
@@ -441,15 +447,19 @@ type PackObjectsLimiting struct {
Key PackObjectsLimitingKey `toml:"key,omitempty" json:"key,omitempty"`
// MaxConcurrency is the maximum number of concurrent pack objects processes
// for a given key.
- MaxConcurrency uint `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"`
+ MaxConcurrency int `toml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"`
// MaxQueueWait is the maximum time a request can remain in the concurrency queue
// waiting to be picked up by Gitaly.
MaxQueueWait duration.Duration `toml:"max_queue_wait,omitempty" json:"max_queue_wait,omitempty"`
+ // MaxQueueLength is the maximum length of the request queue
+ MaxQueueLength int `toml:"max_queue_length,omitempty" json:"max_queue_length,omitempty"`
}
// Validate runs validation on all fields and compose all found errors.
func (pol PackObjectsLimiting) Validate() error {
return cfgerror.New().
+ Append(cfgerror.IsPositive(pol.MaxConcurrency), "max_concurrency").
+ Append(cfgerror.IsPositive(pol.MaxQueueLength), "max_queue_length").
Append(cfgerror.IsPositive(pol.MaxQueueWait.Duration()), "max_queue_wait").
AsError()
}
@@ -486,12 +496,40 @@ func defaultPackObjectsCacheConfig() StreamCacheConfig {
}
}
+func defaultPackObjectsLimiting() PackObjectsLimiting {
+ var maxConcurrency, maxQueueLength int
+
+ // TODO: Injecting environment variables here boosts the time to try out new concurrency
+ // limiting. When this new limiter is in-place, they should go through official
+ // configure methods (Omnibus, K8s helm chart, etc.) instead.
+ if maxConcurrencyFromEnv, err := env.GetInt("GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY", 200); err == nil {
+ maxConcurrency = maxConcurrencyFromEnv
+ }
+ if maxConcurrency == 0 {
+ // TODO: remove this default setting when we remove the feature
+ // flags PackObjectsLimitingRepo, PackObjectsLimitingUser, and PackObjectsLimitingRemoteIP
+ // feature flag issue: https://gitlab.com/gitlab-org/gitaly/-/issues/4413
+ maxConcurrency = 200
+ }
+ if maxQueueLengthFromEnv, err := env.GetInt("GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH", 0); err == nil {
+ maxQueueLength = maxQueueLengthFromEnv
+ }
+
+ return PackObjectsLimiting{
+ Key: "",
+ MaxConcurrency: maxConcurrency,
+ MaxQueueLength: maxQueueLength,
+ MaxQueueWait: 0, // Do not enforce queue wait at this point
+ }
+}
+
// Load initializes the Config variable from file and the environment.
// Environment variables take precedence over the file.
func Load(file io.Reader) (Cfg, error) {
cfg := Cfg{
- Prometheus: prometheus.DefaultConfig(),
- PackObjectsCache: defaultPackObjectsCacheConfig(),
+ Prometheus: prometheus.DefaultConfig(),
+ PackObjectsCache: defaultPackObjectsCacheConfig(),
+ PackObjectsLimiting: defaultPackObjectsLimiting(),
}
if err := toml.NewDecoder(file).Decode(&cfg); err != nil {
diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go
index 9baa2c35a..f130c0d00 100644
--- a/internal/gitaly/config/config_test.go
+++ b/internal/gitaly/config/config_test.go
@@ -46,8 +46,9 @@ func TestLoadEmptyConfig(t *testing.T) {
require.NoError(t, err)
expectedCfg := Cfg{
- Prometheus: prometheus.DefaultConfig(),
- PackObjectsCache: defaultPackObjectsCacheConfig(),
+ Prometheus: prometheus.DefaultConfig(),
+ PackObjectsCache: defaultPackObjectsCacheConfig(),
+ PackObjectsLimiting: defaultPackObjectsLimiting(),
}
require.NoError(t, expectedCfg.setDefaults())
@@ -186,8 +187,9 @@ func TestLoadConfigCommand(t *testing.T) {
modifyDefaultConfig := func(modify func(cfg *Cfg)) Cfg {
cfg := &Cfg{
- Prometheus: prometheus.DefaultConfig(),
- PackObjectsCache: defaultPackObjectsCacheConfig(),
+ Prometheus: prometheus.DefaultConfig(),
+ PackObjectsCache: defaultPackObjectsCacheConfig(),
+ PackObjectsLimiting: defaultPackObjectsLimiting(),
}
require.NoError(t, cfg.setDefaults())
modify(cfg)
@@ -1619,11 +1621,13 @@ func TestPackObjectsLimiting(t *testing.T) {
rawCfg: `[pack_objects_limiting]
key = "repository"
max_concurrency = 20
+ max_queue_length = 100
max_queue_wait = "10s"
`,
expectedCfg: PackObjectsLimiting{
Key: PackObjectsLimitingKeyRepository,
MaxConcurrency: 20,
+ MaxQueueLength: 100,
MaxQueueWait: duration.Duration(10 * time.Second),
},
},
@@ -1632,11 +1636,28 @@ func TestPackObjectsLimiting(t *testing.T) {
rawCfg: `[pack_objects_limiting]
key = "user"
max_concurrency = 10
+ max_queue_length = 100
max_queue_wait = "1m"
`,
expectedCfg: PackObjectsLimiting{
Key: PackObjectsLimitingKeyUser,
MaxConcurrency: 10,
+ MaxQueueLength: 100,
+ MaxQueueWait: duration.Duration(1 * time.Minute),
+ },
+ },
+ {
+ desc: "using remote_ip as key",
+ rawCfg: `[pack_objects_limiting]
+ key = "remote_ip"
+ max_concurrency = 10
+ max_queue_length = 100
+ max_queue_wait = "1m"
+ `,
+ expectedCfg: PackObjectsLimiting{
+ Key: PackObjectsLimitingKeyRemoteIP,
+ MaxConcurrency: 10,
+ MaxQueueLength: 100,
MaxQueueWait: duration.Duration(1 * time.Minute),
},
},
@@ -1645,6 +1666,7 @@ func TestPackObjectsLimiting(t *testing.T) {
rawCfg: `[pack_objects_limiting]
key = "project"
max_concurrency = 10
+ max_queue_length = 100
max_queue_wait = "1m"
`,
expectedErrString: `unsupported pack objects limiting key: "project"`,
@@ -1667,8 +1689,122 @@ func TestPackObjectsLimiting(t *testing.T) {
}
}
+// This test uses (*testing.T).Setenv. Thus, it should not run in parallel.
+func TestPackObjectsLimiting_defaultPackObjectsLimiting(t *testing.T) {
+ testCases := []struct {
+ desc string
+ envs map[string]string
+ expectedCfg PackObjectsLimiting
+ }{
+ {
+ desc: "not relative envs are set",
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 200,
+ MaxQueueWait: 0,
+ MaxQueueLength: 0,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY is set",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "100",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 100,
+ MaxQueueWait: 0,
+ MaxQueueLength: 0,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY is invalid",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "hello",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 200,
+ MaxQueueWait: 0,
+ MaxQueueLength: 0,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH is set",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "100",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 200,
+ MaxQueueWait: 0,
+ MaxQueueLength: 100,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH is invalid",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "hello",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 200,
+ MaxQueueWait: 0,
+ MaxQueueLength: 0,
+ },
+ },
+ {
+ desc: "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY and GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH are bot set",
+ envs: map[string]string{
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_QUEUE_LENGTH": "1",
+ "GITALY_PACK_OBJECTS_LIMIT_MAX_CONCURRENCY": "2",
+ },
+ expectedCfg: PackObjectsLimiting{
+ MaxConcurrency: 2,
+ MaxQueueWait: 0,
+ MaxQueueLength: 1,
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ for key, value := range tc.envs {
+ t.Setenv(key, value)
+ }
+
+ cfg := defaultPackObjectsLimiting()
+ require.Equal(t, tc.expectedCfg, cfg)
+ })
+ }
+}
+
func TestPackObjectsLimiting_Validate(t *testing.T) {
t.Parallel()
+
+ require.NoError(t, PackObjectsLimiting{MaxConcurrency: 0}.Validate())
+ require.NoError(t, PackObjectsLimiting{MaxConcurrency: 1}.Validate())
+ require.NoError(t, PackObjectsLimiting{MaxConcurrency: 100}.Validate())
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1", cfgerror.ErrIsNegative),
+ "max_concurrency",
+ ),
+ },
+ PackObjectsLimiting{MaxConcurrency: -1}.Validate(),
+ )
+
+ require.NoError(t, PackObjectsLimiting{MaxQueueLength: 0}.Validate())
+ require.NoError(t, PackObjectsLimiting{MaxQueueLength: 1}.Validate())
+ require.NoError(t, PackObjectsLimiting{MaxQueueLength: 100}.Validate())
+ require.Equal(
+ t,
+ cfgerror.ValidationErrors{
+ cfgerror.NewValidationError(
+ fmt.Errorf("%w: -1", cfgerror.ErrIsNegative),
+ "max_queue_length",
+ ),
+ },
+ PackObjectsLimiting{MaxQueueLength: -1}.Validate(),
+ )
+
require.NoError(t, PackObjectsLimiting{MaxQueueWait: duration.Duration(1)}.Validate())
require.Equal(
t,
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index 5e37da542..27bfc39b8 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -10,6 +10,7 @@ import (
"fmt"
"hash"
"io"
+ "net"
"os"
"strings"
"syscall"
@@ -76,6 +77,11 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
key := hex.EncodeToString(h.Sum(nil))
servedBytes, created, err := s.packObjectsCache.Fetch(ctx, key, output, func(w io.Writer) error {
+ // TODO: We now have three different concurrency strategies for pack-objects. All of
+ // them are guarded behind feature flags. This situation is because we wanted to verify
+ // and pick the most effective strategy. `PackObjectsLimiting.Key` config is ignored
+ // at this point. We will either checking it properly, or remove it when there is
+ // a conclusion on production.
if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) {
return s.runPackObjectsLimited(
ctx,
@@ -100,6 +106,22 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
)
}
+ if featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx) && req.GetRemoteIp() != "" {
+ ipAddr := net.ParseIP(req.GetRemoteIp())
+ // Ignore loop-back IPs
+ if ipAddr != nil && !ipAddr.IsLoopback() {
+ return s.runPackObjectsLimited(
+ ctx,
+ w,
+ req.GetRemoteIp(),
+ req,
+ args,
+ stdin,
+ key,
+ )
+ }
+ }
+
return s.runPackObjects(ctx, w, req, args, stdin, key)
})
if err != nil {
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 775bd289e..d05de7458 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -91,6 +91,7 @@ func TestServer_PackObjectsHook_separateContext(t *testing.T) {
testhelper.NewFeatureSets(
featureflag.PackObjectsLimitingUser,
featureflag.PackObjectsLimitingRepo,
+ featureflag.PackObjectsLimitingRemoteIP,
).Run(t, runTestsWithRuntimeDir(
t,
testServerPackObjectsHookSeparateContextWithRuntimeDir,
@@ -223,6 +224,7 @@ func TestServer_PackObjectsHook_usesCache(t *testing.T) {
testhelper.NewFeatureSets(
featureflag.PackObjectsLimitingUser,
featureflag.PackObjectsLimitingRepo,
+ featureflag.PackObjectsLimitingRemoteIP,
).Run(t, runTestsWithRuntimeDir(
t,
testServerPackObjectsHookUsesCache,
@@ -311,6 +313,7 @@ func TestServer_PackObjectsHookWithSidechannel(t *testing.T) {
testhelper.NewFeatureSets(
featureflag.PackObjectsLimitingUser,
featureflag.PackObjectsLimitingRepo,
+ featureflag.PackObjectsLimitingRemoteIP,
).Run(t, runTestsWithRuntimeDir(
t,
testServerPackObjectsHookWithSidechannelWithRuntimeDir,
@@ -583,6 +586,7 @@ func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) {
testhelper.NewFeatureSets(
featureflag.PackObjectsLimitingUser,
featureflag.PackObjectsLimitingRepo,
+ featureflag.PackObjectsLimitingRemoteIP,
).Run(t, runTestsWithRuntimeDir(
t,
testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir,
@@ -662,6 +666,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
testhelper.NewFeatureSets(
featureflag.PackObjectsLimitingUser,
featureflag.PackObjectsLimitingRepo,
+ featureflag.PackObjectsLimitingRemoteIP,
).Run(t, testPackObjectsConcurrency)
}
@@ -676,124 +681,216 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) {
keyType = "repo"
} else if featureflag.PackObjectsLimitingUser.IsEnabled(ctx) {
keyType = "user"
+ } else if featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx) {
+ keyType = "remote_ip"
}
- ticker := helper.NewManualTicker()
- monitor := limithandler.NewPackObjectsConcurrencyMonitor(
- keyType,
- cfg.Prometheus.GRPCLatencyBuckets,
- )
- limiter := limithandler.NewConcurrencyLimiter(
- 1,
- 0,
- func() helper.Ticker { return ticker },
- monitor,
- )
-
- registry := prometheus.NewRegistry()
- registry.MustRegister(monitor)
-
- receivedCh, blockCh := make(chan struct{}), make(chan struct{})
- cfg.SocketPath = runHooksServer(t, cfg, []serverOption{
- withRunPackObjectsFn(func(
- context.Context,
- git.CommandFactory,
- io.Writer,
- *gitalypb.PackObjectsHookWithSidechannelRequest,
- *packObjectsArgs,
- io.Reader,
- string,
- *hookPkg.ConcurrencyTracker,
- ) error {
- receivedCh <- struct{}{}
- <-blockCh
- return nil
- }),
- },
- testserver.WithPackObjectsLimiter(limiter),
- )
+ args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
- ctx1, wt1, err := setupSidechannel(t, ctx, "1dd08961455abf80ef9115f4afdc1c6f968b503c")
- require.NoError(t, err)
+ for _, tc := range []struct {
+ desc string
+ requests [2]*gitalypb.PackObjectsHookWithSidechannelRequest
+ shouldLimit bool
+ }{
+ {
+ desc: "never reach limit",
+ requests: [2]*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {
+ GlId: "user-123",
+ RemoteIp: "1.2.3.4",
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: args,
+ },
+ {
+ GlId: "user-456",
+ RemoteIp: "1.2.3.5",
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "d/e/f",
+ },
+ Args: args,
+ },
+ },
+ shouldLimit: false,
+ },
+ {
+ desc: "normal IP address",
+ requests: [2]*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {
+ GlId: "user-123",
+ RemoteIp: "1.2.3.4",
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: args,
+ },
+ {
+ GlId: "user-123",
+ RemoteIp: "1.2.3.4",
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: args,
+ },
+ },
+ shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) ||
+ featureflag.PackObjectsLimitingUser.IsEnabled(ctx) ||
+ featureflag.PackObjectsLimitingRemoteIP.IsEnabled(ctx),
+ },
+ {
+ desc: "IPv4 loopback addresses",
+ requests: [2]*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {
+ GlId: "user-123",
+ RemoteIp: "127.0.0.1",
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: args,
+ },
+ {
+ GlId: "user-123",
+ RemoteIp: "127.0.0.1",
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: args,
+ },
+ },
+ shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) ||
+ featureflag.PackObjectsLimitingUser.IsEnabled(ctx),
+ },
+ {
+ desc: "IPv6 loopback addresses",
+ requests: [2]*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {
+ GlId: "user-123",
+ RemoteIp: net.IPv6loopback.String(),
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: args,
+ },
+ {
+ GlId: "user-123",
+ RemoteIp: net.IPv6loopback.String(),
+ Repository: &gitalypb.Repository{
+ StorageName: "storage-0",
+ RelativePath: "a/b/c",
+ },
+ Args: args,
+ },
+ },
+ shouldLimit: featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) ||
+ featureflag.PackObjectsLimitingUser.IsEnabled(ctx),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ticker := helper.NewManualTicker()
+ monitor := limithandler.NewPackObjectsConcurrencyMonitor(
+ keyType,
+ cfg.Prometheus.GRPCLatencyBuckets,
+ )
+ limiter := limithandler.NewConcurrencyLimiter(
+ 1,
+ 0,
+ func() helper.Ticker { return ticker },
+ monitor,
+ )
- ctx2, wt2, err := setupSidechannel(t, ctx, "2dd08961455abf80ef9115f4afdc1c6f968b503")
- require.NoError(t, err)
+ registry := prometheus.NewRegistry()
+ registry.MustRegister(monitor)
+
+ receivedCh, blockCh := make(chan struct{}), make(chan struct{})
+ cfg.SocketPath = runHooksServer(t, cfg, []serverOption{
+ withRunPackObjectsFn(func(
+ context.Context,
+ git.CommandFactory,
+ io.Writer,
+ *gitalypb.PackObjectsHookWithSidechannelRequest,
+ *packObjectsArgs,
+ io.Reader,
+ string,
+ *hookPkg.ConcurrencyTracker,
+ ) error {
+ receivedCh <- struct{}{}
+ <-blockCh
+ return nil
+ }),
+ },
+ testserver.WithPackObjectsLimiter(limiter),
+ )
- userID := "user-123"
- req1 := &gitalypb.PackObjectsHookWithSidechannelRequest{
- GlId: userID,
- Repository: &gitalypb.Repository{
- StorageName: "storage-0",
- RelativePath: "a/b/c",
- },
- Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"},
- }
+ ctx1, wt1, err := setupSidechannel(t, ctx, "1dd08961455abf80ef9115f4afdc1c6f968b503c")
+ require.NoError(t, err)
- req2 := &gitalypb.PackObjectsHookWithSidechannelRequest{
- GlId: userID,
- Repository: &gitalypb.Repository{
- StorageName: "storage-0",
- RelativePath: "a/b/c",
- },
- Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"},
- }
+ ctx2, wt2, err := setupSidechannel(t, ctx, "2dd08961455abf80ef9115f4afdc1c6f968b503")
+ require.NoError(t, err)
- client, conn := newHooksClient(t, cfg.SocketPath)
- defer testhelper.MustClose(t, conn)
+ client, conn := newHooksClient(t, cfg.SocketPath)
+ defer testhelper.MustClose(t, conn)
- var wg sync.WaitGroup
- wg.Add(2)
+ var wg sync.WaitGroup
+ wg.Add(2)
- errChan := make(chan error)
+ errChan := make(chan error)
- // We fire off two requests. Since the concurrency limit is set at 1,
- // the first request will make it through the concurrency limiter and
- // get blocked in the function provide to withRunPackObjectsFn() above.
- // The second request will get concurrency limited and will be waiting
- // in the queue.
- // When we call Tick() on the max queue wait ticker, the second request
- // will return with an error.
+ // We fire off two requests. Since the concurrency limit is set at 1,
+ // the first request will make it through the concurrency limiter and
+ // get blocked in the function provide to withRunPackObjectsFn() above.
+ // The second request will get concurrency limited and will be waiting
+ // in the queue.
+ // When we call Tick() on the max queue wait ticker, the second request
+ // will return with an error.
- type call struct {
- ctx context.Context
- req *gitalypb.PackObjectsHookWithSidechannelRequest
- }
+ type call struct {
+ ctx context.Context
+ req *gitalypb.PackObjectsHookWithSidechannelRequest
+ }
- for _, c := range []call{
- {ctx: ctx1, req: req1},
- {ctx: ctx2, req: req2},
- } {
- go func(c call) {
- defer wg.Done()
- _, err := client.PackObjectsHookWithSidechannel(c.ctx, c.req)
- if err != nil {
- errChan <- err
+ for _, c := range []call{
+ {ctx: ctx1, req: tc.requests[0]},
+ {ctx: ctx2, req: tc.requests[1]},
+ } {
+ go func(c call) {
+ defer wg.Done()
+ _, err := client.PackObjectsHookWithSidechannel(c.ctx, c.req)
+ if err != nil {
+ errChan <- err
+ }
+ }(c)
}
- }(c)
- }
- if featureflag.PackObjectsLimitingRepo.IsEnabled(ctx) || featureflag.PackObjectsLimitingUser.IsEnabled(ctx) {
- <-receivedCh
+ if tc.shouldLimit {
+ <-receivedCh
- require.NoError(t,
- testutil.GatherAndCompare(registry,
- bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls
+ require.NoError(t,
+ testutil.GatherAndCompare(registry,
+ bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_in_progress Gauge of number of concurrent in-progress calls
# TYPE gitaly_pack_objects_in_progress gauge
gitaly_pack_objects_in_progress{type=%q} 1
`, keyType)), "gitaly_pack_objects_in_progress"))
- ticker.Tick()
+ ticker.Tick()
- err := <-errChan
- testhelper.RequireGrpcCode(
- t,
- err,
- codes.ResourceExhausted,
- )
+ err := <-errChan
+ testhelper.RequireGrpcCode(
+ t,
+ err,
+ codes.ResourceExhausted,
+ )
- close(blockCh)
+ close(blockCh)
- expectedMetrics := bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue
+ expectedMetrics := bytes.NewBufferString(fmt.Sprintf(`# HELP gitaly_pack_objects_dropped_total Number of requests dropped from the queue
# TYPE gitaly_pack_objects_dropped_total counter
gitaly_pack_objects_dropped_total{reason="max_time", type=%q} 1
# HELP gitaly_pack_objects_queued Gauge of number of queued calls
@@ -801,24 +898,26 @@ gitaly_pack_objects_dropped_total{reason="max_time", type=%q} 1
gitaly_pack_objects_queued{type=%q} 0
`, keyType, keyType))
- require.NoError(t,
- testutil.GatherAndCompare(registry, expectedMetrics,
- "gitaly_pack_objects_dropped_total",
- "gitaly_pack_objects_queued",
- ))
-
- acquiringSecondsCount, err := testutil.GatherAndCount(registry,
- "gitaly_pack_objects_acquiring_seconds")
- require.NoError(t, err)
+ require.NoError(t,
+ testutil.GatherAndCompare(registry, expectedMetrics,
+ "gitaly_pack_objects_dropped_total",
+ "gitaly_pack_objects_queued",
+ ))
+
+ acquiringSecondsCount, err := testutil.GatherAndCount(registry,
+ "gitaly_pack_objects_acquiring_seconds")
+ require.NoError(t, err)
+
+ require.Equal(t, 1, acquiringSecondsCount)
+ } else {
+ close(blockCh)
+ <-receivedCh
+ <-receivedCh
+ }
- require.Equal(t, 1, acquiringSecondsCount)
- } else {
- close(blockCh)
- <-receivedCh
- <-receivedCh
+ wg.Wait()
+ require.NoError(t, wt1.Wait())
+ require.NoError(t, wt2.Wait())
+ })
}
-
- wg.Wait()
- require.NoError(t, wt1.Wait())
- require.NoError(t, wt2.Wait())
}
diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_remote_ip.go b/internal/metadata/featureflag/ff_pack_objects_limiting_remote_ip.go
new file mode 100644
index 000000000..e4d9a5c1e
--- /dev/null
+++ b/internal/metadata/featureflag/ff_pack_objects_limiting_remote_ip.go
@@ -0,0 +1,10 @@
+package featureflag
+
+// PackObjectsLimitingRemoteIP will enable a concurrency limiter for pack objects
+// based off of the remote IP
+var PackObjectsLimitingRemoteIP = NewFeatureFlag(
+ "pack_objects_limiting_remote_ip",
+ "v15.11.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/4413",
+ false,
+)
diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go
index 559c184e4..a07874c4c 100644
--- a/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go
+++ b/internal/metadata/featureflag/ff_pack_objects_limiting_repo.go
@@ -4,7 +4,7 @@ package featureflag
// based off of the repository.
var PackObjectsLimitingRepo = NewFeatureFlag(
"pack_objects_limiting_repo",
- "v15.6.0",
+ "v15.11.0",
"https://gitlab.com/gitlab-org/gitaly/-/issues/4413",
false,
)
diff --git a/internal/metadata/featureflag/ff_pack_objects_limiting_user.go b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go
index cc7535d5f..1e8319ad1 100644
--- a/internal/metadata/featureflag/ff_pack_objects_limiting_user.go
+++ b/internal/metadata/featureflag/ff_pack_objects_limiting_user.go
@@ -4,7 +4,7 @@ package featureflag
// based off of the user
var PackObjectsLimitingUser = NewFeatureFlag(
"pack_objects_limiting_user",
- "v15.6.0",
+ "v15.11.0",
"https://gitlab.com/gitlab-org/gitaly/-/issues/4413",
false,
)