Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Patrick Steinhardt <psteinhardt@gitlab.com> 2021-10-20 13:13:59 +0300
committer Patrick Steinhardt <psteinhardt@gitlab.com> 2021-10-20 13:13:59 +0300
commit aec413230e6bce8782ae72d95d2d3cb312ebe923 (patch)
tree 1a9cc6b31d76a63a281b9c3c54b0fb8788f7cc86
parent b7be757020d5687d6e5632cc169cf925eac68634 (diff)
parent 9a510cc0ed4e11be5edc67a5997e303234a679fa (diff)
Merge branch 'pks-gitpipe-reduce-pipeline-steps' into 'master'
gitpipe: Drop useless pipeline steps. See merge request gitlab-org/gitaly!3980.
-rw-r--r-- internal/git/gitpipe/catfile_info.go 90
-rw-r--r-- internal/git/gitpipe/catfile_info_iterator.go 15
-rw-r--r-- internal/git/gitpipe/catfile_info_test.go 122
-rw-r--r-- internal/git/gitpipe/catfile_object.go 10
-rw-r--r-- internal/git/gitpipe/catfile_object_iterator.go 15
-rw-r--r-- internal/git/gitpipe/object_iterator.go 19
-rw-r--r-- internal/git/gitpipe/pipeline_test.go 73
-rw-r--r-- internal/git/gitpipe/revision.go 90
-rw-r--r-- internal/git/gitpipe/revision_iterator.go 15
-rw-r--r-- internal/git/gitpipe/revision_test.go 278
-rw-r--r-- internal/gitaly/service/blob/blobs.go 55
-rw-r--r-- internal/gitaly/service/blob/lfs_pointers.go 33
-rw-r--r-- internal/gitaly/service/commit/list_all_commits.go 28
-rw-r--r-- internal/gitaly/service/commit/list_commits.go 19
-rw-r--r-- internal/gitaly/service/ref/find_all_tags.go 68
-rw-r--r-- internal/gitaly/service/ref/tag_signatures.go 8
16 files changed, 364 insertions, 574 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 57d62d5f5..337760810 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -24,36 +24,63 @@ type CatfileInfoResult struct {
ObjectInfo *catfile.ObjectInfo
}
+type catfileInfoConfig struct {
+ skipResult func(*catfile.ObjectInfo) bool
+}
+
+// CatfileInfoOption is an option for the CatfileInfo and CatfileInfoAllObjects pipeline steps.
+type CatfileInfoOption func(cfg *catfileInfoConfig)
+
+// WithSkipCatfileInfoResult will execute the given function for each ObjectInfo processed by the
+// pipeline. If the callback returns `true`, then the object will be skipped and not passed down the
+// pipeline.
+func WithSkipCatfileInfoResult(skipResult func(*catfile.ObjectInfo) bool) CatfileInfoOption {
+ return func(cfg *catfileInfoConfig) {
+ cfg.skipResult = skipResult
+ }
+}
+
// CatfileInfo processes revlistResults from the given channel and extracts object information via
// `git cat-file --batch-check`. The returned channel will contain all processed catfile info
// results. Any error received via the channel or encountered in this step will cause the pipeline
// to fail. Context cancellation will gracefully halt the pipeline.
-func CatfileInfo(ctx context.Context, objectInfoReader catfile.ObjectInfoReader, revisionIterator RevisionIterator) CatfileInfoIterator {
- resultChan := make(chan CatfileInfoResult)
+func CatfileInfo(
+ ctx context.Context,
+ objectInfoReader catfile.ObjectInfoReader,
+ it ObjectIterator,
+ opts ...CatfileInfoOption,
+) CatfileInfoIterator {
+ var cfg catfileInfoConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+ resultChan := make(chan CatfileInfoResult)
go func() {
defer close(resultChan)
- for revisionIterator.Next() {
- revlistResult := revisionIterator.Result()
-
- objectInfo, err := objectInfoReader.Info(ctx, revlistResult.OID.Revision())
+ for it.Next() {
+ objectInfo, err := objectInfoReader.Info(ctx, it.ObjectID().Revision())
if err != nil {
sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
- err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err),
+ err: fmt.Errorf("retrieving object info for %q: %w", it.ObjectID(), err),
})
return
}
+ if cfg.skipResult != nil && cfg.skipResult(objectInfo) {
+ continue
+ }
+
if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
- ObjectName: revlistResult.ObjectName,
+ ObjectName: it.ObjectName(),
ObjectInfo: objectInfo,
}); isDone {
return
}
}
- if err := revisionIterator.Err(); err != nil {
+ if err := it.Err(); err != nil {
sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err})
return
}
@@ -69,7 +96,16 @@ func CatfileInfo(ctx context.Context, objectInfoReader catfile.ObjectInfoReader,
// all processed results. Any error encountered during execution of this pipeline step will cause
// the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this
// pipeline step, the resulting catfileInfoResults will never have an object name.
-func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInfoIterator {
+func CatfileInfoAllObjects(
+ ctx context.Context,
+ repo *localrepo.Repo,
+ opts ...CatfileInfoOption,
+) CatfileInfoIterator {
+ var cfg catfileInfoConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
resultChan := make(chan CatfileInfoResult)
go func() {
@@ -106,6 +142,10 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf
return
}
+ if cfg.skipResult != nil && cfg.skipResult(objectInfo) {
+ continue
+ }
+
if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
ObjectInfo: objectInfo,
}); isDone {
@@ -126,36 +166,6 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInf
}
}
-// CatfileInfoFilter filters the catfileInfoResults from the provided channel with the filter
-// function: if the filter returns `false` for a given item, then it will be dropped from the
-// pipeline. Errors cannot be filtered and will always be passed through.
-func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func(CatfileInfoResult) bool) CatfileInfoIterator {
- resultChan := make(chan CatfileInfoResult)
-
- go func() {
- defer close(resultChan)
-
- for it.Next() {
- result := it.Result()
- if filter(result) {
- if sendCatfileInfoResult(ctx, resultChan, result) {
- return
- }
- }
- }
-
- if err := it.Err(); err != nil {
- if sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err}) {
- return
- }
- }
- }()
-
- return &catfileInfoIterator{
- ch: resultChan,
- }
-}
-
func sendCatfileInfoResult(ctx context.Context, ch chan<- CatfileInfoResult, result CatfileInfoResult) bool {
// In case the context has been cancelled, we have a race between observing an error from
// the killed Git process and observing the context cancellation itself. But if we end up
diff --git a/internal/git/gitpipe/catfile_info_iterator.go b/internal/git/gitpipe/catfile_info_iterator.go
index 54a75b5d9..13a83ccca 100644
--- a/internal/git/gitpipe/catfile_info_iterator.go
+++ b/internal/git/gitpipe/catfile_info_iterator.go
@@ -1,11 +1,10 @@
package gitpipe
+import "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+
// CatfileInfoIterator is an iterator returned by the Revlist function.
type CatfileInfoIterator interface {
- // Next iterates to the next item. Returns `false` in case there are no more results left.
- Next() bool
- // Err returns the first error that was encountered.
- Err() error
+ ObjectIterator
// Result returns the current item.
Result() CatfileInfoResult
}
@@ -49,3 +48,11 @@ func (it *catfileInfoIterator) Err() error {
func (it *catfileInfoIterator) Result() CatfileInfoResult {
return it.result
}
+
+func (it *catfileInfoIterator) ObjectID() git.ObjectID {
+ return it.result.ObjectInfo.Oid
+}
+
+func (it *catfileInfoIterator) ObjectName() []byte {
+ return it.result.ObjectName
+}
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index b5054f479..16e2980bf 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -28,6 +28,7 @@ func TestCatfileInfo(t *testing.T) {
for _, tc := range []struct {
desc string
revlistInputs []RevisionResult
+ opts []CatfileInfoOption
expectedResults []CatfileInfoResult
expectedErr error
}{
@@ -85,6 +86,45 @@ func TestCatfileInfo(t *testing.T) {
},
expectedErr: errors.New("retrieving object info for \"invalidobjectid\": object not found"),
},
+ {
+ desc: "skip everything",
+ revlistInputs: []RevisionResult{
+ {OID: lfsPointer1},
+ {OID: lfsPointer2},
+ },
+ opts: []CatfileInfoOption{
+ WithSkipCatfileInfoResult(func(*catfile.ObjectInfo) bool { return true }),
+ },
+ },
+ {
+ desc: "skip one",
+ revlistInputs: []RevisionResult{
+ {OID: lfsPointer1},
+ {OID: lfsPointer2},
+ },
+ opts: []CatfileInfoOption{
+ WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
+ return objectInfo.Oid == lfsPointer1
+ }),
+ },
+ expectedResults: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
+ },
+ },
+ {
+ desc: "skip nothing",
+ revlistInputs: []RevisionResult{
+ {OID: lfsPointer1},
+ {OID: lfsPointer2},
+ },
+ opts: []CatfileInfoOption{
+ WithSkipCatfileInfoResult(func(*catfile.ObjectInfo) bool { return false }),
+ },
+ expectedResults: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
+ },
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := testhelper.Context()
@@ -96,7 +136,7 @@ func TestCatfileInfo(t *testing.T) {
objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
- it := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs))
+ it := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...)
var results []CatfileInfoResult
for it.Next() {
@@ -147,83 +187,3 @@ func TestCatfileInfoAllObjects(t *testing.T) {
{ObjectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}},
}, results)
}
-
-func TestCatfileInfoFilter(t *testing.T) {
- for _, tc := range []struct {
- desc string
- input []CatfileInfoResult
- filter func(CatfileInfoResult) bool
- expectedResults []CatfileInfoResult
- expectedErr error
- }{
- {
- desc: "all accepted",
- input: []CatfileInfoResult{
- {ObjectName: []byte{'a'}},
- {ObjectName: []byte{'b'}},
- {ObjectName: []byte{'c'}},
- },
- filter: func(CatfileInfoResult) bool {
- return true
- },
- expectedResults: []CatfileInfoResult{
- {ObjectName: []byte{'a'}},
- {ObjectName: []byte{'b'}},
- {ObjectName: []byte{'c'}},
- },
- },
- {
- desc: "all filtered",
- input: []CatfileInfoResult{
- {ObjectName: []byte{'a'}},
- {ObjectName: []byte{'b'}},
- {ObjectName: []byte{'c'}},
- },
- filter: func(CatfileInfoResult) bool {
- return false
- },
- },
- {
- desc: "errors always get through",
- input: []CatfileInfoResult{
- {ObjectName: []byte{'a'}},
- {ObjectName: []byte{'b'}},
- {err: errors.New("foobar")},
- {ObjectName: []byte{'c'}},
- },
- filter: func(CatfileInfoResult) bool {
- return false
- },
- expectedErr: errors.New("foobar"),
- },
- {
- desc: "subset filtered",
- input: []CatfileInfoResult{
- {ObjectName: []byte{'a'}},
- {ObjectName: []byte{'b'}},
- {ObjectName: []byte{'c'}},
- },
- filter: func(r CatfileInfoResult) bool {
- return r.ObjectName[0] == 'b'
- },
- expectedResults: []CatfileInfoResult{
- {ObjectName: []byte{'b'}},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- it := CatfileInfoFilter(ctx, NewCatfileInfoIterator(tc.input), tc.filter)
-
- var results []CatfileInfoResult
- for it.Next() {
- results = append(results, it.Result())
- }
-
- require.Equal(t, tc.expectedErr, it.Err())
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 4172dffdb..3a8c2d89a 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -32,7 +32,7 @@ type CatfileObjectResult struct {
func CatfileObject(
ctx context.Context,
objectReader catfile.ObjectReader,
- it CatfileInfoIterator,
+ it ObjectIterator,
) CatfileObjectIterator {
resultChan := make(chan CatfileObjectResult)
go func() {
@@ -62,8 +62,6 @@ func CatfileObject(
var objectDataReader *signallingReader
for it.Next() {
- catfileInfoResult := it.Result()
-
// We mustn't try to read another object before reading the previous object
// has concluded. Given that this is not under our control but under the
// control of the caller, we thus have to wait until the blocking reader has
@@ -76,7 +74,7 @@ func CatfileObject(
}
}
- object, err := objectReader.Object(ctx, catfileInfoResult.ObjectInfo.Oid.Revision())
+ object, err := objectReader.Object(ctx, it.ObjectID().Revision())
if err != nil {
sendResult(CatfileObjectResult{
err: fmt.Errorf("requesting object: %w", err),
@@ -90,8 +88,8 @@ func CatfileObject(
}
if isDone := sendResult(CatfileObjectResult{
- ObjectName: catfileInfoResult.ObjectName,
- ObjectInfo: catfileInfoResult.ObjectInfo,
+ ObjectName: it.ObjectName(),
+ ObjectInfo: &object.ObjectInfo,
ObjectReader: objectDataReader,
}); isDone {
return
diff --git a/internal/git/gitpipe/catfile_object_iterator.go b/internal/git/gitpipe/catfile_object_iterator.go
index 372a94f3c..7f4a05622 100644
--- a/internal/git/gitpipe/catfile_object_iterator.go
+++ b/internal/git/gitpipe/catfile_object_iterator.go
@@ -1,11 +1,10 @@
package gitpipe
+import "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+
// CatfileObjectIterator is an iterator returned by the Revlist function.
type CatfileObjectIterator interface {
- // Next iterates to the next item. Returns `false` in case there are no more results left.
- Next() bool
- // Err returns the first error that was encountered.
- Err() error
+ ObjectIterator
// Result returns the current item.
Result() CatfileObjectResult
}
@@ -49,3 +48,11 @@ func (it *catfileObjectIterator) Err() error {
func (it *catfileObjectIterator) Result() CatfileObjectResult {
return it.result
}
+
+func (it *catfileObjectIterator) ObjectID() git.ObjectID {
+ return it.result.ObjectInfo.Oid
+}
+
+func (it *catfileObjectIterator) ObjectName() []byte {
+ return it.result.ObjectName
+}
diff --git a/internal/git/gitpipe/object_iterator.go b/internal/git/gitpipe/object_iterator.go
new file mode 100644
index 000000000..5628bc2c7
--- /dev/null
+++ b/internal/git/gitpipe/object_iterator.go
@@ -0,0 +1,19 @@
+package gitpipe
+
+import "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+
+// ObjectIterator is a common interface that is shared across the pipeline steps that work with
+// objects.
+type ObjectIterator interface {
+ // Next iterates to the next item. Returns `false` in case there are no more results left,
+ // or if an error happened during iteration. The caller must call `Err()` after `Next()` has
+ // returned `false`.
+ Next() bool
+ // Err returns the first error that was encountered.
+ Err() error
+ // ObjectID returns the object ID of the current object.
+ ObjectID() git.ObjectID
+ // ObjectName returns the object name of the current object. This is an
+ // implementation-specific field and may not be set.
+ ObjectName() []byte
+}
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index d292a4a09..3b28bde47 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -22,13 +22,12 @@ func TestPipeline_revlist(t *testing.T) {
repo := localrepo.NewTestRepo(t, cfg, repoProto)
for _, tc := range []struct {
- desc string
- revisions []string
- revlistOptions []RevlistOption
- revisionFilter func(RevisionResult) bool
- catfileInfoFilter func(CatfileInfoResult) bool
- expectedResults []CatfileObjectResult
- expectedErr error
+ desc string
+ revisions []string
+ revlistOptions []RevlistOption
+ catfileInfoOptions []CatfileInfoOption
+ expectedResults []CatfileObjectResult
+ expectedErr error
}{
{
desc: "single blob",
@@ -74,9 +73,9 @@ func TestPipeline_revlist(t *testing.T) {
},
revlistOptions: []RevlistOption{
WithObjects(),
- },
- revisionFilter: func(r RevisionResult) bool {
- return r.OID == lfsPointer2
+ WithSkipRevlistResult(func(r *RevisionResult) bool {
+ return r.OID != lfsPointer2
+ }),
},
expectedResults: []CatfileObjectResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
@@ -110,8 +109,10 @@ func TestPipeline_revlist(t *testing.T) {
revlistOptions: []RevlistOption{
WithObjects(),
},
- catfileInfoFilter: func(r CatfileInfoResult) bool {
- return r.ObjectInfo.Type == "blob"
+ catfileInfoOptions: []CatfileInfoOption{
+ WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
+ return objectInfo.Type != "blob"
+ }),
},
expectedResults: []CatfileObjectResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, ObjectName: []byte("branch-test.txt")},
@@ -154,15 +155,17 @@ func TestPipeline_revlist(t *testing.T) {
},
revlistOptions: []RevlistOption{
WithObjects(),
- },
- revisionFilter: func(r RevisionResult) bool {
- // Let through two LFS pointers and a tree.
- return r.OID == "b95c0fad32f4361845f91d9ce4c1721b52b82793" ||
- r.OID == lfsPointer1 || r.OID == lfsPointer2
- },
- catfileInfoFilter: func(r CatfileInfoResult) bool {
- // Only let through blobs, so only the two LFS pointers remain.
- return r.ObjectInfo.Type == "blob"
+ WithSkipRevlistResult(func(r *RevisionResult) bool {
+ // Let through two LFS pointers and a tree.
+ return r.OID != "b95c0fad32f4361845f91d9ce4c1721b52b82793" &&
+ r.OID != lfsPointer1 && r.OID != lfsPointer2
+ }),
+ },
+ catfileInfoOptions: []CatfileInfoOption{
+ WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
+ // Only let through blobs, so only the two LFS pointers remain.
+ return objectInfo.Type != "blob"
+ }),
},
expectedResults: []CatfileObjectResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}, ObjectName: []byte("files/lfs/lfs_object.iso")},
@@ -190,13 +193,17 @@ func TestPipeline_revlist(t *testing.T) {
revisions: []string{
"doesnotexist",
},
- revisionFilter: func(r RevisionResult) bool {
- require.Fail(t, "filter should not be invoked on errors")
- return true
+ revlistOptions: []RevlistOption{
+ WithSkipRevlistResult(func(r *RevisionResult) bool {
+ require.Fail(t, "filter should not be invoked on errors")
+ return true
+ }),
},
- catfileInfoFilter: func(r CatfileInfoResult) bool {
- require.Fail(t, "filter should not be invoked on errors")
- return true
+ catfileInfoOptions: []CatfileInfoOption{
+ WithSkipCatfileInfoResult(func(r *catfile.ObjectInfo) bool {
+ require.Fail(t, "filter should not be invoked on errors")
+ return true
+ }),
},
expectedErr: errors.New("rev-list pipeline command: exit status 128"),
},
@@ -215,15 +222,7 @@ func TestPipeline_revlist(t *testing.T) {
require.NoError(t, err)
revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...)
- if tc.revisionFilter != nil {
- revlistIter = RevisionFilter(ctx, revlistIter, tc.revisionFilter)
- }
-
- catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter)
- if tc.catfileInfoFilter != nil {
- catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, tc.catfileInfoFilter)
- }
-
+ catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter, tc.catfileInfoOptions...)
catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
var results []CatfileObjectResult
@@ -269,9 +268,7 @@ func TestPipeline_revlist(t *testing.T) {
require.NoError(t, err)
revlistIter := Revlist(ctx, repo, []string{"--all"})
- revlistIter = RevisionFilter(ctx, revlistIter, func(RevisionResult) bool { return true })
catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter)
- catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, func(CatfileInfoResult) bool { return true })
catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
i := 0
diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go
index ff2e84cd2..a4eec3a1e 100644
--- a/internal/git/gitpipe/revision.go
+++ b/internal/git/gitpipe/revision.go
@@ -52,6 +52,7 @@ type revlistConfig struct {
firstParent bool
before, after time.Time
author []byte
+ skipResult func(*RevisionResult) bool
}
// RevlistOption is an option for the revlist pipeline step.
@@ -159,6 +160,15 @@ func WithAuthor(author []byte) RevlistOption {
}
}
+// WithSkipRevlistResult will execute the given function for each RevisionResult processed by the
+// pipeline. If the callback returns `true`, then the object will be skipped and not passed down
+// the pipeline.
+func WithSkipRevlistResult(skipResult func(*RevisionResult) bool) RevlistOption {
+ return func(cfg *revlistConfig) {
+ cfg.skipResult = skipResult
+ }
+}
+
// Revlist runs git-rev-list(1) with objects and object names enabled. The returned channel will
// contain all object IDs listed by this command. Cancelling the context will cause the pipeline to
// be cancelled, too.
@@ -272,6 +282,10 @@ func Revlist(
result.ObjectName = oidAndName[1]
}
+ if cfg.skipResult != nil && cfg.skipResult(&result) {
+ continue
+ }
+
if isDone := sendRevisionResult(ctx, resultChan, result); isDone {
return
}
@@ -297,6 +311,22 @@ func Revlist(
}
}
+type forEachRefConfig struct {
+ format string
+}
+
+// ForEachRefOption is an option that can be passed to ForEachRef.
+type ForEachRefOption func(cfg *forEachRefConfig)
+
+// WithForEachRefFormat sets the format used by git-for-each-ref(1). Note that each line _must_ be of
+// format "%(objectname) %(refname)" such that the pipeline can parse it correctly. You may use
+// conditional format statements though to potentially produce multiple such lines.
+func WithForEachRefFormat(format string) ForEachRefOption {
+ return func(cfg *forEachRefConfig) {
+ cfg.format = format
+ }
+}
+
// ForEachRef runs git-for-each-ref(1) with the given patterns and returns a RevisionIterator for
// found references. Patterns must always refer to fully qualified reference names. Patterns for
// which no branch is found do not result in an error. The iterator's object name is set to the
@@ -307,18 +337,25 @@ func ForEachRef(
repo *localrepo.Repo,
patterns []string,
sortField string,
+ opts ...ForEachRefOption,
) RevisionIterator {
- resultChan := make(chan RevisionResult)
+ cfg := forEachRefConfig{
+ // The default format also includes the object type, which requires us to read the
+ // referenced commit's object. It would thus be about 2-3x slower to use the
+ // default format, and instead we move the burden into the next pipeline step by
+ // default.
+ format: "%(objectname) %(refname)",
+ }
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+ resultChan := make(chan RevisionResult)
go func() {
defer close(resultChan)
flags := []git.Option{
- // The default format also includes the object type, which requires
- // us to read the referenced commit's object. It would thus be about
- // 2-3x slower to use the default format, and instead we move the
- // burden into the next pipeline step.
- git.ValueFlag{Name: "--format", Value: "%(objectname) %(refname)"},
+ git.ValueFlag{Name: "--format", Value: cfg.format},
}
if sortField != "" {
flags = append(flags, git.ValueFlag{Name: "--sort", Value: sortField})
@@ -374,47 +411,6 @@ func ForEachRef(
}
}
-// RevisionFilter filters the RevisionResult from the provided iterator with the filter function: if
-// the filter returns `false` for a given item, then it will be dropped from the pipeline. Errors
-// cannot be filtered and will always be passed through.
-func RevisionFilter(ctx context.Context, it RevisionIterator, filter func(RevisionResult) bool) RevisionIterator {
- return RevisionTransform(ctx, it, func(r RevisionResult) []RevisionResult {
- if filter(r) {
- return []RevisionResult{r}
- }
- return []RevisionResult{}
- })
-}
-
-// RevisionTransform transforms each RevisionResult from the provided iterator with the transforming
-// function. Instead of sending the original RevisionResult, it will instead send transformed
-// results.
-func RevisionTransform(ctx context.Context, it RevisionIterator, transform func(RevisionResult) []RevisionResult) RevisionIterator {
- resultChan := make(chan RevisionResult)
-
- go func() {
- defer close(resultChan)
-
- for it.Next() {
- for _, transformed := range transform(it.Result()) {
- if sendRevisionResult(ctx, resultChan, transformed) {
- return
- }
- }
- }
-
- if err := it.Err(); err != nil {
- if sendRevisionResult(ctx, resultChan, RevisionResult{err: err}) {
- return
- }
- }
- }()
-
- return &revisionIterator{
- ch: resultChan,
- }
-}
-
func sendRevisionResult(ctx context.Context, ch chan<- RevisionResult, result RevisionResult) bool {
// In case the context has been cancelled, we have a race between observing an error from
// the killed Git process and observing the context cancellation itself. But if we end up
diff --git a/internal/git/gitpipe/revision_iterator.go b/internal/git/gitpipe/revision_iterator.go
index 828c0d965..8d949e648 100644
--- a/internal/git/gitpipe/revision_iterator.go
+++ b/internal/git/gitpipe/revision_iterator.go
@@ -1,11 +1,10 @@
package gitpipe
+import "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+
// RevisionIterator is an iterator returned by the Revlist function.
type RevisionIterator interface {
- // Next iterates to the next item. Returns `false` in case there are no more results left.
- Next() bool
- // Err returns the first error that was encountered.
- Err() error
+ ObjectIterator
// Result returns the current item.
Result() RevisionResult
}
@@ -49,3 +48,11 @@ func (it *revisionIterator) Err() error {
func (it *revisionIterator) Result() RevisionResult {
return it.result
}
+
+func (it *revisionIterator) ObjectID() git.ObjectID {
+ return it.result.OID
+}
+
+func (it *revisionIterator) ObjectName() []byte {
+ return it.result.ObjectName
+}
diff --git a/internal/git/gitpipe/revision_test.go b/internal/git/gitpipe/revision_test.go
index 3db71ec60..b40151dbe 100644
--- a/internal/git/gitpipe/revision_test.go
+++ b/internal/git/gitpipe/revision_test.go
@@ -427,6 +427,53 @@ func TestRevlist(t *testing.T) {
},
expectedErr: errors.New("rev-list pipeline command: exit status 128"),
},
+ {
+ desc: "skip everything",
+ revisions: []string{
+ "79d5f98270ad677c86a7e1ab2baa922958565135",
+ },
+ options: []RevlistOption{
+ WithObjects(),
+ WithBlobLimit(10),
+ WithObjectTypeFilter(ObjectTypeBlob),
+ WithSkipRevlistResult(func(*RevisionResult) bool { return true }),
+ },
+ },
+ {
+ desc: "skip nothing",
+ revisions: []string{
+ "79d5f98270ad677c86a7e1ab2baa922958565135",
+ },
+ options: []RevlistOption{
+ WithObjects(),
+ WithBlobLimit(10),
+ WithObjectTypeFilter(ObjectTypeBlob),
+ WithSkipRevlistResult(func(*RevisionResult) bool { return false }),
+ },
+ expectedResults: []RevisionResult{
+ {OID: "0fb47f093f769008049a0b0976ac3fa6d6125033", ObjectName: []byte("hotfix-1.txt")},
+ {OID: "4ae6c5e14452a35d04156277ae63e8356eb17cae", ObjectName: []byte("hotfix-2.txt")},
+ {OID: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", ObjectName: []byte("iso8859.txt")},
+ },
+ },
+ {
+ desc: "skip one",
+ revisions: []string{
+ "79d5f98270ad677c86a7e1ab2baa922958565135",
+ },
+ options: []RevlistOption{
+ WithObjects(),
+ WithBlobLimit(10),
+ WithObjectTypeFilter(ObjectTypeBlob),
+ WithSkipRevlistResult(func(r *RevisionResult) bool {
+ return string(r.ObjectName) == "hotfix-2.txt"
+ }),
+ },
+ expectedResults: []RevisionResult{
+ {OID: "0fb47f093f769008049a0b0976ac3fa6d6125033", ObjectName: []byte("hotfix-1.txt")},
+ {OID: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", ObjectName: []byte("iso8859.txt")},
+ },
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := testhelper.Context()
@@ -456,8 +503,8 @@ func TestForEachRef(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- readRefs := func(t *testing.T, repo *localrepo.Repo, patterns ...string) []RevisionResult {
- it := ForEachRef(ctx, repo, patterns, "")
+ readRefs := func(t *testing.T, repo *localrepo.Repo, patterns []string, opts ...ForEachRefOption) []RevisionResult {
+ it := ForEachRef(ctx, repo, patterns, "", opts...)
var results []RevisionResult
for it.Next() {
@@ -485,11 +532,11 @@ func TestForEachRef(t *testing.T) {
ObjectName: []byte("refs/heads/master"),
OID: revisions["refs/heads/master"],
},
- }, readRefs(t, repo, "refs/heads/master"))
+ }, readRefs(t, repo, []string{"refs/heads/master"}))
})
t.Run("unqualified branch name", func(t *testing.T) {
- require.Nil(t, readRefs(t, repo, "master"))
+ require.Nil(t, readRefs(t, repo, []string{"master"}))
})
t.Run("multiple branches", func(t *testing.T) {
@@ -502,11 +549,11 @@ func TestForEachRef(t *testing.T) {
ObjectName: []byte("refs/heads/master"),
OID: revisions["refs/heads/master"],
},
- }, readRefs(t, repo, "refs/heads/master", "refs/heads/feature"))
+ }, readRefs(t, repo, []string{"refs/heads/master", "refs/heads/feature"}))
})
t.Run("branches pattern", func(t *testing.T) {
- refs := readRefs(t, repo, "refs/heads/*")
+ refs := readRefs(t, repo, []string{"refs/heads/*"})
require.Greater(t, len(refs), 90)
require.Subset(t, refs, []RevisionResult{
@@ -521,212 +568,33 @@ func TestForEachRef(t *testing.T) {
})
})
+ t.Run("tag with format", func(t *testing.T) {
+ refs := readRefs(t, repo, []string{"refs/tags/v1.0.0"},
+ WithForEachRefFormat("%(objectname) tag\n%(*objectname) peeled"),
+ )
+
+ require.Equal(t, refs, []RevisionResult{
+ {
+ ObjectName: []byte("tag"),
+ OID: "f4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8",
+ },
+ {
+ ObjectName: []byte("peeled"),
+ OID: "6f6d7e7ed97bb5f0054f2b1df789b39ca89b6ff9",
+ },
+ })
+ })
+
t.Run("multiple patterns", func(t *testing.T) {
- refs := readRefs(t, repo, "refs/heads/*", "refs/tags/*")
+ refs := readRefs(t, repo, []string{"refs/heads/*", "refs/tags/*"})
require.Greater(t, len(refs), 90)
})
t.Run("nonexisting branch", func(t *testing.T) {
- require.Nil(t, readRefs(t, repo, "refs/heads/idontexist"))
+ require.Nil(t, readRefs(t, repo, []string{"refs/heads/idontexist"}))
})
t.Run("nonexisting pattern", func(t *testing.T) {
- require.Nil(t, readRefs(t, repo, "refs/idontexist/*"))
+ require.Nil(t, readRefs(t, repo, []string{"refs/idontexist/*"}))
})
}
-
-func TestRevisionFilter(t *testing.T) {
- for _, tc := range []struct {
- desc string
- input []RevisionResult
- filter func(RevisionResult) bool
- expectedResults []RevisionResult
- expectedErr error
- }{
- {
- desc: "all accepted",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- filter: func(RevisionResult) bool {
- return true
- },
- expectedResults: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- },
- {
- desc: "all filtered",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- filter: func(RevisionResult) bool {
- return false
- },
- expectedResults: nil,
- },
- {
- desc: "errors always get through",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {err: errors.New("foobar")},
- {OID: "c"},
- },
- filter: func(RevisionResult) bool {
- return false
- },
- expectedErr: errors.New("foobar"),
- },
- {
- desc: "subset filtered",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- filter: func(r RevisionResult) bool {
- return r.OID == "b"
- },
- expectedResults: []RevisionResult{
- {OID: "b"},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- it := RevisionFilter(ctx, NewRevisionIterator(tc.input), tc.filter)
-
- var results []RevisionResult
- for it.Next() {
- results = append(results, it.Result())
- }
-
- require.Equal(t, tc.expectedErr, it.Err())
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
-
-func TestRevisionTransform(t *testing.T) {
- for _, tc := range []struct {
- desc string
- input []RevisionResult
- transform func(RevisionResult) []RevisionResult
- expectedResults []RevisionResult
- expectedErr error
- }{
- {
- desc: "identity mapping",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- transform: func(r RevisionResult) []RevisionResult {
- return []RevisionResult{r}
- },
- expectedResults: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- },
- {
- desc: "strip object",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- transform: func(r RevisionResult) []RevisionResult {
- if r.OID == "b" {
- return []RevisionResult{}
- }
- return []RevisionResult{r}
- },
- expectedResults: []RevisionResult{
- {OID: "a"},
- {OID: "c"},
- },
- },
- {
- desc: "replace items",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- transform: func(RevisionResult) []RevisionResult {
- return []RevisionResult{{OID: "x"}}
- },
- expectedResults: []RevisionResult{
- {OID: "x"},
- {OID: "x"},
- {OID: "x"},
- },
- },
- {
- desc: "add additional items",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {OID: "c"},
- },
- transform: func(r RevisionResult) []RevisionResult {
- return []RevisionResult{
- r,
- {OID: r.OID + "x"},
- }
- },
- expectedResults: []RevisionResult{
- {OID: "a"},
- {OID: "ax"},
- {OID: "b"},
- {OID: "bx"},
- {OID: "c"},
- {OID: "cx"},
- },
- },
- {
- desc: "error handling",
- input: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- {err: errors.New("foobar")},
- {OID: "c"},
- },
- transform: func(r RevisionResult) []RevisionResult {
- return []RevisionResult{r}
- },
- expectedResults: []RevisionResult{
- {OID: "a"},
- {OID: "b"},
- },
- expectedErr: errors.New("foobar"),
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- it := RevisionTransform(ctx, NewRevisionIterator(tc.input), tc.transform)
-
- var results []RevisionResult
- for it.Next() {
- results = append(results, it.Result())
- }
-
- require.Equal(t, tc.expectedErr, it.Err())
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index aafeef302..391500403 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -9,6 +9,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gitpipe"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -41,16 +42,6 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
ctx := stream.Context()
repo := s.localrepo(req.GetRepository())
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
- if err != nil {
- return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
- }
-
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
- if err != nil {
- return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
- }
-
chunker := chunk.New(&blobSender{
send: func(blobs []*gitalypb.ListBlobsResponse_Blob) error {
return stream.Send(&gitalypb.ListBlobsResponse{
@@ -65,9 +56,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
}
revlistIter := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...)
- catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, revlistIter)
- if err := processBlobs(ctx, objectReader, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(),
+ if err := s.processBlobs(ctx, repo, revlistIter, nil, req.GetLimit(), req.GetBytesLimit(),
func(oid string, size int64, contents []byte, path []byte) error {
if !req.GetWithPaths() {
path = nil
@@ -91,9 +81,10 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
return nil
}
-func processBlobs(
+func (s *server) processBlobs(
ctx context.Context,
- objectReader catfile.ObjectReader,
+ repo *localrepo.Repo,
+ objectIter gitpipe.ObjectIterator,
catfileInfoIter gitpipe.CatfileInfoIterator,
blobsLimit uint32,
bytesLimit int64,
@@ -102,6 +93,19 @@ func processBlobs(
// If we have a zero bytes limit, then the caller didn't request any blob contents at all.
// We can thus skip reading blob contents completely.
if bytesLimit == 0 {
+ // This is a bit untidy, but some callers may already use an object info iterator to
+ // enumerate objects, where it thus wouldn't make sense to recreate it via the
+ // object iterator. We thus support an optional `catfileInfoIter` parameter: if set,
+ // we just use that one and ignore the object iterator.
+ if catfileInfoIter == nil {
+ objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
+ if err != nil {
+ return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
+ }
+
+ catfileInfoIter = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter)
+ }
+
var i uint32
for catfileInfoIter.Next() {
blob := catfileInfoIter.Result()
@@ -125,7 +129,12 @@ func processBlobs(
return helper.ErrInternal(err)
}
} else {
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
+ if err != nil {
+ return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
+ }
+
+ catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, objectIter)
var i uint32
for catfileObjectIter.Next() {
@@ -230,17 +239,13 @@ func (s *server) ListAllBlobs(req *gitalypb.ListAllBlobsRequest, stream gitalypb
},
})
- objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
- if err != nil {
- return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
- }
-
- catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo)
- catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
- return r.ObjectInfo.Type == "blob"
- })
+ catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo,
+ gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
+ return objectInfo.Type != "blob"
+ }),
+ )
- if err := processBlobs(ctx, objectReader, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(),
+ if err := s.processBlobs(ctx, repo, catfileInfoIter, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(),
func(oid string, size int64, contents []byte, path []byte) error {
return chunker.Send(&gitalypb.ListAllBlobsResponse_Blob{
Oid: oid,
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index 907db1e51..f7e37c80d 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -8,6 +8,7 @@ import (
gitalyerrors "gitlab.com/gitlab-org/gitaly/v14/internal/errors"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gitpipe"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/chunk"
@@ -51,25 +52,17 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
repo := s.localrepo(in.GetRepository())
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
- if err != nil {
- return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
- }
-
objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
- revlistOptions := []gitpipe.RevlistOption{
+ revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(),
gitpipe.WithObjects(),
gitpipe.WithBlobLimit(lfsPointerMaxSize),
gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob),
- }
-
- revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...)
- catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, revlistIter)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ )
+ catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
@@ -102,10 +95,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
- catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo)
- catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
- return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
- })
+ catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo,
+ gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
+ return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize
+ }),
+ )
catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
@@ -150,10 +144,11 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
blobs[i] = gitpipe.RevisionResult{OID: git.ObjectID(blobID)}
}
- catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs))
- catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
- return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
- })
+ catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs),
+ gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
+ return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize
+ }),
+ )
catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil {
diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go
index 072b0bb47..d7dfdb66b 100644
--- a/internal/gitaly/service/commit/list_all_commits.go
+++ b/internal/gitaly/service/commit/list_all_commits.go
@@ -35,27 +35,23 @@ func (s *server) ListAllCommits(
return helper.ErrInternalf("creating object reader: %w", err)
}
- catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo)
-
// If we've got a pagination token, then we will only start to print commits as soon as
// we've seen the token.
token := request.GetPaginationParams().GetPageToken()
waitingForToken := token != ""
- catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
- if waitingForToken {
- waitingForToken = r.ObjectInfo.Oid != git.ObjectID(token)
- // We also skip the token itself, thus we always return `false`
- // here.
- return false
- }
-
- if r.ObjectInfo.Type != "commit" {
- return false
- }
-
- return true
- })
+ catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo,
+ gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
+ if waitingForToken {
+ waitingForToken = objectInfo.Oid != git.ObjectID(token)
+ // We also skip the token itself, thus we always return `true`
+ // here so that the result gets skipped.
+ return true
+ }
+
+ return objectInfo.Type != "commit"
+ }),
+ )
catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go
index 44c599b96..8a3cbfce2 100644
--- a/internal/gitaly/service/commit/list_commits.go
+++ b/internal/gitaly/service/commit/list_commits.go
@@ -39,11 +39,6 @@ func (s *server) ListCommits(
ctx := stream.Context()
repo := s.localrepo(request.GetRepository())
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
- if err != nil {
- return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
- }
-
objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
@@ -88,26 +83,24 @@ func (s *server) ListCommits(
revlistOptions = append(revlistOptions, gitpipe.WithAuthor(request.GetAuthor()))
}
- revlistIter := gitpipe.Revlist(ctx, repo, request.GetRevisions(), revlistOptions...)
-
// If we've got a pagination token, then we will only start to print commits as soon as
// we've seen the token.
if token := request.GetPaginationParams().GetPageToken(); token != "" {
tokenSeen := false
- revlistIter = gitpipe.RevisionFilter(ctx, revlistIter, func(r gitpipe.RevisionResult) bool {
+ revlistOptions = append(revlistOptions, gitpipe.WithSkipRevlistResult(func(r *gitpipe.RevisionResult) bool {
if !tokenSeen {
tokenSeen = r.OID == git.ObjectID(token)
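- // We also skip the token itself, thus we always return `false`
- // here.
+ // We also skip the token itself, thus we always return `true`
+ // here so that the result gets skipped.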
- return false
+ return true
}
- return true
- })
+ return false
+ }))
}
- catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, revlistIter)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ revlistIter := gitpipe.Revlist(ctx, repo, request.GetRevisions(), revlistOptions...)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
chunker := chunk.New(&commitsSender{
send: func(commits []*gitalypb.GitCommit) error {
diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go
index 051d2e366..e7667d349 100644
--- a/internal/gitaly/service/ref/find_all_tags.go
+++ b/internal/gitaly/service/ref/find_all_tags.go
@@ -38,77 +38,15 @@ func (s *server) FindAllTags(in *gitalypb.FindAllTagsRequest, stream gitalypb.Re
}
func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortField string, stream gitalypb.RefService_FindAllTagsServer) error {
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
- if err != nil {
- return fmt.Errorf("error creating object info reader: %v", err)
- }
-
objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return fmt.Errorf("error creating object reader: %v", err)
}
- forEachRefIter := gitpipe.ForEachRef(ctx, repo, []string{"refs/tags/"}, sortField)
- forEachRefIter = gitpipe.RevisionTransform(ctx, forEachRefIter,
- func(r gitpipe.RevisionResult) []gitpipe.RevisionResult {
- // We transform the pipeline to include each tag-reference twice: once for
- // the "normal" object, and once we opportunistically peel the object to a
- // non-tag object. This is required such that we can efficiently parse the
- // tagged object.
- return []gitpipe.RevisionResult{
- r,
- {OID: r.OID + "^{}"},
- }
- },
- )
-
- catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, forEachRefIter)
-
- // In the previous pipeline step, we request information about both the object and the
- // peeled object in case the object is a tag. Given that we now know about object types, we
- // can filter out the second request in case the object is not a tag: peeling a non-tag
- // object to a non-tag object is always going to end up with the same object anyway. And
- // requesting the same object twice is moot.
- type state int
- const (
- // stateTag indicates that the next object is going to be a tag.
- stateTag = state(iota)
- // statePeeledTag indicates that the next object is going to be the peeled object of
- // the preceding tag.
- statePeeledTag
- // stateSkip indicates that the next object shall be skipped because it is the
- // peeled version of a non-tag object, which is the same object anyway.
- stateSkip
+ forEachRefIter := gitpipe.ForEachRef(ctx, repo, []string{"refs/tags/"}, sortField,
+ gitpipe.WithForEachRefFormat("%(objectname) %(refname)%(if)%(*objectname)%(then)\n%(objectname)^{} PEELED%(end)"),
)
-
- currentState := stateTag
- catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter,
- func(r gitpipe.CatfileInfoResult) bool {
- switch currentState {
- case stateTag:
- // If we've got a tag, then we want to also see its peeled object.
- // Otherwise, we can skip over the peeled object.
- currentState = statePeeledTag
- if r.ObjectInfo.Type != "tag" {
- currentState = stateSkip
- }
- return true
- case statePeeledTag:
- currentState = stateTag
- return true
- case stateSkip:
- currentState = stateTag
- return false
- }
-
- // We could try to gracefully handle this, but I don't see much of a point
- // given that we can see above that it's never going to be anything else but
- // a known state.
- panic("invalid state")
- },
- )
-
- catfileObjectsIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ catfileObjectsIter := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter)
chunker := chunk.New(&tagSender{stream: stream})
diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go
index ad2630d60..091275075 100644
--- a/internal/gitaly/service/ref/tag_signatures.go
+++ b/internal/gitaly/service/ref/tag_signatures.go
@@ -39,11 +39,6 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream
ctx := stream.Context()
repo := s.localrepo(req.GetRepository())
- objectInfoReader, err := s.catfileCache.ObjectInfoReader(ctx, repo)
- if err != nil {
- return helper.ErrInternalf("creating object info reader: %w", err)
- }
-
objectReader, err := s.catfileCache.ObjectReader(ctx, repo)
if err != nil {
return helper.ErrInternalf("creating object reader: %w", err)
@@ -63,8 +58,7 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream
}
revlistIter := gitpipe.Revlist(ctx, repo, req.GetTagRevisions(), revlistOptions...)
- catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, revlistIter)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
for catfileObjectIter.Next() {
tag := catfileObjectIter.Result()