Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-02 13:05:24 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-08 17:48:45 +0300
commit196235a330a3cb155a5d98b3fc071189b7af9acb (patch)
tree05682f06c9755bc39c4826a91407d339f452a667
parentfa7c6cd3bcad4e8307558cfc561f9fbf32f01b85 (diff)
gitpipe: Convert object pipeline to use object reader queue
Convert the object pipeline step to use the new object reader queue interface. This new interface allows us to batch requests, which most notably reduces memory allocations because we now only create a single tracing span. The following benchmarks exercise a bunch of RPCs which use this pipeline step. Before this change: BenchmarkListAllBlobs/with_contents-16 9 129005566 ns/op 155852200 B/op 27930 allocs/op BenchmarkListBlobs/with_contents-16 100 10172111 ns/op 5799172 B/op 3188 allocs/op BenchmarkListAllCommits/ListAllCommits-16 36 39871779 ns/op 9247943 B/op 32796 allocs/op BenchmarkFindAllTags/FindAllTags-16 6 186601443 ns/op 31099057 B/op 137032 allocs/op And after this change: BenchmarkListAllBlobs/with_contents-16 8 130562064 ns/op 155701208 B/op 23341 allocs/op BenchmarkListBlobs/with_contents-16 157 11414869 ns/op 5766087 B/op 2365 allocs/op BenchmarkListAllCommits/ListAllCommits-16 36 38225424 ns/op 8998776 B/op 27330 allocs/op BenchmarkFindAllTags/FindAllTags-16 8 132635567 ns/op 29804752 B/op 106993 allocs/op This shows that we're now doing 20-30% less allocations. Furthermore, FindAllTags is about 30% faster compared to before. Changelog: performance
-rw-r--r--internal/git/gitpipe/catfile_object.go63
-rw-r--r--internal/git/gitpipe/catfile_object_test.go3
-rw-r--r--internal/git/gitpipe/pipeline_test.go16
-rw-r--r--internal/gitaly/service/blob/blobs.go5
-rw-r--r--internal/gitaly/service/blob/lfs_pointers.go18
-rw-r--r--internal/gitaly/service/commit/list_all_commits.go5
-rw-r--r--internal/gitaly/service/commit/list_commits.go6
-rw-r--r--internal/gitaly/service/ref/find_all_tags.go6
-rw-r--r--internal/gitaly/service/ref/tag_signatures.go6
9 files changed, 105 insertions, 23 deletions
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index d5aa1e7f6..5ec2e9fb5 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -23,6 +23,11 @@ type CatfileObjectResult struct {
git.Object
}
+type catfileObjectRequest struct {
+ objectName []byte
+ err error
+}
+
// CatfileObject processes catfileInfoResults from the given channel and reads associated objects
// into memory via `git cat-file --batch`. The returned channel will contain all processed objects.
// Any error received via the channel or encountered in this step will cause the pipeline to fail.
@@ -32,7 +37,42 @@ func CatfileObject(
ctx context.Context,
objectReader catfile.ObjectReader,
it ObjectIterator,
-) CatfileObjectIterator {
+) (CatfileObjectIterator, error) {
+ queue, cleanup, err := objectReader.ObjectQueue(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer cleanup()
+
+ requestChan := make(chan catfileObjectRequest)
+ go func() {
+ defer close(requestChan)
+
+ for it.Next() {
+ if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
+ select {
+ case requestChan <- catfileObjectRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ select {
+ case requestChan <- catfileObjectRequest{objectName: it.ObjectName()}:
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ if err := it.Err(); err != nil {
+ select {
+ case requestChan <- catfileObjectRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
resultChan := make(chan CatfileObjectResult)
go func() {
defer close(resultChan)
@@ -60,7 +100,15 @@ func CatfileObject(
var previousObject *synchronizingObject
- for it.Next() {
+ // It's fine to iterate over the request channel without paying attention to
+ // context cancellation because the request channel itself would be closed if the
+ // context was cancelled.
+ for request := range requestChan {
+ if request.err != nil {
+ sendResult(CatfileObjectResult{err: request.err})
+ break
+ }
+
// We mustn't try to read another object before reading the previous object
// has concluded. Given that this is not under our control but under the
// control of the caller, we thus have to wait until the blocking reader has
@@ -73,7 +121,7 @@ func CatfileObject(
}
}
- object, err := objectReader.Object(ctx, it.ObjectID().Revision())
+ object, err := queue.ReadObject()
if err != nil {
sendResult(CatfileObjectResult{
err: fmt.Errorf("requesting object: %w", err),
@@ -87,22 +135,17 @@ func CatfileObject(
}
if isDone := sendResult(CatfileObjectResult{
- ObjectName: it.ObjectName(),
+ ObjectName: request.objectName,
Object: previousObject,
}); isDone {
return
}
}
-
- if err := it.Err(); err != nil {
- sendResult(CatfileObjectResult{err: err})
- return
- }
}()
return &catfileObjectIterator{
ch: resultChan,
- }
+ }, nil
}
type synchronizingObject struct {
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index fd02726d9..6cb20aa0b 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -78,7 +78,8 @@ func TestCatfileObject(t *testing.T) {
objectReader, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
- it := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs))
+ it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs))
+ require.NoError(t, err)
var results []CatfileObjectResult
for it.Next() {
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index 25afd4259..aa6503910 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -223,7 +223,9 @@ func TestPipeline_revlist(t *testing.T) {
revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...)
catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter, tc.catfileInfoOptions...)
- catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter)
+ require.NoError(t, err)
var results []CatfileObjectResult
for catfileObjectIter.Next() {
@@ -275,7 +277,9 @@ func TestPipeline_revlist(t *testing.T) {
revlistIter := Revlist(ctx, repo, []string{"--all"})
catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter)
- catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter)
+ require.NoError(t, err)
i := 0
for catfileObjectIter.Next() {
@@ -312,7 +316,9 @@ func TestPipeline_revlist(t *testing.T) {
revlistIter := Revlist(ctx, repo, []string{"--all"}, WithObjects())
catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter)
- catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter)
+ require.NoError(t, err)
i := 0
var wg sync.WaitGroup
@@ -362,7 +368,9 @@ func TestPipeline_forEachRef(t *testing.T) {
forEachRefIter := ForEachRef(ctx, repo, nil)
catfileInfoIter := CatfileInfo(ctx, objectInfoReader, forEachRefIter)
- catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter)
+ require.NoError(t, err)
type object struct {
oid git.ObjectID
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index 55c406dd8..05923269f 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -134,7 +134,10 @@ func (s *server) processBlobs(
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, objectIter)
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter)
+ if err != nil {
+ return helper.ErrInternalf("creating catfile object iterator: %w", err)
+ }
var i uint32
for catfileObjectIter.Next() {
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index a937881ff..140f6107d 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -62,7 +62,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
gitpipe.WithBlobLimit(lfsPointerMaxSize),
gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob),
)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+ if err != nil {
+ return helper.ErrInternalf("creating object iterator: %w", err)
+ }
if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
@@ -100,7 +104,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize
}),
)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ if err != nil {
+ return helper.ErrInternalf("creating object iterator: %w", err)
+ }
if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
@@ -149,7 +157,11 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize
}),
)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ if err != nil {
+ return helper.ErrInternalf("creating object iterator: %w", err)
+ }
if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil {
return err
diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go
index 0ab8a9b66..d5fa18622 100644
--- a/internal/gitaly/service/commit/list_all_commits.go
+++ b/internal/gitaly/service/commit/list_all_commits.go
@@ -53,7 +53,10 @@ func (s *server) ListAllCommits(
}),
)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ if err != nil {
+ return err
+ }
chunker := chunk.New(&commitsSender{
send: func(commits []*gitalypb.GitCommit) error {
diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go
index eb6bb4b0c..826edb9ee 100644
--- a/internal/gitaly/service/commit/list_commits.go
+++ b/internal/gitaly/service/commit/list_commits.go
@@ -100,7 +100,11 @@ func (s *server) ListCommits(
}
revlistIter := gitpipe.Revlist(ctx, repo, request.GetRevisions(), revlistOptions...)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+ if err != nil {
+ return err
+ }
chunker := chunk.New(&commitsSender{
send: func(commits []*gitalypb.GitCommit) error {
diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go
index 47eb787a2..4bfb2d0e9 100644
--- a/internal/gitaly/service/ref/find_all_tags.go
+++ b/internal/gitaly/service/ref/find_all_tags.go
@@ -52,7 +52,11 @@ func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortFiel
gitpipe.WithSortField(sortField),
gitpipe.WithForEachRefFormat("%(objectname) %(refname)%(if)%(*objectname)%(then)\n%(objectname)^{} PEELED%(end)"),
)
- catfileObjectsIter := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter)
+
+ catfileObjectsIter, err := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter)
+ if err != nil {
+ return err
+ }
chunker := chunk.New(&tagSender{stream: stream})
diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go
index 8ffaf4de9..8bdf78479 100644
--- a/internal/gitaly/service/ref/tag_signatures.go
+++ b/internal/gitaly/service/ref/tag_signatures.go
@@ -58,7 +58,11 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream
}
revlistIter := gitpipe.Revlist(ctx, repo, req.GetTagRevisions(), revlistOptions...)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+ if err != nil {
+ return err
+ }
for catfileObjectIter.Next() {
tag := catfileObjectIter.Result()