Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-06-02 17:49:57 +0300
committerJohn Cai <jcai@gitlab.com>2022-07-13 19:00:04 +0300
commitd4bcd755235bed04123f5e4fafa7814788d9ad2c (patch)
tree410c95c5acc62b48bf9f3a9955af25da31b2b106
parent6537b017a11b21e2993a24f46bfd2475382c37bd (diff)
hook service: Call ConcurrencyTrackerjc-track-concurrency-pack-objects
In order to gain insight into concurrent calls of pack objects, call the ConcurrencyTracker each time we spawn a pack objects process. Since we want to know both how many concurrent pack object processes are spawned per user as well as per repository, we have a call for each.
-rw-r--r--internal/gitaly/service/hook/pack_objects.go35
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go136
-rw-r--r--internal/metadata/featureflag/ff_pack_objects_metrics.go9
3 files changed, 153 insertions, 27 deletions
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index f71db9d2c..3d161a521 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -22,10 +22,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/stream"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/protobuf/encoding/protojson"
- "google.golang.org/protobuf/proto"
)
var (
@@ -43,8 +43,8 @@ var (
})
)
-func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository, reqHash proto.Message, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error {
- data, err := protojson.Marshal(reqHash)
+func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error {
+ data, err := protojson.Marshal(req)
if err != nil {
return err
}
@@ -74,7 +74,7 @@ func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository,
key := hex.EncodeToString(h.Sum(nil))
r, created, err := s.packObjectsCache.FindOrCreate(key, func(w io.Writer) error {
- return s.runPackObjects(ctx, w, repo, args, stdin, key)
+ return s.runPackObjects(ctx, w, req, args, stdin, key)
})
if err != nil {
return err
@@ -105,7 +105,14 @@ func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository,
return r.Wait(ctx)
}
-func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb.Repository, args *packObjectsArgs, stdin io.ReadCloser, key string) error {
+func (s *server) runPackObjects(
+ ctx context.Context,
+ w io.Writer,
+ req *gitalypb.PackObjectsHookWithSidechannelRequest,
+ args *packObjectsArgs,
+ stdin io.ReadCloser,
+ key string,
+) error {
// We want to keep the context for logging, but we want to block all its
// cancelation signals (deadline, cancel etc.). This is because of
// the following scenario. Imagine client1 calls PackObjectsHook and
@@ -124,6 +131,22 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb
defer stdin.Close()
+ repo := req.GetRepository()
+
+ if featureflag.PackObjectsMetrics.IsEnabled(ctx) && s.concurrencyTracker != nil {
+ finishRepoLog := s.concurrencyTracker.LogConcurrency(ctx, "repository", repo.GetRelativePath())
+ defer finishRepoLog()
+
+ userID := req.GetGlId()
+
+ if userID == "" {
+ userID = "none"
+ }
+
+ finishUserLog := s.concurrencyTracker.LogConcurrency(ctx, "user_id", userID)
+ defer finishUserLog()
+ }
+
counter := &helper.CountingWriter{W: w}
sw := pktline.NewSidebandWriter(counter)
stdout := bufio.NewWriterSize(sw.Writer(stream.BandStdout), pktline.MaxSidebandData)
@@ -285,7 +308,7 @@ func (s *server) PackObjectsHookWithSidechannel(ctx context.Context, req *gitaly
}
defer c.Close()
- if err := s.packObjectsHook(ctx, req.Repository, req, args, c, c); err != nil {
+ if err := s.packObjectsHook(ctx, req, args, c, c); err != nil {
if errors.Is(err, syscall.EPIPE) {
// EPIPE is the error we get if we try to write to c after the client has
// closed its side of the connection. By convention, we label server side
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index 73c98c2d9..c11092806 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -2,12 +2,14 @@ package hook
import (
"bytes"
+ "context"
"io"
"net"
"strings"
"sync"
"testing"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
@@ -16,6 +18,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
hookPkg "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
@@ -24,16 +27,18 @@ import (
"google.golang.org/grpc/codes"
)
-func runTestsWithRuntimeDir(t *testing.T, testFunc func(*testing.T, string)) {
+func runTestsWithRuntimeDir(t *testing.T, testFunc func(*testing.T, context.Context, string)) func(*testing.T, context.Context) {
t.Helper()
- t.Run("no runtime dir", func(t *testing.T) {
- testFunc(t, "")
- })
+ return func(t *testing.T, ctx context.Context) {
+ t.Run("no runtime dir", func(t *testing.T) {
+ testFunc(t, ctx, "")
+ })
- t.Run("with runtime dir", func(t *testing.T) {
- testFunc(t, testhelper.TempDir(t))
- })
+ t.Run("with runtime dir", func(t *testing.T) {
+ testFunc(t, ctx, testhelper.TempDir(t))
+ })
+ }
}
func cfgWithCache(t *testing.T) config.Cfg {
@@ -73,14 +78,18 @@ func TestParsePackObjectsArgs(t *testing.T) {
func TestServer_PackObjectsHook_separateContext(t *testing.T) {
t.Parallel()
- runTestsWithRuntimeDir(t, testServerPackObjectsHookSeparateContextWithRuntimeDir)
+ testhelper.NewFeatureSets(featureflag.PackObjectsMetrics).Run(
+ t,
+ runTestsWithRuntimeDir(t, testServerPackObjectsHookSeparateContextWithRuntimeDir),
+ )
}
-func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtimeDir string) {
+func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) {
cfg := cfgWithCache(t)
cfg.SocketPath = runHooksServer(t, cfg, nil)
- ctx1 := testhelper.Context(t)
+ ctx1, cancel := context.WithCancel(ctx)
+ defer cancel()
repo, repoPath := gittest.CreateRepository(ctx1, t, cfg, gittest.CreateRepositoryConfig{
Seed: gittest.SeedGitLabTest,
})
@@ -137,7 +146,8 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtim
client2, conn2 := newHooksClient(t, cfg.SocketPath)
defer conn2.Close()
- ctx2 := testhelper.Context(t)
+ ctx2, cancel := context.WithCancel(ctx)
+ defer cancel()
var stdout2 []byte
ctx2, wt2, err := hookPkg.SetupSidechannel(
@@ -186,10 +196,13 @@ func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, runtim
func TestServer_PackObjectsHook_usesCache(t *testing.T) {
t.Parallel()
- runTestsWithRuntimeDir(t, testServerPackObjectsHookUsesCache)
+ testhelper.NewFeatureSets(featureflag.PackObjectsMetrics).Run(
+ t,
+ runTestsWithRuntimeDir(t, testServerPackObjectsHookUsesCache),
+ )
}
-func testServerPackObjectsHookUsesCache(t *testing.T, runtimeDir string) {
+func testServerPackObjectsHookUsesCache(t *testing.T, ctx context.Context, runtimeDir string) {
cfg := cfgWithCache(t)
tlc := &streamcache.TestLoggingCache{}
@@ -198,7 +211,6 @@ func testServerPackObjectsHookUsesCache(t *testing.T, runtimeDir string) {
s.packObjectsCache = tlc
}})
- ctx := testhelper.Context(t)
repo, repoPath := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{
Seed: gittest.SeedGitLabTest,
})
@@ -268,10 +280,14 @@ func testServerPackObjectsHookUsesCache(t *testing.T, runtimeDir string) {
func TestServer_PackObjectsHookWithSidechannel(t *testing.T) {
t.Parallel()
- runTestsWithRuntimeDir(t, testServerPackObjectsHookWithSidechannelWithRuntimeDir)
+
+ testhelper.NewFeatureSets(featureflag.PackObjectsMetrics).Run(
+ t,
+ runTestsWithRuntimeDir(t, testServerPackObjectsHookWithSidechannelWithRuntimeDir),
+ )
}
-func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, runtimeDir string) {
+func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) {
testCases := []struct {
desc string
stdin string
@@ -292,10 +308,17 @@ func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, runtim
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
cfg := cfgWithCache(t)
- ctx := testhelper.Context(t)
logger, hook := test.NewNullLogger()
- cfg.SocketPath = runHooksServer(t, cfg, nil, testserver.WithLogger(logger))
+ concurrencyTracker := hookPkg.NewConcurrencyTracker()
+
+ cfg.SocketPath = runHooksServer(
+ t,
+ cfg,
+ nil,
+ testserver.WithLogger(logger),
+ testserver.WithConcurrencyTracker(concurrencyTracker),
+ )
repo, repoPath := gittest.CreateRepository(ctx, t, cfg, gittest.CreateRepositoryConfig{
Seed: gittest.SeedGitLabTest,
})
@@ -384,6 +407,74 @@ func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, runtim
require.True(t, strings.HasPrefix(total, "Total "))
require.False(t, strings.Contains(total, "\n"))
})
+
+ if featureflag.PackObjectsMetrics.IsEnabled(ctx) {
+ expectedMetrics := `# HELP gitaly_pack_objects_concurrent_processes Number of concurrent processes
+# TYPE gitaly_pack_objects_concurrent_processes histogram
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="0"} 0
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="5"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="10"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="15"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="20"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="25"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="30"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="35"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="40"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="45"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="50"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="55"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="60"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="65"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="70"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="75"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="80"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="85"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="90"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="95"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="repository",le="+Inf"} 1
+gitaly_pack_objects_concurrent_processes_sum{segment="repository"} 1
+gitaly_pack_objects_concurrent_processes_count{segment="repository"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="0"} 0
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="5"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="10"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="15"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="20"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="25"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="30"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="35"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="40"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="45"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="50"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="55"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="60"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="65"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="70"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="75"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="80"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="85"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="90"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="95"} 1
+gitaly_pack_objects_concurrent_processes_bucket{segment="user_id",le="+Inf"} 1
+gitaly_pack_objects_concurrent_processes_sum{segment="user_id"} 1
+gitaly_pack_objects_concurrent_processes_count{segment="user_id"} 1
+# HELP gitaly_pack_objects_process_active_callers Number of unique callers that have an active pack objects processes
+# TYPE gitaly_pack_objects_process_active_callers gauge
+gitaly_pack_objects_process_active_callers{segment="repository"} 0
+gitaly_pack_objects_process_active_callers{segment="user_id"} 0
+# HELP gitaly_pack_objects_process_active_callers_total Total unique callers that have initiated a pack objects processes
+# TYPE gitaly_pack_objects_process_active_callers_total counter
+gitaly_pack_objects_process_active_callers_total{segment="repository"} 1
+gitaly_pack_objects_process_active_callers_total{segment="user_id"} 1
+`
+ require.NoError(t, testutil.CollectAndCompare(
+ concurrencyTracker,
+ bytes.NewBufferString(expectedMetrics),
+ ))
+ } else {
+ require.Equal(t, 0, testutil.CollectAndCount(
+ concurrencyTracker,
+ ))
+ }
})
}
}
@@ -430,12 +521,15 @@ func TestServer_PackObjectsHookWithSidechannel_invalidArgument(t *testing.T) {
func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) {
t.Parallel()
- runTestsWithRuntimeDir(t, testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir)
+
+ testhelper.NewFeatureSets(featureflag.PackObjectsMetrics).Run(
+ t,
+ runTestsWithRuntimeDir(t, testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir),
+ )
}
-func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T, runtimeDir string) {
+func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) {
cfg := cfgWithCache(t)
- ctx := testhelper.Context(t)
ctx, wt, err := hookPkg.SetupSidechannel(
ctx,
diff --git a/internal/metadata/featureflag/ff_pack_objects_metrics.go b/internal/metadata/featureflag/ff_pack_objects_metrics.go
new file mode 100644
index 000000000..7b8ee665a
--- /dev/null
+++ b/internal/metadata/featureflag/ff_pack_objects_metrics.go
@@ -0,0 +1,9 @@
+package featureflag
+
+// PackObjectsMetrics turns on metrics for pack objects
+var PackObjectsMetrics = NewFeatureFlag(
+ "pack_objects_metrics",
+ "v15.2.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/4336",
+ false,
+)