diff options
author | John Cai <jcai@gitlab.com> | 2023-06-02 00:56:16 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2023-06-02 00:56:16 +0300 |
commit | 854f25045b1a755a93bfdeb5e9074d8d714f6ebf (patch) | |
tree | ddba558a58c0708dd076511ad38ce786b31e5b84 | |
parent | 3a877b8f3ad9fec41547bdbc27f207c27e1b5e07 (diff) | |
parent | 1f68b9291d86d7969f94f6ce713cb975a5325264 (diff) |
Merge branch 'wc/backport-5716' into '15-11-stable'
Backport 'Remove uncessary fields from pack-objects cache key computation' to 15.11
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5868
Merged-by: John Cai <jcai@gitlab.com>
Approved-by: John Cai <jcai@gitlab.com>
Co-authored-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r-- | internal/gitaly/service/hook/pack_objects.go | 50 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects_test.go | 272 |
2 files changed, 234 insertions, 88 deletions
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 27bfc39b8..b040db9f1 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -47,17 +47,7 @@ var ( ) func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error { - data, err := protojson.Marshal(req) - if err != nil { - return err - } - - h := sha256.New() - if _, err := h.Write(data); err != nil { - return err - } - - stdin, err := bufferStdin(stdinReader, h) + cacheKey, stdin, err := s.computeCacheKey(req, stdinReader) if err != nil { return err } @@ -74,9 +64,7 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH } }() - key := hex.EncodeToString(h.Sum(nil)) - - servedBytes, created, err := s.packObjectsCache.Fetch(ctx, key, output, func(w io.Writer) error { + servedBytes, created, err := s.packObjectsCache.Fetch(ctx, cacheKey, output, func(w io.Writer) error { // TODO: We now have three different concurrency strategies for pack-objects. All of // them are guarded behind feature flags. This situation is because we wanted to verify // and pick the most effective strategy. `PackObjectsLimiting.Key` config is ignored @@ -90,7 +78,7 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH req, args, stdin, - key, + cacheKey, ) } @@ -102,7 +90,7 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH req, args, stdin, - key, + cacheKey, ) } @@ -117,12 +105,12 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH req, args, stdin, - key, + cacheKey, ) } } - return s.runPackObjects(ctx, w, req, args, stdin, key) + return s.runPackObjects(ctx, w, req, args, stdin, cacheKey) }) if err != nil { return err @@ -136,7 +124,7 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH } ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ - "cache_key": key, + "cache_key": cacheKey, "bytes": servedBytes, }).Info("served bytes") packObjectsServedBytes.Add(float64(servedBytes)) @@ -144,6 +132,30 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH return nil } +// computeCacheKey returns the cache key used for caching pack-objects. A cache key is made up of +// both the requested objects and essential parameters that could impact the content of the +// generated packfile. Including any insignificant information could result in a lower cache hit rate. +func (s *server) computeCacheKey(req *gitalypb.PackObjectsHookWithSidechannelRequest, stdinReader io.Reader) (string, io.ReadCloser, error) { + cacheHash := sha256.New() + cacheKeyPrefix, err := protojson.Marshal(&gitalypb.PackObjectsHookWithSidechannelRequest{ + Repository: req.Repository, + Args: req.Args, + GitProtocol: req.GitProtocol, + }) + if err != nil { + return "", nil, err + } + if _, err := cacheHash.Write(cacheKeyPrefix); err != nil { + return "", nil, err + } + stdin, err := bufferStdin(stdinReader, cacheHash) + if err != nil { + return "", nil, err + } + cacheKey := hex.EncodeToString(cacheHash.Sum(nil)) + return cacheKey, stdin, nil +} + func (s *server) runPackObjects( ctx context.Context, w io.Writer, diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index d05de7458..52ff76fe6 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -50,10 +50,11 @@ func runTestsWithRuntimeDir(t *testing.T, testFunc func(*testing.T, context.Cont } } -func cfgWithCache(t *testing.T) config.Cfg { +func cfgWithCache(t *testing.T, minOccurrences int) config.Cfg { cfg := testcfg.Build(t) cfg.PackObjectsCache.Enabled = true cfg.PackObjectsCache.Dir = testhelper.TempDir(t) + cfg.PackObjectsCache.MinOccurrences = minOccurrences return cfg } @@ -99,7 +100,7 @@ func TestServer_PackObjectsHook_separateContext(t *testing.T) { } func testServerPackObjectsHookSeparateContextWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) { - cfg := cfgWithCache(t) + cfg := cfgWithCache(t, 0) cfg.SocketPath = runHooksServer(t, cfg, nil) ctx1, cancel := context.WithCancel(ctx) @@ -232,78 +233,211 @@ func TestServer_PackObjectsHook_usesCache(t *testing.T) { } func testServerPackObjectsHookUsesCache(t *testing.T, ctx context.Context, runtimeDir string) { - cfg := cfgWithCache(t) - - tlc := &streamcache.TestLoggingCache{} - cfg.SocketPath = runHooksServer(t, cfg, []serverOption{func(s *server) { - tlc.Cache = s.packObjectsCache - s.packObjectsCache = tlc - }}) - - repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - Seed: gittest.SeedGitLabTest, - }) - - doRequest := func() { - var stdout []byte - ctx, wt, err := hookPkg.SetupSidechannel( - ctx, - git.HooksPayload{ - RuntimeDir: runtimeDir, + testCases := []struct { + name string + makeRequests func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest + // shouldUseCacheOf[i] contains the position of request that generates the cache for + // i-th request. shouldUseCacheOf[i] == -1 means i-th request does not use cache, thus + // create packfile + shouldUseCacheOf []int + minOccurrences int + }{ + { + name: "all requests are identical", + makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest { + args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} + return []*gitalypb.PackObjectsHookWithSidechannelRequest{ + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + } }, - func(c *net.UnixConn) error { - if _, err := io.WriteString(c, "3dd08961455abf80ef9115f4afdc1c6f968b503c\n--not\n\n"); err != nil { - return err + shouldUseCacheOf: []int{-1, 0, 0, 0, 0}, + }, + { + name: "requests with different protocols", + makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest { + args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} + return []*gitalypb.PackObjectsHookWithSidechannelRequest{ + {Repository: repository, Args: args, GitProtocol: "ssh"}, + {Repository: repository, Args: args, GitProtocol: "ssh"}, + {Repository: repository, Args: args, GitProtocol: "http"}, + {Repository: repository, Args: args, GitProtocol: "http"}, + {Repository: repository, Args: args, GitProtocol: "ssh"}, } - if err := c.CloseWrite(); err != nil { - return err + }, + shouldUseCacheOf: []int{-1, 0, -1, 2, 0}, + }, + { + name: "requests with slightly different args", + makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest { + args1 := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress"} + args2 := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} + return []*gitalypb.PackObjectsHookWithSidechannelRequest{ + {Repository: repository, Args: args1}, + {Repository: repository, Args: args1}, + {Repository: repository, Args: args1}, + {Repository: repository, Args: args2}, + {Repository: repository, Args: args2}, } - - return pktline.EachSidebandPacket(c, func(band byte, data []byte) error { - if band == 1 { - stdout = append(stdout, data...) - } - return nil - }) }, - ) - require.NoError(t, err) - defer testhelper.MustClose(t, wt) + shouldUseCacheOf: []int{-1, 0, 0, -1, 4}, + }, + { + name: "requests from different remote IPs", + makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest { + args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} + return []*gitalypb.PackObjectsHookWithSidechannelRequest{ + {Repository: repository, Args: args, RemoteIp: "1.2.3.4"}, + {Repository: repository, Args: args, RemoteIp: "1.2.3.5"}, + {Repository: repository, Args: args, RemoteIp: "1.2.3.4"}, + {Repository: repository, Args: args, RemoteIp: "1.2.3.4"}, + {Repository: repository, Args: args, RemoteIp: "1.2.3.5"}, + } + }, + // All from cached + shouldUseCacheOf: []int{-1, 0, 0, 0, 0}, + }, + { + name: "requests from different user IDs", + makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest { + args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} + return []*gitalypb.PackObjectsHookWithSidechannelRequest{ + {Repository: repository, Args: args, GlId: "1"}, + {Repository: repository, Args: args, GlId: "1"}, + {Repository: repository, Args: args, GlId: "1"}, + {Repository: repository, Args: args, GlId: "2"}, + {Repository: repository, Args: args, GlId: "3"}, + } + }, + // All from cached + shouldUseCacheOf: []int{-1, 0, 0, 0, 0}, + }, + { + name: "requests from different usernames", + makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest { + args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} + return []*gitalypb.PackObjectsHookWithSidechannelRequest{ + {Repository: repository, Args: args, GlId: "user-1"}, + {Repository: repository, Args: args, GlId: "user-1"}, + {Repository: repository, Args: args, GlId: "user-1"}, + {Repository: repository, Args: args, GlId: "user-2"}, + {Repository: repository, Args: args, GlId: "user-3"}, + } + }, + // All from cached + shouldUseCacheOf: []int{-1, 0, 0, 0, 0}, + }, + { + name: "min_occurrences setting is set to 1", + makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest { + args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} + return []*gitalypb.PackObjectsHookWithSidechannelRequest{ + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + } + }, + // The second one starts to serve cache + shouldUseCacheOf: []int{-1, -1, 0, 0, 0}, + minOccurrences: 1, + }, + { + name: "min_occurrences setting is set to 5", + makeRequests: func(repository *gitalypb.Repository) []*gitalypb.PackObjectsHookWithSidechannelRequest { + args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} + return []*gitalypb.PackObjectsHookWithSidechannelRequest{ + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + {Repository: repository, Args: args}, + } + }, + shouldUseCacheOf: []int{-1, -1, -1, -1, -1}, + minOccurrences: 5, + }, + } - client, conn := newHooksClient(t, cfg.SocketPath) - defer conn.Close() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := cfgWithCache(t, tc.minOccurrences) - _, err = client.PackObjectsHookWithSidechannel(ctx, &gitalypb.PackObjectsHookWithSidechannelRequest{ - Repository: repo, - Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, - }) - require.NoError(t, err) - require.NoError(t, wt.Wait()) - - gittest.ExecOpts( - t, - cfg, - gittest.ExecConfig{Stdin: bytes.NewReader(stdout)}, - "-C", repoPath, "index-pack", "--stdin", "--fix-thin", - ) - } + tlc := &streamcache.TestLoggingCache{} + cfg.SocketPath = runHooksServer(t, cfg, []serverOption{func(s *server) { + tlc.Cache = s.packObjectsCache + s.packObjectsCache = tlc + }}) - const N = 5 - for i := 0; i < N; i++ { - doRequest() - } + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + Seed: gittest.SeedGitLabTest, + }) + + doRequest := func(request *gitalypb.PackObjectsHookWithSidechannelRequest) { + var stdout []byte + ctx, wt, err := hookPkg.SetupSidechannel( + ctx, + git.HooksPayload{ + RuntimeDir: runtimeDir, + }, + func(c *net.UnixConn) error { + if _, err := io.WriteString(c, "3dd08961455abf80ef9115f4afdc1c6f968b503c\n--not\n\n"); err != nil { + return err + } + if err := c.CloseWrite(); err != nil { + return err + } + + return pktline.EachSidebandPacket(c, func(band byte, data []byte) error { + if band == 1 { + stdout = append(stdout, data...) + } + return nil + }) + }, + ) + require.NoError(t, err) + defer testhelper.MustClose(t, wt) - entries := tlc.Entries() - require.Len(t, entries, N) - first := entries[0] - require.NotEmpty(t, first.Key) - require.True(t, first.Created) - require.NoError(t, first.Err) - - for i := 1; i < N; i++ { - require.Equal(t, first.Key, entries[i].Key, "all requests had the same cache key") - require.False(t, entries[i].Created, "all requests except the first were cache hits") - require.NoError(t, entries[i].Err) + client, conn := newHooksClient(t, cfg.SocketPath) + defer conn.Close() + + _, err = client.PackObjectsHookWithSidechannel(ctx, request) + require.NoError(t, err) + require.NoError(t, wt.Wait()) + + gittest.ExecOpts( + t, + cfg, + gittest.ExecConfig{Stdin: bytes.NewReader(stdout)}, + "-C", repoPath, "index-pack", "--stdin", "--fix-thin", + ) + } + + requests := tc.makeRequests(repo) + for _, request := range requests { + doRequest(request) + } + + entries := tlc.Entries() + require.Len(t, entries, len(requests)) + + for i := 0; i < len(requests); i++ { + require.NoError(t, entries[i].Err) + + if tc.shouldUseCacheOf[i] == -1 { + require.True(t, entries[i].Created, "request %d should create packfile", i) + } else { + require.False(t, entries[i].Created, "request %d should not create packfile", i) + entryOfCache := entries[tc.shouldUseCacheOf[i]] + require.Equal(t, entryOfCache.Key, entries[i].Key, "request %d does not cache key from request %d", i, tc.shouldUseCacheOf[i]) + } + } + }) } } @@ -340,7 +474,7 @@ func testServerPackObjectsHookWithSidechannelWithRuntimeDir(t *testing.T, ctx co for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - cfg := cfgWithCache(t) + cfg := cfgWithCache(t, 0) logger, hook := test.NewNullLogger() concurrencyTracker := hookPkg.NewConcurrencyTracker() @@ -594,7 +728,7 @@ func TestServer_PackObjectsHookWithSidechannel_Canceled(t *testing.T) { } func testServerPackObjectsHookWithSidechannelCanceledWithRuntimeDir(t *testing.T, ctx context.Context, runtimeDir string) { - cfg := cfgWithCache(t) + cfg := cfgWithCache(t, 0) ctx, wt, err := hookPkg.SetupSidechannel( ctx, @@ -673,7 +807,7 @@ func TestPackObjects_concurrencyLimit(t *testing.T) { func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { t.Parallel() - cfg := cfgWithCache(t) + cfg := cfgWithCache(t, 0) var keyType string |