diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-10-20 13:13:59 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-10-20 13:13:59 +0300 |
commit | aec413230e6bce8782ae72d95d2d3cb312ebe923 (patch) | |
tree | 1a9cc6b31d76a63a281b9c3c54b0fb8788f7cc86 | |
parent | b7be757020d5687d6e5632cc169cf925eac68634 (diff) | |
parent | 9a510cc0ed4e11be5edc67a5997e303234a679fa (diff) |
Merge branch 'pks-gitpipe-reduce-pipeline-steps' into 'master'
gitpipe: Drop useless pipeline steps
See merge request gitlab-org/gitaly!3980
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 90 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_iterator.go | 15 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 122 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 10 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_iterator.go | 15 | ||||
-rw-r--r-- | internal/git/gitpipe/object_iterator.go | 19 | ||||
-rw-r--r-- | internal/git/gitpipe/pipeline_test.go | 73 | ||||
-rw-r--r-- | internal/git/gitpipe/revision.go | 90 | ||||
-rw-r--r-- | internal/git/gitpipe/revision_iterator.go | 15 | ||||
-rw-r--r-- | internal/git/gitpipe/revision_test.go | 278 | ||||
-rw-r--r-- | internal/gitaly/service/blob/blobs.go | 55 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 33 | ||||
-rw-r--r-- | internal/gitaly/service/commit/list_all_commits.go | 28 | ||||
-rw-r--r-- | internal/gitaly/service/commit/list_commits.go | 19 | ||||
-rw-r--r-- | internal/gitaly/service/ref/find_all_tags.go | 68 | ||||
-rw-r--r-- | internal/gitaly/service/ref/tag_signatures.go | 8 |
16 files changed, 364 insertions, 574 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 57d62d5f5..337760810 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -24,36 +24,63 @@ type CatfileInfoResult struct { ObjectInfo *catfile.ObjectInfo } +type catfileInfoConfig struct { + skipResult func(*catfile.ObjectInfo) bool +} + +// CatfileInfoOption is an option for the CatfileInfo and CatfileInfoAllObjects pipeline steps. +type CatfileInfoOption func(cfg *catfileInfoConfig) + +// WithSkipCatfileInfoResult will execute the given function for each ObjectInfo processed by the +// pipeline. If the callback returns `true`, then the object will be skipped and not passed down the +// pipeline. +func WithSkipCatfileInfoResult(skipResult func(*catfile.ObjectInfo) bool) CatfileInfoOption { + return func(cfg *catfileInfoConfig) { + cfg.skipResult = skipResult + } +} + // CatfileInfo processes revlistResults from the given channel and extracts object information via // `git cat-file --batch-check`. The returned channel will contain all processed catfile info // results. Any error received via the channel or encountered in this step will cause the pipeline // to fail. Context cancellation will gracefully halt the pipeline. -func CatfileInfo(ctx context.Context, objectInfoReader catfile.ObjectInfoReader, revisionIterator RevisionIterator) CatfileInfoIterator { - resultChan := make(chan CatfileInfoResult) +func CatfileInfo( + ctx context.Context, + objectInfoReader catfile.ObjectInfoReader, + it ObjectIterator, + opts ...CatfileInfoOption, +) CatfileInfoIterator { + var cfg catfileInfoConfig + for _, opt := range opts { + opt(&cfg) + } + resultChan := make(chan CatfileInfoResult) go func() { defer close(resultChan) - for revisionIterator.Next() { - revlistResult := revisionIterator.Result() - - objectInfo, err := objectInfoReader.Info(ctx, revlistResult.OID.Revision()) + for it.Next() { + objectInfo, err := objectInfoReader.Info(ctx, it.ObjectID().Revision()) if err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ - err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err), + err: fmt.Errorf("retrieving object info for %q: %w", it.ObjectID(), err), }) return } + if cfg.skipResult != nil && cfg.skipResult(objectInfo) { + continue + } + if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ - ObjectName: revlistResult.ObjectName, + ObjectName: it.ObjectName(), ObjectInfo: objectInfo, }); isDone { return } } - if err := revisionIterator.Err(); err != nil { + if err := it.Err(); err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err}) return } @@ -69,7 +96,16 @@ func CatfileInfo(ctx context.Context, objectInfoReader catfile.ObjectInfoReader, // all processed results. Any error encountered during execution of this pipeline step will cause // the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this // pipeline step, the resulting catfileInfoResults will never have an object name. -func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInfoIterator { +func CatfileInfoAllObjects( + ctx context.Context, + repo *localrepo.Repo, + opts ...CatfileInfoOption, +) CatfileInfoIterator { + var cfg catfileInfoConfig + for _, opt := range opts { + opt(&cfg) + } + resultChan := make(chan CatfileInfoResult) go func() { @@ -106,6 +142,10 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf return } + if cfg.skipResult != nil && cfg.skipResult(objectInfo) { + continue + } + if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ ObjectInfo: objectInfo, }); isDone { @@ -126,36 +166,6 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf } } -// CatfileInfoFilter filters the catfileInfoResults from the provided channel with the filter -// function: if the filter returns `false` for a given item, then it will be dropped from the -// pipeline. Errors cannot be filtered and will always be passed through. -func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func(CatfileInfoResult) bool) CatfileInfoIterator { - resultChan := make(chan CatfileInfoResult) - - go func() { - defer close(resultChan) - - for it.Next() { - result := it.Result() - if filter(result) { - if sendCatfileInfoResult(ctx, resultChan, result) { - return - } - } - } - - if err := it.Err(); err != nil { - if sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err}) { - return - } - } - }() - - return &catfileInfoIterator{ - ch: resultChan, - } -} - func sendCatfileInfoResult(ctx context.Context, ch chan<- CatfileInfoResult, result CatfileInfoResult) bool { // In case the context has been cancelled, we have a race between observing an error from // the killed Git process and observing the context cancellation itself. But if we end up diff --git a/internal/git/gitpipe/catfile_info_iterator.go b/internal/git/gitpipe/catfile_info_iterator.go index 54a75b5d9..13a83ccca 100644 --- a/internal/git/gitpipe/catfile_info_iterator.go +++ b/internal/git/gitpipe/catfile_info_iterator.go @@ -1,11 +1,10 @@ package gitpipe +import "gitlab.com/gitlab-org/gitaly/v14/internal/git" + // CatfileInfoIterator is an iterator returned by the Revlist function. type CatfileInfoIterator interface { - // Next iterates to the next item. Returns `false` in case there are no more results left. - Next() bool - // Err returns the first error that was encountered. - Err() error + ObjectIterator // Result returns the current item. Result() CatfileInfoResult } @@ -49,3 +48,11 @@ func (it *catfileInfoIterator) Err() error { func (it *catfileInfoIterator) Result() CatfileInfoResult { return it.result } + +func (it *catfileInfoIterator) ObjectID() git.ObjectID { + return it.result.ObjectInfo.Oid +} + +func (it *catfileInfoIterator) ObjectName() []byte { + return it.result.ObjectName +} diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index b5054f479..16e2980bf 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -28,6 +28,7 @@ func TestCatfileInfo(t *testing.T) { for _, tc := range []struct { desc string revlistInputs []RevisionResult + opts []CatfileInfoOption expectedResults []CatfileInfoResult expectedErr error }{ @@ -85,6 +86,45 @@ func TestCatfileInfo(t *testing.T) { }, expectedErr: errors.New("retrieving object info for \"invalidobjectid\": object not found"), }, + { + desc: "skip everything", + revlistInputs: []RevisionResult{ + {OID: lfsPointer1}, + {OID: lfsPointer2}, + }, + opts: []CatfileInfoOption{ + WithSkipCatfileInfoResult(func(*catfile.ObjectInfo) bool { return true }), + }, + }, + { + desc: "skip one", + revlistInputs: []RevisionResult{ + {OID: lfsPointer1}, + {OID: lfsPointer2}, + }, + opts: []CatfileInfoOption{ + WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { + return objectInfo.Oid == lfsPointer1 + }), + }, + expectedResults: []CatfileInfoResult{ + {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}}, + }, + }, + { + desc: "skip nothing", + revlistInputs: []RevisionResult{ + {OID: lfsPointer1}, + {OID: lfsPointer2}, + }, + opts: []CatfileInfoOption{ + WithSkipCatfileInfoResult(func(*catfile.ObjectInfo) bool { return false }), + }, + expectedResults: []CatfileInfoResult{ + {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}}, + {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}}, + }, + }, } { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := testhelper.Context() @@ -96,7 +136,7 @@ func TestCatfileInfo(t *testing.T) { objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) - it := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs)) + it := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...) var results []CatfileInfoResult for it.Next() { @@ -147,83 +187,3 @@ func TestCatfileInfoAllObjects(t *testing.T) { {ObjectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}}, }, results) } - -func TestCatfileInfoFilter(t *testing.T) { - for _, tc := range []struct { - desc string - input []CatfileInfoResult - filter func(CatfileInfoResult) bool - expectedResults []CatfileInfoResult - expectedErr error - }{ - { - desc: "all accepted", - input: []CatfileInfoResult{ - {ObjectName: []byte{'a'}}, - {ObjectName: []byte{'b'}}, - {ObjectName: []byte{'c'}}, - }, - filter: func(CatfileInfoResult) bool { - return true - }, - expectedResults: []CatfileInfoResult{ - {ObjectName: []byte{'a'}}, - {ObjectName: []byte{'b'}}, - {ObjectName: []byte{'c'}}, - }, - }, - { - desc: "all filtered", - input: []CatfileInfoResult{ - {ObjectName: []byte{'a'}}, - {ObjectName: []byte{'b'}}, - {ObjectName: []byte{'c'}}, - }, - filter: func(CatfileInfoResult) bool { - return false - }, - }, - { - desc: "errors always get through", - input: []CatfileInfoResult{ - {ObjectName: []byte{'a'}}, - {ObjectName: []byte{'b'}}, - {err: errors.New("foobar")}, - {ObjectName: []byte{'c'}}, - }, - filter: func(CatfileInfoResult) bool { - return false - }, - expectedErr: errors.New("foobar"), - }, - { - desc: "subset filtered", - input: []CatfileInfoResult{ - {ObjectName: []byte{'a'}}, - {ObjectName: []byte{'b'}}, - {ObjectName: []byte{'c'}}, - }, - filter: func(r CatfileInfoResult) bool { - return r.ObjectName[0] == 'b' - }, - expectedResults: []CatfileInfoResult{ - {ObjectName: []byte{'b'}}, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - it := CatfileInfoFilter(ctx, NewCatfileInfoIterator(tc.input), tc.filter) - - var results []CatfileInfoResult - for it.Next() { - results = append(results, it.Result()) - } - - require.Equal(t, tc.expectedErr, it.Err()) - require.Equal(t, tc.expectedResults, results) - }) - } -} diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 4172dffdb..3a8c2d89a 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -32,7 +32,7 @@ type CatfileObjectResult struct { func CatfileObject( ctx context.Context, objectReader catfile.ObjectReader, - it CatfileInfoIterator, + it ObjectIterator, ) CatfileObjectIterator { resultChan := make(chan CatfileObjectResult) go func() { @@ -62,8 +62,6 @@ func CatfileObject( var objectDataReader *signallingReader for it.Next() { - catfileInfoResult := it.Result() - // We mustn't try to read another object before reading the previous object // has concluded. Given that this is not under our control but under the // control of the caller, we thus have to wait until the blocking reader has @@ -76,7 +74,7 @@ func CatfileObject( } } - object, err := objectReader.Object(ctx, catfileInfoResult.ObjectInfo.Oid.Revision()) + object, err := objectReader.Object(ctx, it.ObjectID().Revision()) if err != nil { sendResult(CatfileObjectResult{ err: fmt.Errorf("requesting object: %w", err), @@ -90,8 +88,8 @@ func CatfileObject( } if isDone := sendResult(CatfileObjectResult{ - ObjectName: catfileInfoResult.ObjectName, - ObjectInfo: catfileInfoResult.ObjectInfo, + ObjectName: it.ObjectName(), + ObjectInfo: &object.ObjectInfo, ObjectReader: objectDataReader, }); isDone { return diff --git a/internal/git/gitpipe/catfile_object_iterator.go b/internal/git/gitpipe/catfile_object_iterator.go index 372a94f3c..7f4a05622 100644 --- a/internal/git/gitpipe/catfile_object_iterator.go +++ b/internal/git/gitpipe/catfile_object_iterator.go @@ -1,11 +1,10 @@ package gitpipe +import "gitlab.com/gitlab-org/gitaly/v14/internal/git" + // CatfileObjectIterator is an iterator returned by the Revlist function. type CatfileObjectIterator interface { - // Next iterates to the next item. Returns `false` in case there are no more results left. - Next() bool - // Err returns the first error that was encountered. - Err() error + ObjectIterator // Result returns the current item. Result() CatfileObjectResult } @@ -49,3 +48,11 @@ func (it *catfileObjectIterator) Err() error { func (it *catfileObjectIterator) Result() CatfileObjectResult { return it.result } + +func (it *catfileObjectIterator) ObjectID() git.ObjectID { + return it.result.ObjectInfo.Oid +} + +func (it *catfileObjectIterator) ObjectName() []byte { + return it.result.ObjectName +} diff --git a/internal/git/gitpipe/object_iterator.go b/internal/git/gitpipe/object_iterator.go new file mode 100644 index 000000000..5628bc2c7 --- /dev/null +++ b/internal/git/gitpipe/object_iterator.go @@ -0,0 +1,19 @@ +package gitpipe + +import "gitlab.com/gitlab-org/gitaly/v14/internal/git" + +// ObjectIterator is a common interface that is shared across the pipeline steps that work with +// objects. +type ObjectIterator interface { + // Next iterates to the next item. Returns `false` in case there are no more results left, + // or if an error happened during iteration. The caller must call `Err()` after `Next()` has + // returned `false`. + Next() bool + // Err returns the first error that was encountered. + Err() error + // ObjectID returns the object ID of the current object. + ObjectID() git.ObjectID + // ObjectName returns the object name of the current object. This is a + // implementation-specific field and may not be set. + ObjectName() []byte +} diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index d292a4a09..3b28bde47 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -22,13 +22,12 @@ func TestPipeline_revlist(t *testing.T) { repo := localrepo.NewTestRepo(t, cfg, repoProto) for _, tc := range []struct { - desc string - revisions []string - revlistOptions []RevlistOption - revisionFilter func(RevisionResult) bool - catfileInfoFilter func(CatfileInfoResult) bool - expectedResults []CatfileObjectResult - expectedErr error + desc string + revisions []string + revlistOptions []RevlistOption + catfileInfoOptions []CatfileInfoOption + expectedResults []CatfileObjectResult + expectedErr error }{ { desc: "single blob", @@ -74,9 +73,9 @@ func TestPipeline_revlist(t *testing.T) { }, revlistOptions: []RevlistOption{ WithObjects(), - }, - revisionFilter: func(r RevisionResult) bool { - return r.OID == lfsPointer2 + WithSkipRevlistResult(func(r *RevisionResult) bool { + return r.OID != lfsPointer2 + }), }, expectedResults: []CatfileObjectResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}}, @@ -110,8 +109,10 @@ func TestPipeline_revlist(t *testing.T) { revlistOptions: []RevlistOption{ WithObjects(), }, - catfileInfoFilter: func(r CatfileInfoResult) bool { - return r.ObjectInfo.Type == "blob" + catfileInfoOptions: []CatfileInfoOption{ + WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { + return objectInfo.Type != "blob" + }), }, expectedResults: []CatfileObjectResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, ObjectName: []byte("branch-test.txt")}, @@ -154,15 +155,17 @@ func TestPipeline_revlist(t *testing.T) { }, revlistOptions: []RevlistOption{ WithObjects(), - }, - revisionFilter: func(r RevisionResult) bool { - // Let through two LFS pointers and a tree. - return r.OID == "b95c0fad32f4361845f91d9ce4c1721b52b82793" || - r.OID == lfsPointer1 || r.OID == lfsPointer2 - }, - catfileInfoFilter: func(r CatfileInfoResult) bool { - // Only let through blobs, so only the two LFS pointers remain. - return r.ObjectInfo.Type == "blob" + WithSkipRevlistResult(func(r *RevisionResult) bool { + // Let through two LFS pointers and a tree. + return r.OID != "b95c0fad32f4361845f91d9ce4c1721b52b82793" && + r.OID != lfsPointer1 && r.OID != lfsPointer2 + }), + }, + catfileInfoOptions: []CatfileInfoOption{ + WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { + // Only let through blobs, so only the two LFS pointers remain. + return objectInfo.Type != "blob" + }), }, expectedResults: []CatfileObjectResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}, ObjectName: []byte("files/lfs/lfs_object.iso")}, @@ -190,13 +193,17 @@ func TestPipeline_revlist(t *testing.T) { revisions: []string{ "doesnotexist", }, - revisionFilter: func(r RevisionResult) bool { - require.Fail(t, "filter should not be invoked on errors") - return true + revlistOptions: []RevlistOption{ + WithSkipRevlistResult(func(r *RevisionResult) bool { + require.Fail(t, "filter should not be invoked on errors") + return true + }), }, - catfileInfoFilter: func(r CatfileInfoResult) bool { - require.Fail(t, "filter should not be invoked on errors") - return true + catfileInfoOptions: []CatfileInfoOption{ + WithSkipCatfileInfoResult(func(r *catfile.ObjectInfo) bool { + require.Fail(t, "filter should not be invoked on errors") + return true + }), }, expectedErr: errors.New("rev-list pipeline command: exit status 128"), }, @@ -215,15 +222,7 @@ func TestPipeline_revlist(t *testing.T) { require.NoError(t, err) revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...) - if tc.revisionFilter != nil { - revlistIter = RevisionFilter(ctx, revlistIter, tc.revisionFilter) - } - - catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter) - if tc.catfileInfoFilter != nil { - catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, tc.catfileInfoFilter) - } - + catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter, tc.catfileInfoOptions...) catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) var results []CatfileObjectResult @@ -269,9 +268,7 @@ func TestPipeline_revlist(t *testing.T) { require.NoError(t, err) revlistIter := Revlist(ctx, repo, []string{"--all"}) - revlistIter = RevisionFilter(ctx, revlistIter, func(RevisionResult) bool { return true }) catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter) - catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, func(CatfileInfoResult) bool { return true }) catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) i := 0 diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go index ff2e84cd2..a4eec3a1e 100644 --- a/internal/git/gitpipe/revision.go +++ b/internal/git/gitpipe/revision.go @@ -52,6 +52,7 @@ type revlistConfig struct { firstParent bool before, after time.Time author []byte + skipResult func(*RevisionResult) bool } // RevlistOption is an option for the revlist pipeline step. @@ -159,6 +160,15 @@ func WithAuthor(author []byte) RevlistOption { } } +// WithSkipRevlistResult will execute the given function for each RevisionResult processed by the +// pipeline. If the callback returns `true`, then the object will be skipped and not passed down +// the pipeline. +func WithSkipRevlistResult(skipResult func(*RevisionResult) bool) RevlistOption { + return func(cfg *revlistConfig) { + cfg.skipResult = skipResult + } +} + // Revlist runs git-rev-list(1) with objects and object names enabled. The returned channel will // contain all object IDs listed by this command. Cancelling the context will cause the pipeline to // be cancelled, too. @@ -272,6 +282,10 @@ func Revlist( result.ObjectName = oidAndName[1] } + if cfg.skipResult != nil && cfg.skipResult(&result) { + continue + } + if isDone := sendRevisionResult(ctx, resultChan, result); isDone { return } @@ -297,6 +311,22 @@ func Revlist( } } +type forEachRefConfig struct { + format string +} + +// ForEachRefOption is an option that can be passed to ForEachRef. +type ForEachRefOption func(cfg *forEachRefConfig) + +// WithForEachRefFormat is the format used by git-for-each-ref. Note that each line _must_ be of +// format "%(objectname) %(refname)" such that the pipeline can parse it correctly. You may use +// conditional format statements though to potentially produce multiple such lines. +func WithForEachRefFormat(format string) ForEachRefOption { + return func(cfg *forEachRefConfig) { + cfg.format = format + } +} + // ForEachRef runs git-for-each-ref(1) with the given patterns and returns a RevisionIterator for // found references. Patterns must always refer to fully qualified reference names. Patterns for // which no branch is found do not result in an error. The iterator's object name is set to the @@ -307,18 +337,25 @@ func ForEachRef( repo *localrepo.Repo, patterns []string, sortField string, + opts ...ForEachRefOption, ) RevisionIterator { - resultChan := make(chan RevisionResult) + cfg := forEachRefConfig{ + // The default format also includes the object type, which requires us to read the + // referenced commit's object. It would thus be about 2-3x slower to use the + // default format, and instead we move the burden into the next pipeline step by + // default. + format: "%(objectname) %(refname)", + } + for _, opt := range opts { + opt(&cfg) + } + resultChan := make(chan RevisionResult) go func() { defer close(resultChan) flags := []git.Option{ - // The default format also includes the object type, which requires - // us to read the referenced commit's object. It would thus be about - // 2-3x slower to use the default format, and instead we move the - // burden into the next pipeline step. - git.ValueFlag{Name: "--format", Value: "%(objectname) %(refname)"}, + git.ValueFlag{Name: "--format", Value: cfg.format}, } if sortField != "" { flags = append(flags, git.ValueFlag{Name: "--sort", Value: sortField}) @@ -374,47 +411,6 @@ func ForEachRef( } } -// RevisionFilter filters the RevisionResult from the provided iterator with the filter function: if -// the filter returns `false` for a given item, then it will be dropped from the pipeline. Errors -// cannot be filtered and will always be passed through. -func RevisionFilter(ctx context.Context, it RevisionIterator, filter func(RevisionResult) bool) RevisionIterator { - return RevisionTransform(ctx, it, func(r RevisionResult) []RevisionResult { - if filter(r) { - return []RevisionResult{r} - } - return []RevisionResult{} - }) -} - -// RevisionTransform transforms each RevisionResult from the provided iterator with the transforming -// function. Instead of sending the original RevisionResult, it will instead send transformed -// results. -func RevisionTransform(ctx context.Context, it RevisionIterator, transform func(RevisionResult) []RevisionResult) RevisionIterator { - resultChan := make(chan RevisionResult) - - go func() { - defer close(resultChan) - - for it.Next() { - for _, transformed := range transform(it.Result()) { - if sendRevisionResult(ctx, resultChan, transformed) { - return - } - } - } - - if err := it.Err(); err != nil { - if sendRevisionResult(ctx, resultChan, RevisionResult{err: err}) { - return - } - } - }() - - return &revisionIterator{ - ch: resultChan, - } -} - func sendRevisionResult(ctx context.Context, ch chan<- RevisionResult, result RevisionResult) bool { // In case the context has been cancelled, we have a race between observing an error from // the killed Git process and observing the context cancellation itself. But if we end up diff --git a/internal/git/gitpipe/revision_iterator.go b/internal/git/gitpipe/revision_iterator.go index 828c0d965..8d949e648 100644 --- a/internal/git/gitpipe/revision_iterator.go +++ b/internal/git/gitpipe/revision_iterator.go @@ -1,11 +1,10 @@ package gitpipe +import "gitlab.com/gitlab-org/gitaly/v14/internal/git" + // RevisionIterator is an iterator returned by the Revlist function. type RevisionIterator interface { - // Next iterates to the next item. Returns `false` in case there are no more results left. - Next() bool - // Err returns the first error that was encountered. - Err() error + ObjectIterator // Result returns the current item. Result() RevisionResult } @@ -49,3 +48,11 @@ func (it *revisionIterator) Err() error { func (it *revisionIterator) Result() RevisionResult { return it.result } + +func (it *revisionIterator) ObjectID() git.ObjectID { + return it.result.OID +} + +func (it *revisionIterator) ObjectName() []byte { + return it.result.ObjectName +} diff --git a/internal/git/gitpipe/revision_test.go b/internal/git/gitpipe/revision_test.go index 3db71ec60..b40151dbe 100644 --- a/internal/git/gitpipe/revision_test.go +++ b/internal/git/gitpipe/revision_test.go @@ -427,6 +427,53 @@ func TestRevlist(t *testing.T) { }, expectedErr: errors.New("rev-list pipeline command: exit status 128"), }, + { + desc: "skip everything", + revisions: []string{ + "79d5f98270ad677c86a7e1ab2baa922958565135", + }, + options: []RevlistOption{ + WithObjects(), + WithBlobLimit(10), + WithObjectTypeFilter(ObjectTypeBlob), + WithSkipRevlistResult(func(*RevisionResult) bool { return true }), + }, + }, + { + desc: "skip nothing", + revisions: []string{ + "79d5f98270ad677c86a7e1ab2baa922958565135", + }, + options: []RevlistOption{ + WithObjects(), + WithBlobLimit(10), + WithObjectTypeFilter(ObjectTypeBlob), + WithSkipRevlistResult(func(*RevisionResult) bool { return false }), + }, + expectedResults: []RevisionResult{ + {OID: "0fb47f093f769008049a0b0976ac3fa6d6125033", ObjectName: []byte("hotfix-1.txt")}, + {OID: "4ae6c5e14452a35d04156277ae63e8356eb17cae", ObjectName: []byte("hotfix-2.txt")}, + {OID: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", ObjectName: []byte("iso8859.txt")}, + }, + }, + { + desc: "skip one", + revisions: []string{ + "79d5f98270ad677c86a7e1ab2baa922958565135", + }, + options: []RevlistOption{ + WithObjects(), + WithBlobLimit(10), + WithObjectTypeFilter(ObjectTypeBlob), + WithSkipRevlistResult(func(r *RevisionResult) bool { + return string(r.ObjectName) == "hotfix-2.txt" + }), + }, + expectedResults: []RevisionResult{ + {OID: "0fb47f093f769008049a0b0976ac3fa6d6125033", ObjectName: []byte("hotfix-1.txt")}, + {OID: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", ObjectName: []byte("iso8859.txt")}, + }, + }, } { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := testhelper.Context() @@ -456,8 +503,8 @@ func TestForEachRef(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - readRefs := func(t *testing.T, repo *localrepo.Repo, patterns ...string) []RevisionResult { - it := ForEachRef(ctx, repo, patterns, "") + readRefs := func(t *testing.T, repo *localrepo.Repo, patterns []string, opts ...ForEachRefOption) []RevisionResult { + it := ForEachRef(ctx, repo, patterns, "", opts...) var results []RevisionResult for it.Next() { @@ -485,11 +532,11 @@ func TestForEachRef(t *testing.T) { ObjectName: []byte("refs/heads/master"), OID: revisions["refs/heads/master"], }, - }, readRefs(t, repo, "refs/heads/master")) + }, readRefs(t, repo, []string{"refs/heads/master"})) }) t.Run("unqualified branch name", func(t *testing.T) { - require.Nil(t, readRefs(t, repo, "master")) + require.Nil(t, readRefs(t, repo, []string{"master"})) }) t.Run("multiple branches", func(t *testing.T) { @@ -502,11 +549,11 @@ func TestForEachRef(t *testing.T) { ObjectName: []byte("refs/heads/master"), OID: revisions["refs/heads/master"], }, - }, readRefs(t, repo, "refs/heads/master", "refs/heads/feature")) + }, readRefs(t, repo, []string{"refs/heads/master", "refs/heads/feature"})) }) t.Run("branches pattern", func(t *testing.T) { - refs := readRefs(t, repo, "refs/heads/*") + refs := readRefs(t, repo, []string{"refs/heads/*"}) require.Greater(t, len(refs), 90) require.Subset(t, refs, []RevisionResult{ @@ -521,212 +568,33 @@ func TestForEachRef(t *testing.T) { }) }) + t.Run("tag with format", func(t *testing.T) { + refs := readRefs(t, repo, []string{"refs/tags/v1.0.0"}, + WithForEachRefFormat("%(objectname) tag\n%(*objectname) peeled"), + ) + + require.Equal(t, refs, []RevisionResult{ + { + ObjectName: []byte("tag"), + OID: "f4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8", + }, + { + ObjectName: []byte("peeled"), + OID: "6f6d7e7ed97bb5f0054f2b1df789b39ca89b6ff9", + }, + }) + }) + t.Run("multiple patterns", func(t *testing.T) { - refs := readRefs(t, repo, "refs/heads/*", "refs/tags/*") + refs := readRefs(t, repo, []string{"refs/heads/*", "refs/tags/*"}) require.Greater(t, len(refs), 90) }) t.Run("nonexisting branch", func(t *testing.T) { - require.Nil(t, readRefs(t, repo, "refs/heads/idontexist")) + require.Nil(t, readRefs(t, repo, []string{"refs/heads/idontexist"})) }) t.Run("nonexisting pattern", func(t *testing.T) { - require.Nil(t, readRefs(t, repo, "refs/idontexist/*")) + require.Nil(t, readRefs(t, repo, []string{"refs/idontexist/*"})) }) } - -func TestRevisionFilter(t *testing.T) { - for _, tc := range []struct { - desc string - input []RevisionResult - filter func(RevisionResult) bool - expectedResults []RevisionResult - expectedErr error - }{ - { - desc: "all accepted", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - filter: func(RevisionResult) bool { - return true - }, - expectedResults: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - }, - { - desc: "all filtered", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - filter: func(RevisionResult) bool { - return false - }, - expectedResults: nil, - }, - { - desc: "errors always get through", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {err: errors.New("foobar")}, - {OID: "c"}, - }, - filter: func(RevisionResult) bool { - return false - }, - expectedErr: errors.New("foobar"), - }, - { - desc: "subset filtered", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - filter: func(r RevisionResult) bool { - return r.OID == "b" - }, - expectedResults: []RevisionResult{ - {OID: "b"}, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - it := RevisionFilter(ctx, NewRevisionIterator(tc.input), tc.filter) - - var results []RevisionResult - for it.Next() { - results = append(results, it.Result()) - } - - require.Equal(t, tc.expectedErr, it.Err()) - require.Equal(t, tc.expectedResults, results) - }) - } -} - -func TestRevisionTransform(t *testing.T) { - for _, tc := range []struct { - desc string - input []RevisionResult - transform func(RevisionResult) []RevisionResult - expectedResults []RevisionResult - expectedErr error - }{ - { - desc: "identity mapping", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - transform: func(r RevisionResult) []RevisionResult { - return []RevisionResult{r} - }, - expectedResults: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - }, - { - desc: "strip object", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - transform: func(r RevisionResult) []RevisionResult { - if r.OID == "b" { - return []RevisionResult{} - } - return []RevisionResult{r} - }, - expectedResults: []RevisionResult{ - {OID: "a"}, - {OID: "c"}, - }, - }, - { - desc: "replace items", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - transform: func(RevisionResult) []RevisionResult { - return []RevisionResult{{OID: "x"}} - }, - expectedResults: []RevisionResult{ - {OID: "x"}, - {OID: "x"}, - {OID: "x"}, - }, - }, - { - desc: "add additional items", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {OID: "c"}, - }, - transform: func(r RevisionResult) []RevisionResult { - return []RevisionResult{ - r, - {OID: r.OID + "x"}, - } - }, - expectedResults: []RevisionResult{ - {OID: "a"}, - {OID: "ax"}, - {OID: "b"}, - {OID: "bx"}, - {OID: "c"}, - {OID: "cx"}, - }, - }, - { - desc: "error handling", - input: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - {err: errors.New("foobar")}, - {OID: "c"}, - }, - transform: func(r RevisionResult) []RevisionResult { - return []RevisionResult{r} - }, - expectedResults: []RevisionResult{ - {OID: "a"}, - {OID: "b"}, - }, - expectedErr: errors.New("foobar"), - }, - } { - t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - it := RevisionTransform(ctx, NewRevisionIterator(tc.input), tc.transform) - - var results []RevisionResult - for it.Next() { - results = append(results, it.Result()) - } - - require.Equal(t, tc.expectedErr, it.Err()) - require.Equal(t, tc.expectedResults, results) - }) - } -} diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go index aafeef302..391500403 100644 --- a/internal/gitaly/service/blob/blobs.go +++ b/internal/gitaly/service/blob/blobs.go @@ -9,6 +9,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gitpipe" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -41,16 +42,6 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS ctx := stream.Context() repo := s.localrepo(req.GetRepository()) - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) - if err != nil { - return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err)) - } - - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) - if err != nil { - return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) - } - chunker := chunk.New(&blobSender{ send: func(blobs []*gitalypb.ListBlobsResponse_Blob) error { return stream.Send(&gitalypb.ListBlobsResponse{ @@ -65,9 +56,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS } revlistIter := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...) - catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, revlistIter) - if err := processBlobs(ctx, objectReader, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(), + if err := s.processBlobs(ctx, repo, revlistIter, nil, req.GetLimit(), req.GetBytesLimit(), func(oid string, size int64, contents []byte, path []byte) error { if !req.GetWithPaths() { path = nil @@ -91,9 +81,10 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS return nil } -func processBlobs( +func (s *server) processBlobs( ctx context.Context, - objectReader catfile.ObjectReader, + repo *localrepo.Repo, + objectIter gitpipe.ObjectIterator, catfileInfoIter gitpipe.CatfileInfoIterator, blobsLimit uint32, bytesLimit int64, @@ -102,6 +93,19 @@ func processBlobs( // If we have a zero bytes limit, then the caller didn't request any blob contents at all. // We can thus skip reading blob contents completely. if bytesLimit == 0 { + // This is a bit untidy, but some callers may already use an object info iterator to + // enumerate objects, where it thus wouldn't make sense to recreate it via the + // object iterator. We thus support an optional `catfileInfoIter` parameter: if set, + // we just use that one and ignore the object iterator. + if catfileInfoIter == nil { + objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) + if err != nil { + return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err)) + } + + catfileInfoIter = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter) + } + var i uint32 for catfileInfoIter.Next() { blob := catfileInfoIter.Result() @@ -125,7 +129,12 @@ func processBlobs( return helper.ErrInternal(err) } } else { - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + objectReader, err := s.catfileCache.ObjectReader(ctx, repo) + if err != nil { + return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) + } + + catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, objectIter) var i uint32 for catfileObjectIter.Next() { @@ -230,17 +239,13 @@ func (s *server) ListAllBlobs(req *gitalypb.ListAllBlobsRequest, stream gitalypb }, }) - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) - if err != nil { - return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) - } - - catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo) - catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { - return r.ObjectInfo.Type == "blob" - }) + catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo, + gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { + return objectInfo.Type != "blob" + }), + ) - if err := processBlobs(ctx, objectReader, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(), + if err := s.processBlobs(ctx, repo, catfileInfoIter, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(), func(oid string, size int64, contents []byte, path []byte) error { return chunker.Send(&gitalypb.ListAllBlobsResponse_Blob{ Oid: oid, diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index 907db1e51..f7e37c80d 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -8,6 +8,7 @@ import ( gitalyerrors "gitlab.com/gitlab-org/gitaly/v14/internal/errors" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gitpipe" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/chunk" @@ -51,25 +52,17 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git repo := s.localrepo(in.GetRepository()) - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) - if err != nil { - return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err)) - } - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } - revlistOptions := []gitpipe.RevlistOption{ + revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), gitpipe.WithObjects(), gitpipe.WithBlobLimit(lfsPointerMaxSize), gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob), - } - - revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...) - catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, revlistIter) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + ) + catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil { return err @@ -102,10 +95,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } - catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo) - catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { - return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize - }) + catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo, + gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { + return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize + }), + ) catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil { @@ -150,10 +144,11 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita blobs[i] = gitpipe.RevisionResult{OID: git.ObjectID(blobID)} } - catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs)) - catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { - return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize - }) + catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs), + gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { + return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize + }), + ) catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil { diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go index 072b0bb47..d7dfdb66b 100644 --- a/internal/gitaly/service/commit/list_all_commits.go +++ b/internal/gitaly/service/commit/list_all_commits.go @@ -35,27 +35,23 @@ func (s *server) ListAllCommits( return helper.ErrInternalf("creating object reader: %w", err) } - catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo) - // If we've got a pagination token, then we will only start to print commits as soon as // we've seen the token. token := request.GetPaginationParams().GetPageToken() waitingForToken := token != "" - catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { - if waitingForToken { - waitingForToken = r.ObjectInfo.Oid != git.ObjectID(token) - // We also skip the token itself, thus we always return `false` - // here. - return false - } - - if r.ObjectInfo.Type != "commit" { - return false - } - - return true - }) + catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo, + gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { + if waitingForToken { + waitingForToken = objectInfo.Oid != git.ObjectID(token) + // We also skip the token itself, thus we always return `false` + // here. + return true + } + + return objectInfo.Type != "commit" + }), + ) catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go index 44c599b96..8a3cbfce2 100644 --- a/internal/gitaly/service/commit/list_commits.go +++ b/internal/gitaly/service/commit/list_commits.go @@ -39,11 +39,6 @@ func (s *server) ListCommits( ctx := stream.Context() repo := s.localrepo(request.GetRepository()) - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) - if err != nil { - return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err)) - } - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) @@ -88,26 +83,24 @@ func (s *server) ListCommits( revlistOptions = append(revlistOptions, gitpipe.WithAuthor(request.GetAuthor())) } - revlistIter := gitpipe.Revlist(ctx, repo, request.GetRevisions(), revlistOptions...) - // If we've got a pagination token, then we will only start to print commits as soon as // we've seen the token. if token := request.GetPaginationParams().GetPageToken(); token != "" { tokenSeen := false - revlistIter = gitpipe.RevisionFilter(ctx, revlistIter, func(r gitpipe.RevisionResult) bool { + revlistOptions = append(revlistOptions, gitpipe.WithSkipRevlistResult(func(r *gitpipe.RevisionResult) bool { if !tokenSeen { tokenSeen = r.OID == git.ObjectID(token) // We also skip the token itself, thus we always return `false` // here. - return false + return true } - return true - }) + return false + })) } - catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, revlistIter) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + revlistIter := gitpipe.Revlist(ctx, repo, request.GetRevisions(), revlistOptions...) + catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) chunker := chunk.New(&commitsSender{ send: func(commits []*gitalypb.GitCommit) error { diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go index 051d2e366..e7667d349 100644 --- a/internal/gitaly/service/ref/find_all_tags.go +++ b/internal/gitaly/service/ref/find_all_tags.go @@ -38,77 +38,15 @@ func (s *server) FindAllTags(in *gitalypb.FindAllTagsRequest, stream gitalypb.Re } func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortField string, stream gitalypb.RefService_FindAllTagsServer) error { - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) - if err != nil { - return fmt.Errorf("error creating object info reader: %v", err) - } - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return fmt.Errorf("error creating object reader: %v", err) } - forEachRefIter := gitpipe.ForEachRef(ctx, repo, []string{"refs/tags/"}, sortField) - forEachRefIter = gitpipe.RevisionTransform(ctx, forEachRefIter, - func(r gitpipe.RevisionResult) []gitpipe.RevisionResult { - // We transform the pipeline to include each tag-reference twice: once for - // the "normal" object, and once we opportunistically peel the object to a - // non-tag object. This is required such that we can efficiently parse the - // tagged object. - return []gitpipe.RevisionResult{ - r, - {OID: r.OID + "^{}"}, - } - }, - ) - - catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, forEachRefIter) - - // In the previous pipeline step, we request information about both the object and the - // peeled object in case the object is a tag. Given that we now know about object types, we - // can filter out the second request in case the object is not a tag: peeling a non-tag - // object to a non-tag object is always going to end up with the same object anyway. And - // requesting the same object twice is moot. - type state int - const ( - // stateTag indicates that the next object is going to be a tag. - stateTag = state(iota) - // statePeeledTag indicates that the next object is going to be the peeled object of - // the preceding tag. - statePeeledTag - // stateSkip indicates that the next object shall be skipped because it is the - // peeled version of a non-tag object, which is the same object anyway. - stateSkip + forEachRefIter := gitpipe.ForEachRef(ctx, repo, []string{"refs/tags/"}, sortField, + gitpipe.WithForEachRefFormat("%(objectname) %(refname)%(if)%(*objectname)%(then)\n%(objectname)^{} PEELED%(end)"), ) - - currentState := stateTag - catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, - func(r gitpipe.CatfileInfoResult) bool { - switch currentState { - case stateTag: - // If we've got a tag, then we want to also see its peeled object. - // Otherwise, we can skip over the peeled object. - currentState = statePeeledTag - if r.ObjectInfo.Type != "tag" { - currentState = stateSkip - } - return true - case statePeeledTag: - currentState = stateTag - return true - case stateSkip: - currentState = stateTag - return false - } - - // We could try to gracefully handle this, but I don't see much of a point - // given that we can see above that it's never going to be anything else but - // a known state. - panic("invalid state") - }, - ) - - catfileObjectsIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + catfileObjectsIter := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter) chunker := chunk.New(&tagSender{stream: stream}) diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go index ad2630d60..091275075 100644 --- a/internal/gitaly/service/ref/tag_signatures.go +++ b/internal/gitaly/service/ref/tag_signatures.go @@ -39,11 +39,6 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream ctx := stream.Context() repo := s.localrepo(req.GetRepository()) - objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo) - if err != nil { - return helper.ErrInternalf("creating object info reader: %w", err) - } - objectReader, err := s.catfileCache.ObjectReader(ctx, repo) if err != nil { return helper.ErrInternalf("creating object reader: %w", err) @@ -63,8 +58,7 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream } revlistIter := gitpipe.Revlist(ctx, repo, req.GetTagRevisions(), revlistOptions...) - catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, revlistIter) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) for catfileObjectIter.Next() { tag := catfileObjectIter.Result() |