Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2023-05-04 08:05:49 +0300
committerGitLab Bot <gitlab-bot@gitlab.com>2023-05-04 08:05:49 +0300
commit8dd841dc873f193d7b917bb2151acfc292c307d7 (patch)
treec35070f57c680d1a5b941aae9f601ae65269c956
parentb0838ee4f37f6f53b92ddbe08c0862f44ad9f8d3 (diff)
parent1707c7b7772461837c83165f6bc73e1c72f542a6 (diff)
Automatic merge of gitlab-org/gitaly master
-rw-r--r--internal/gitaly/service/hook/pack_objects.go50
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go272
2 files changed, 234 insertions, 88 deletions
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index 6d2f1dc4c..32b209f67 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -47,17 +47,7 @@ var (
)
func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error {
- data, err := protojson.Marshal(req)
- if err != nil {
- return err
- }
-
- h := sha256.New()
- if _, err := h.Write(data); err != nil {
- return err
- }
-
- stdin, err := bufferStdin(stdinReader, h)
+ cacheKey, stdin, err := s.computeCacheKey(req, stdinReader)
if err != nil {
return err
}
@@ -74,9 +64,7 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
}
}()
- key := hex.EncodeToString(h.Sum(nil))
-
- servedBytes, created, err := s.packObjectsCache.Fetch(ctx, key, output, func(w io.Writer) error {
+ servedBytes, created, err := s.packObjectsCache.Fetch(ctx, cacheKey, output, func(w io.Writer) error {
// TODO: We now have three different concurrency strategies for pack-objects. All of
// them are guarded behind feature flags. This situation is because we wanted to verify
// and pick the most effective strategy. `PackObjectsLimiting.Key` config is ignored
@@ -90,7 +78,7 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
req,
args,
stdin,
- key,
+ cacheKey,
)
}
@@ -102,7 +90,7 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
req,
args,
stdin,
- key,
+ cacheKey,
)
}
@@ -123,12 +111,12 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
req,
args,
stdin,
- key,
+ cacheKey,
)
}
}
- return s.runPackObjects(ctx, w, req, args, stdin, key)
+ return s.runPackObjects(ctx, w, req, args, stdin, cacheKey)
})
if err != nil {
return err
@@ -142,7 +130,7 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
}
ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
- "cache_key": key,
+ "cache_key": cacheKey,
"bytes": servedBytes,
}).Info("served bytes")
packObjectsServedBytes.Add(float64(servedBytes))
@@ -150,6 +138,30 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH
return nil
}
+// computeCacheKey returns the cache key used for caching pack-objects. A cache key is made up of
+// both the requested objects and essential parameters that could impact the content of the
+// generated packfile. Including any insignificant information could result in a lower cache hit rate.
+func (s *server) computeCacheKey(req *gitalypb.PackObjectsHookWithSidechannelRequest, stdinReader io.Reader) (string, io.ReadCloser, error) {
+ cacheHash := sha256.New()
+ cacheKeyPrefix, err := protojson.Marshal(&gitalypb.PackObjectsHookWithSidechannelRequest{
+ Repository: req.Repository,
+ Args: req.Args,
+ GitProtocol: req.GitProtocol,
+ })
+ if err != nil {
+ return "", nil, err
+ }
+ if _, err := cacheHash.Write(cacheKeyPrefix); err != nil {
+ return "", nil, err
+ }
+ stdin, err := bufferStdin(stdinReader, cacheHash)
+ if err != nil {
+ return "", nil, err
+ }
+ cacheKey := hex.EncodeToString(cacheHash.Sum(nil))
+ return cacheKey, stdin, nil
+}
+
func (s *server) runPackObjects(
ctx context.Context,
w io.Writer,
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index ae4b9d992..0545d50f7 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -50,10 +50,11 @@ func runTestsWithRuntimeDir(t *testing.T, testFunc func(*testing.T, context.Cont
}
}
-func cfgWithCache(t *testing.T) config.Cfg {
+func cfgWithCache(t *testing.T, minOccurrences int) config.Cfg {
cfg := testcfg.Build(t)
cfg.PackObjectsCache.Enabled = true
cfg.PackObjectsCache.Dir = testhelper.TempDir(t)
+ cfg.PackObjectsCache.MinOccurrences = minOccurrences
return cfg
}
@@ -99,7 +100,7 @@ func TestServer_PackObjectsHook_separateContext(t *testing.T) {
}
func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) {
- cfg := cfgWithCache(t)
+ cfg := cfgWithCache(t, 0)
cfg.SocketPath = runHooksServer(t, cfg, nil)
ctx1, cancel := context.WithCancel(ctx)
@@ -232,78 +233,211 @@ func TestServer_PackObjectsHook_usesCache(t *testing.T) {
}
func testServerPackObjectsHookUsesCache(t *testing.T, ctx context.Context, runtimeDir string) {
- cfg := cfgWithCache(t)
-
- tlc := &streamcache.TestLoggingCache{}
- cfg.SocketPath = runHooksServer(t, cfg, []serverOption{func(s *server) {
- tlc.Cache = s.packObjectsCache
- s.packObjectsCache = tlc
- }})
-
- repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
- Seed: gittest.SeedGitLabTest,
- })
-
- doRequest := func() {
- var stdout []byte
- ctx, wt, err := hookPkg.SetupSidechannel(
- ctx,
- git.HooksPayload{
- RuntimeDir: runtimeDir,
+ testCases := []struct {
+ name string
+ makeRequests func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest
+ // shouldUseCacheOf[i] contains the position of request that generates the cache for
+ // i-th request. shouldUseCacheOf[i] == -1 means i-th request does not use cache, thus
+ // create packfile
+ shouldUseCacheOf []int
+ minOccurrences int
+ }{
+ {
+ name: "all requests are identical",
+ makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest {
+ args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
+ return []*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ }
},
- func(c *net.UnixConn) error {
- if _, err := io.WriteString(c, "3dd08961455abf80ef9115f4afdc1c6f968b503c\n--not\n\n"); err != nil {
- return err
+ shouldUseCacheOf: []int{-1, 0, 0, 0, 0},
+ },
+ {
+ name: "requests with different protocols",
+ makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest {
+ args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
+ return []*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {Repository: repository, Args: args, GitProtocol: "ssh"},
+ {Repository: repository, Args: args, GitProtocol: "ssh"},
+ {Repository: repository, Args: args, GitProtocol: "http"},
+ {Repository: repository, Args: args, GitProtocol: "http"},
+ {Repository: repository, Args: args, GitProtocol: "ssh"},
}
- if err := c.CloseWrite(); err != nil {
- return err
+ },
+ shouldUseCacheOf: []int{-1, 0, -1, 2, 0},
+ },
+ {
+ name: "requests with slightly different args",
+ makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest {
+ args1 := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress"}
+ args2 := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
+ return []*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {Repository: repository, Args: args1},
+ {Repository: repository, Args: args1},
+ {Repository: repository, Args: args1},
+ {Repository: repository, Args: args2},
+ {Repository: repository, Args: args2},
}
-
- return pktline.EachSidebandPacket(c, func(band byte, data []byte) error {
- if band == 1 {
- stdout = append(stdout, data...)
- }
- return nil
- })
},
- )
- require.NoError(t, err)
- defer testhelper.MustClose(t, wt)
+ shouldUseCacheOf: []int{-1, 0, 0, -1, 4},
+ },
+ {
+ name: "requests from different remote IPs",
+ makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest {
+ args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
+ return []*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {Repository: repository, Args: args, RemoteIp: "1.2.3.4"},
+ {Repository: repository, Args: args, RemoteIp: "1.2.3.5"},
+ {Repository: repository, Args: args, RemoteIp: "1.2.3.4"},
+ {Repository: repository, Args: args, RemoteIp: "1.2.3.4"},
+ {Repository: repository, Args: args, RemoteIp: "1.2.3.5"},
+ }
+ },
+ // All from cached
+ shouldUseCacheOf: []int{-1, 0, 0, 0, 0},
+ },
+ {
+ name: "requests from different user IDs",
+ makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest {
+ args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
+ return []*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {Repository: repository, Args: args, GlId: "1"},
+ {Repository: repository, Args: args, GlId: "1"},
+ {Repository: repository, Args: args, GlId: "1"},
+ {Repository: repository, Args: args, GlId: "2"},
+ {Repository: repository, Args: args, GlId: "3"},
+ }
+ },
+ // All from cached
+ shouldUseCacheOf: []int{-1, 0, 0, 0, 0},
+ },
+ {
+ name: "requests from different usernames",
+ makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest {
+ args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
+ return []*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {Repository: repository, Args: args, GlId: "user-1"},
+ {Repository: repository, Args: args, GlId: "user-1"},
+ {Repository: repository, Args: args, GlId: "user-1"},
+ {Repository: repository, Args: args, GlId: "user-2"},
+ {Repository: repository, Args: args, GlId: "user-3"},
+ }
+ },
+ // All from cached
+ shouldUseCacheOf: []int{-1, 0, 0, 0, 0},
+ },
+ {
+ name: "min_occurrences setting is set to 1",
+ makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest {
+ args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
+ return []*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ }
+ },
+ // The second one starts to serve cache
+ shouldUseCacheOf: []int{-1, -1, 0, 0, 0},
+ minOccurrences: 1,
+ },
+ {
+ name: "min_occurrences setting is set to 5",
+ makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest {
+ args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}
+ return []*gitalypb.PackObjectsHookWithSidechannelRequest{
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ {Repository: repository, Args: args},
+ }
+ },
+ shouldUseCacheOf: []int{-1, -1, -1, -1, -1},
+ minOccurrences: 5,
+ },
+ }
- client, conn := newHooksClient(t, cfg.SocketPath)
- defer conn.Close()
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ cfg := cfgWithCache(t, tc.minOccurrences)
- _, err = client.PackObjectsHookWithSidechannel(ctx, &gitalypb.PackObjectsHookWithSidechannelRequest{
- Repository: repo,
- Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"},
- })
- require.NoError(t, err)
- require.NoError(t, wt.Wait())
-
- gittest.ExecOpts(
- t,
- cfg,
- gittest.ExecConfig{Stdin: bytes.NewReader(stdout)},
- "-C", repoPath, "index-pack", "--stdin", "--fix-thin",
- )
- }
+ tlc := &streamcache.TestLoggingCache{}
+ cfg.SocketPath = runHooksServer(t, cfg, []serverOption{func(s *server) {
+ tlc.Cache = s.packObjectsCache
+ s.packObjectsCache = tlc
+ }})
- const N = 5
- for i := 0; i < N; i++ {
- doRequest()
- }
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ Seed: gittest.SeedGitLabTest,
+ })
+
+ doRequest := func(request *gitalypb.PackObjectsHookWithSidechannelRequest) {
+ var stdout []byte
+ ctx, wt, err := hookPkg.SetupSidechannel(
+ ctx,
+ git.HooksPayload{
+ RuntimeDir: runtimeDir,
+ },
+ func(c *net.UnixConn) error {
+ if _, err := io.WriteString(c, "3dd08961455abf80ef9115f4afdc1c6f968b503c\n--not\n\n"); err != nil {
+ return err
+ }
+ if err := c.CloseWrite(); err != nil {
+ return err
+ }
+
+ return pktline.EachSidebandPacket(c, func(band byte, data []byte) error {
+ if band == 1 {
+ stdout = append(stdout, data...)
+ }
+ return nil
+ })
+ },
+ )
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, wt)
- entries := tlc.Entries()
- require.Len(t, entries, N)
- first := entries[0]
- require.NotEmpty(t, first.Key)
- require.True(t, first.Created)
- require.NoError(t, first.Err)
-
- for i := 1; i < N; i++ {
- require.Equal(t, first.Key, entries[i].Key, "all requests had the same cache key")
- require.False(t, entries[i].Created, "all requests except the first were cache hits")
- require.NoError(t, entries[i].Err)
+ client, conn := newHooksClient(t, cfg.SocketPath)
+ defer conn.Close()
+
+ _, err = client.PackObjectsHookWithSidechannel(ctx, request)
+ require.NoError(t, err)
+ require.NoError(t, wt.Wait())
+
+ gittest.ExecOpts(
+ t,
+ cfg,
+ gittest.ExecConfig{Stdin: bytes.NewReader(stdout)},
+ "-C", repoPath, "index-pack", "--stdin", "--fix-thin",
+ )
+ }
+
+ requests := tc.makeRequests(repo)
+ for _, request := range requests {
+ doRequest(request)
+ }
+
+ entries := tlc.Entries()
+ require.Len(t, entries, len(requests))
+
+ for i := 0; i < len(requests); i++ {
+ require.NoError(t, entries[i].Err)
+
+ if tc.shouldUseCacheOf[i] == -1 {
+ require.True(t, entries[i].Created, "request %d should create packfile", i)
+ } else {
+ require.False(t, entries[i].Created, "request %d should not create packfile", i)
+ entryOfCache := entries[tc.shouldUseCacheOf[i]]
+ require.Equal(t, entryOfCache.Key, entries[i].Key, "request %d does not cache key from request %d", i, tc.shouldUseCacheOf[i])
+ }
+ }
+ })
}
}
@@ -340,7 +474,7 @@ func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, ctx co
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- cfg := cfgWithCache(t)
+ cfg := cfgWithCache(t, 0)
logger, hook := test.NewNullLogger()
concurrencyTracker := hookPkg.NewConcurrencyTracker()
@@ -594,7 +728,7 @@ func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) {
}
func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) {
- cfg := cfgWithCache(t)
+ cfg := cfgWithCache(t, 0)
ctx, wt, err := hookPkg.SetupSidechannel(
ctx,
@@ -673,7 +807,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) {
func testPackObjectsConcurrency(t *testing.T, ctx context.Context) {
t.Parallel()
- cfg := cfgWithCache(t)
+ cfg := cfgWithCache(t, 0)
var keyType string