diff options
author | John Cai <jcai@gitlab.com> | 2022-06-02 17:49:57 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-07-13 19:00:04 +0300 |
commit | d4bcd755235bed04123f5e4fafa7814788d9ad2c (patch) | |
tree | 410c95c5acc62b48bf9f3a9955af25da31b2b106 | |
parent | 6537b017a11b21e2993a24f46bfd2475382c37bd (diff) |
hook service: Call ConcurrencyTrackerjc-track-concurrency-pack-objects
In order to gain insight into concurrent calls of pack objects, call the
ConcurrencyTracker each time we spawn a pack objects process. Since we
want to know both how many concurrent pack object processes are spawned
per user as well as per repository, we have a call for each.
-rw-r--r-- | internal/gitaly/service/hook/pack_objects.go | 35 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 136 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_pack_objects_metrics.go | 9 |
3 files changed, 153 insertions, 27 deletions
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index f71db9d2c..3d161a521 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -22,10 +22,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/stream" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" ) var ( @@ -43,8 +43,8 @@ var ( }) ) -func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository, reqHash proto.Message, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error { - data, err := protojson.Marshal(reqHash) +func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error { + data, err := protojson.Marshal(req) if err != nil { return err } @@ -74,7 +74,7 @@ func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository, key := hex.EncodeToString(h.Sum(nil)) r, created, err := s.packObjectsCache.FindOrCreate(key, func(w io.Writer) error { - return s.runPackObjects(ctx, w, repo, args, stdin, key) + return s.runPackObjects(ctx, w, req, args, stdin, key) }) if err != nil { return err @@ -105,7 +105,14 @@ func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository, return r.Wait(ctx) } -func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb.Repository, args *packObjectsArgs, stdin io.ReadCloser, key string) error { +func (s *server) runPackObjects( + ctx context.Context, + w io.Writer, + req *gitalypb.PackObjectsHookWithSidechannelRequest, + args *packObjectsArgs, + stdin io.ReadCloser, + key string, +) error { // We want to keep the context for logging, but we want to block all its // cancelation signals (deadline, cancel etc.). This is because of // the following scenario. Imagine client1 calls PackObjectsHook and @@ -124,6 +131,22 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb defer stdin.Close() + repo := req.GetRepository() + + if featureflag.PackObjectsMetrics.IsEnabled(ctx) && s.concurrencyTracker != nil { + finishRepoLog := s.concurrencyTracker.LogConcurrency(ctx, "repository", repo.GetRelativePath()) + defer finishRepoLog() + + userID := req.GetGlId() + + if userID == "" { + userID = "none" + } + + finishUserLog := s.concurrencyTracker.LogConcurrency(ctx, "user_id", userID) + defer finishUserLog() + } + counter := &helper.CountingWriter{W: w} sw := pktline.NewSidebandWriter(counter) stdout := bufio.NewWriterSize(sw.Writer(stream.BandStdout), pktline.MaxSidebandData) @@ -285,7 +308,7 @@ func (s *server) PackObjectsHookWithSidechannel(ctx context.Context, req *gitaly } defer c.Close() - if err := s.packObjectsHook(ctx, req.Repository, req, args, c, c); err != nil { + if err := s.packObjectsHook(ctx, req, args, c, c); err != nil { if errors.Is(err, syscall.EPIPE) { // EPIPE is the error we get if we try to write to c after the client has // closed its side of the connection. By convention, we label server side diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 73c98c2d9..c11092806 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -2,12 +2,14 @@ package hook import ( "bytes" + "context" "io" "net" "strings" "sync" "testing" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/require" @@ -16,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" hookPkg "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" @@ -24,16 +27,18 @@ import ( "google.golang.org/grpc/codes" ) -func runTestsWithRuntimeDir(t *testing.T, testFunc func(*testing.T, string)) { +func runTestsWithRuntimeDir(t *testing.T, testFunc func(*testing.T, context.Context, string)) func(*testing.T, context.Context) { t.Helper() - t.Run("no runtime dir", func(t *testing.T) { - testFunc(t, "") - }) + return func(t *testing.T, ctx context.Context) { + t.Run("no runtime dir", func(t *testing.T) { + testFunc(t, ctx, "") + }) - t.Run("with runtime dir", func(t *testing.T) { - testFunc(t, testhelper.TempDir(t)) - }) + t.Run("with runtime dir", func(t *testing.T) { + testFunc(t, ctx, testhelper.TempDir(t)) + }) + } } func cfgWithCache(t *testing.T) config.Cfg { @@ -73,14 +78,18 @@ func TestParsePackObjectsArgs(t *testing.T) { func TestServer_PackObjectsHook_separateContext(t *testing.T) { t.Parallel() - runTestsWithRuntimeDir(t, testServerPackObjectsHookSeparateContextWithRuntimeDir) + testhelper.NewFeatureSets(featureflag.PackObjectsMetrics).Run( + t, + runTestsWithRuntimeDir(t, testServerPackObjectsHookSeparateContextWithRuntimeDir), + ) } -func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtimeDir string) { +func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) { cfg := cfgWithCache(t) cfg.SocketPath = runHooksServer(t, cfg, nil) - ctx1 := testhelper.Context(t) + ctx1, cancel := context.WithCancel(ctx) + defer cancel() repo, repoPath := gittest.CreateRepository(ctx1, t, cfg, gittest.CreateRepositoryConfig{ Seed: gittest.SeedGitLabTest, }) @@ -137,7 +146,8 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtim client2, conn2 := newHooksClient(t, cfg.SocketPath) defer conn2.Close() - ctx2 := testhelper.Context(t) + ctx2, cancel := context.WithCancel(ctx) + defer cancel() var stdout2 []byte ctx2, wt2, err := hookPkg.SetupSidechannel( @@ -186,10 +196,13 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtim func TestServer_PackObjectsHook_usesCache(t *testing.T) { t.Parallel() - runTestsWithRuntimeDir(t, testServerPackObjectsHookUsesCache) + testhelper.NewFeatureSets(featureflag.PackObjectsMetrics).Run( + t, + runTestsWithRuntimeDir(t, testServerPackObjectsHookUsesCache), + ) } -func testServerPackObjectsHookUsesCache(t *testing.T, runtimeDir string) { +func testServerPackObjectsHookUsesCache(t *testing.T, ctx context.Context, runtimeDir string) { cfg := cfgWithCache(t) tlc := &streamcache.TestLoggingCache{} @@ -198,7 +211,6 @@ func testServerPackObjectsHookUsesCache(t *testing.T, runtimeDir string) { s.packObjectsCache = tlc }}) - ctx := testhelper.Context(t) repo, repoPath := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{ Seed: gittest.SeedGitLabTest, }) @@ -268,10 +280,14 @@ func testServerPackObjectsHookUsesCache(t *testing.T, runtimeDir string) { func TestServer_PackObjectsHookWithSidechannel(t *testing.T) { t.Parallel() - runTestsWithRuntimeDir(t, testServerPackObjectsHookWithSidechannelWithRuntimeDir) + + testhelper.NewFeatureSets(featureflag.PackObjectsMetrics).Run( + t, + runTestsWithRuntimeDir(t, testServerPackObjectsHookWithSidechannelWithRuntimeDir), + ) } -func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, runtimeDir string) { +func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) { testCases := []struct { desc string stdin string @@ -292,10 +308,17 @@ func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, runtim for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { cfg := cfgWithCache(t) - ctx := testhelper.Context(t) logger, hook := test.NewNullLogger() - cfg.SocketPath = runHooksServer(t, cfg, nil, testserver.WithLogger(logger)) + concurrencyTracker := hookPkg.NewConcurrencyTracker() + + cfg.SocketPath = runHooksServer( + t, + cfg, + nil, + testserver.WithLogger(logger), + testserver.WithConcurrencyTracker(concurrencyTracker), + ) repo, repoPath := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{ Seed: gittest.SeedGitLabTest, }) @@ -384,6 +407,74 @@ func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, runtim require.True(t, strings.HasPrefix(total, "Total ")) require.False(t, strings.Contains(total, "\n")) }) + + if featureflag.PackObjectsMetrics.IsEnabled(ctx) { + expectedMetrics := `# HELP gitaly_pack_objects_concurrent_processes Number of concurrent processes +# TYPE gitaly_pack_objects_concurrent_processes histogram +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="0"} 0 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="5"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="10"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="15"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="20"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="25"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="30"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="35"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="40"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="45"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="50"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="55"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="60"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="65"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="70"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="75"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="80"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="85"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="90"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="95"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="+Inf"} 1 +gitaly_pack_objects_concurrent_processes_sum{segment="repository"} 1 +gitaly_pack_objects_concurrent_processes_count{segment="repository"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="0"} 0 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="5"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="10"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="15"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="20"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="25"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="30"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="35"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="40"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="45"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="50"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="55"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="60"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="65"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="70"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="75"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="80"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="85"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="90"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="95"} 1 +gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="+Inf"} 1 +gitaly_pack_objects_concurrent_processes_sum{segment="user_id"} 1 +gitaly_pack_objects_concurrent_processes_count{segment="user_id"} 1 +# HELP gitaly_pack_objects_process_active_callers Number of unique callers that have an active pack objects processes +# TYPE gitaly_pack_objects_process_active_callers gauge +gitaly_pack_objects_process_active_callers{segment="repository"} 0 +gitaly_pack_objects_process_active_callers{segment="user_id"} 0 +# HELP gitaly_pack_objects_process_active_callers_total Total unique callers that have initiated a pack objects processes +# TYPE gitaly_pack_objects_process_active_callers_total counter +gitaly_pack_objects_process_active_callers_total{segment="repository"} 1 +gitaly_pack_objects_process_active_callers_total{segment="user_id"} 1 +` + require.NoError(t, testutil.CollectAndCompare( + concurrencyTracker, + bytes.NewBufferString(expectedMetrics), + )) + } else { + require.Equal(t, 0, testutil.CollectAndCount( + concurrencyTracker, + )) + } }) } } @@ -430,12 +521,15 @@ func TestServer_PackObjectsHookWithSidechannel_invalidArgument(t *testing.T) { func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) { t.Parallel() - runTestsWithRuntimeDir(t, testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir) + + testhelper.NewFeatureSets(featureflag.PackObjectsMetrics).Run( + t, + runTestsWithRuntimeDir(t, testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir), + ) } -func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T, runtimeDir string) { +func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) { cfg := cfgWithCache(t) - ctx := testhelper.Context(t) ctx, wt, err := hookPkg.SetupSidechannel( ctx, diff --git a/internal/metadata/featureflag/ff_pack_objects_metrics.go b/internal/metadata/featureflag/ff_pack_objects_metrics.go new file mode 100644 index 000000000..7b8ee665a --- /dev/null +++ b/internal/metadata/featureflag/ff_pack_objects_metrics.go @@ -0,0 +1,9 @@ +package featureflag + +// PackObjectsMetrics turns on metrics for pack objects +var PackObjectsMetrics = NewFeatureFlag( + "pack_objects_metrics", + "v15.2.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/4336", + false, +) |