diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-02 13:05:24 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-08 17:48:45 +0300 |
commit | 196235a330a3cb155a5d98b3fc071189b7af9acb (patch) | |
tree | 05682f06c9755bc39c4826a91407d339f452a667 | |
parent | fa7c6cd3bcad4e8307558cfc561f9fbf32f01b85 (diff) |
gitpipe: Convert object pipeline to use object reader queue
Convert the object pipeline step to use the new object reader queue
interface. This new interface allows us to batch requests, which most
notably reduces memory allocations because we now only create a single
tracing span. The following benchmarks exercise a bunch of RPCs which
use this pipeline step. Before this change:
BenchmarkListAllBlobs/with_contents-16 9 129005566 ns/op 155852200 B/op 27930 allocs/op
BenchmarkListBlobs/with_contents-16 100 10172111 ns/op 5799172 B/op 3188 allocs/op
BenchmarkListAllCommits/ListAllCommits-16 36 39871779 ns/op 9247943 B/op 32796 allocs/op
BenchmarkFindAllTags/FindAllTags-16 6 186601443 ns/op 31099057 B/op 137032 allocs/op
And after this change:
BenchmarkListAllBlobs/with_contents-16 8 130562064 ns/op 155701208 B/op 23341 allocs/op
BenchmarkListBlobs/with_contents-16 157 11414869 ns/op 5766087 B/op 2365 allocs/op
BenchmarkListAllCommits/ListAllCommits-16 36 38225424 ns/op 8998776 B/op 27330 allocs/op
BenchmarkFindAllTags/FindAllTags-16 8 132635567 ns/op 29804752 B/op 106993 allocs/op
This shows that we're now doing roughly 16-26% fewer allocations. Furthermore,
FindAllTags is about 30% faster compared to before.
Changelog: performance
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 63 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_test.go | 3 | ||||
-rw-r--r-- | internal/git/gitpipe/pipeline_test.go | 16 | ||||
-rw-r--r-- | internal/gitaly/service/blob/blobs.go | 5 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 18 | ||||
-rw-r--r-- | internal/gitaly/service/commit/list_all_commits.go | 5 | ||||
-rw-r--r-- | internal/gitaly/service/commit/list_commits.go | 6 | ||||
-rw-r--r-- | internal/gitaly/service/ref/find_all_tags.go | 6 | ||||
-rw-r--r-- | internal/gitaly/service/ref/tag_signatures.go | 6 |
9 files changed, 105 insertions, 23 deletions
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index d5aa1e7f6..5ec2e9fb5 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -23,6 +23,11 @@ type CatfileObjectResult struct { git.Object } +type catfileObjectRequest struct { + objectName []byte + err error +} + // CatfileObject processes catfileInfoResults from the given channel and reads associated objects // into memory via `git cat-file --batch`. The returned channel will contain all processed objects. // Any error received via the channel or encountered in this step will cause the pipeline to fail. @@ -32,7 +37,42 @@ func CatfileObject( ctx context.Context, objectReader catfile.ObjectReader, it ObjectIterator, -) CatfileObjectIterator { +) (CatfileObjectIterator, error) { + queue, cleanup, err := objectReader.ObjectQueue(ctx) + if err != nil { + return nil, err + } + defer cleanup() + + requestChan := make(chan catfileObjectRequest) + go func() { + defer close(requestChan) + + for it.Next() { + if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { + select { + case requestChan <- catfileObjectRequest{err: err}: + case <-ctx.Done(): + return + } + } + + select { + case requestChan <- catfileObjectRequest{objectName: it.ObjectName()}: + case <-ctx.Done(): + return + } + } + + if err := it.Err(); err != nil { + select { + case requestChan <- catfileObjectRequest{err: err}: + case <-ctx.Done(): + return + } + } + }() + resultChan := make(chan CatfileObjectResult) go func() { defer close(resultChan) @@ -60,7 +100,15 @@ func CatfileObject( var previousObject *synchronizingObject - for it.Next() { + // It's fine to iterate over the request channel without paying attention to + // context cancellation because the request channel itself would be closed if the + // context was cancelled. 
+ for request := range requestChan { + if request.err != nil { + sendResult(CatfileObjectResult{err: request.err}) + break + } + // We mustn't try to read another object before reading the previous object // has concluded. Given that this is not under our control but under the // control of the caller, we thus have to wait until the blocking reader has @@ -73,7 +121,7 @@ func CatfileObject( } } - object, err := objectReader.Object(ctx, it.ObjectID().Revision()) + object, err := queue.ReadObject() if err != nil { sendResult(CatfileObjectResult{ err: fmt.Errorf("requesting object: %w", err), @@ -87,22 +135,17 @@ func CatfileObject( } if isDone := sendResult(CatfileObjectResult{ - ObjectName: it.ObjectName(), + ObjectName: request.objectName, Object: previousObject, }); isDone { return } } - - if err := it.Err(); err != nil { - sendResult(CatfileObjectResult{err: err}) - return - } }() return &catfileObjectIterator{ ch: resultChan, - } + }, nil } type synchronizingObject struct { diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index fd02726d9..6cb20aa0b 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -78,7 +78,8 @@ func TestCatfileObject(t *testing.T) { objectReader, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) - it := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs)) + it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs)) + require.NoError(t, err) var results []CatfileObjectResult for it.Next() { diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index 25afd4259..aa6503910 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -223,7 +223,9 @@ func TestPipeline_revlist(t *testing.T) { revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...) 
catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter, tc.catfileInfoOptions...) - catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter) + require.NoError(t, err) var results []CatfileObjectResult for catfileObjectIter.Next() { @@ -275,7 +277,9 @@ func TestPipeline_revlist(t *testing.T) { revlistIter := Revlist(ctx, repo, []string{"--all"}) catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter) - catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter) + require.NoError(t, err) i := 0 for catfileObjectIter.Next() { @@ -312,7 +316,9 @@ func TestPipeline_revlist(t *testing.T) { revlistIter := Revlist(ctx, repo, []string{"--all"}, WithObjects()) catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter) - catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter) + require.NoError(t, err) i := 0 var wg sync.WaitGroup @@ -362,7 +368,9 @@ func TestPipeline_forEachRef(t *testing.T) { forEachRefIter := ForEachRef(ctx, repo, nil) catfileInfoIter := CatfileInfo(ctx, objectInfoReader, forEachRefIter) - catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter) + require.NoError(t, err) type object struct { oid git.ObjectID diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go index 55c406dd8..05923269f 100644 --- a/internal/gitaly/service/blob/blobs.go +++ b/internal/gitaly/service/blob/blobs.go @@ -134,7 +134,10 @@ func (s *server) processBlobs( return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, objectIter) + catfileObjectIter, err := 
gitpipe.CatfileObject(ctx, objectReader, objectIter) + if err != nil { + return helper.ErrInternalf("creating catfile object iterator: %w", err) + } var i uint32 for catfileObjectIter.Next() { diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index a937881ff..140f6107d 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -62,7 +62,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git gitpipe.WithBlobLimit(lfsPointerMaxSize), gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob), ) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + if err != nil { + return helper.ErrInternalf("creating object iterator: %w", err) + } if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil { return err @@ -100,7 +104,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize }), ) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + if err != nil { + return helper.ErrInternalf("creating object iterator: %w", err) + } if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil { return err @@ -149,7 +157,11 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize }), ) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + if err != nil { + return helper.ErrInternalf("creating object iterator: %w", err) + } if err := sendLFSPointers(chunker, catfileObjectIter, 
0); err != nil { return err diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go index 0ab8a9b66..d5fa18622 100644 --- a/internal/gitaly/service/commit/list_all_commits.go +++ b/internal/gitaly/service/commit/list_all_commits.go @@ -53,7 +53,10 @@ func (s *server) ListAllCommits( }), ) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + if err != nil { + return err + } chunker := chunk.New(&commitsSender{ send: func(commits []*gitalypb.GitCommit) error { diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go index eb6bb4b0c..826edb9ee 100644 --- a/internal/gitaly/service/commit/list_commits.go +++ b/internal/gitaly/service/commit/list_commits.go @@ -100,7 +100,11 @@ func (s *server) ListCommits( } revlistIter := gitpipe.Revlist(ctx, repo, request.GetRevisions(), revlistOptions...) 
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + if err != nil { + return err + } chunker := chunk.New(&commitsSender{ send: func(commits []*gitalypb.GitCommit) error { diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go index 47eb787a2..4bfb2d0e9 100644 --- a/internal/gitaly/service/ref/find_all_tags.go +++ b/internal/gitaly/service/ref/find_all_tags.go @@ -52,7 +52,11 @@ func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortFiel gitpipe.WithSortField(sortField), gitpipe.WithForEachRefFormat("%(objectname) %(refname)%(if)%(*objectname)%(then)\n%(objectname)^{} PEELED%(end)"), ) - catfileObjectsIter := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter) + + catfileObjectsIter, err := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter) + if err != nil { + return err + } chunker := chunk.New(&tagSender{stream: stream}) diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go index 8ffaf4de9..8bdf78479 100644 --- a/internal/gitaly/service/ref/tag_signatures.go +++ b/internal/gitaly/service/ref/tag_signatures.go @@ -58,7 +58,11 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream } revlistIter := gitpipe.Revlist(ctx, repo, req.GetTagRevisions(), revlistOptions...) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + if err != nil { + return err + } for catfileObjectIter.Next() { tag := catfileObjectIter.Result() |