Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2021-06-30 09:56:49 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2021-06-30 09:56:49 +0300
commit5694344d3ee87c4f6ab663197596ba860f63ef50 (patch)
treefc730a26bd4b5c502751c799a0a5c20fceb4edf0
parentd5777441938369d512a7825c57ccf6115d7afdfa (diff)
parent39119388e82bccaf1de432a7a23b989f311d6b13 (diff)
Merge branch 'pks-gitpipe-package' into 'master'
Move object pipeline from blob service into "gitpipe" package See merge request gitlab-org/gitaly!3617
-rw-r--r--internal/git/gitpipe/catfile_info.go177
-rw-r--r--internal/git/gitpipe/catfile_info_iterator.go51
-rw-r--r--internal/git/gitpipe/catfile_info_test.go231
-rw-r--r--internal/git/gitpipe/catfile_object.go130
-rw-r--r--internal/git/gitpipe/catfile_object_iterator.go51
-rw-r--r--internal/git/gitpipe/catfile_object_test.go130
-rw-r--r--internal/git/gitpipe/pipeline_test.go291
-rw-r--r--internal/git/gitpipe/revlist.go195
-rw-r--r--internal/git/gitpipe/revlist_iterator.go51
-rw-r--r--internal/git/gitpipe/revlist_test.go346
-rw-r--r--internal/git/gitpipe/testhelper_test.go21
-rw-r--r--internal/gitaly/service/blob/blobs.go53
-rw-r--r--internal/gitaly/service/blob/lfs_pointers.go60
-rw-r--r--internal/gitaly/service/blob/pipeline.go439
-rw-r--r--internal/gitaly/service/blob/pipeline_test.go977
15 files changed, 1734 insertions, 1469 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
new file mode 100644
index 000000000..5d928ef51
--- /dev/null
+++ b/internal/git/gitpipe/catfile_info.go
@@ -0,0 +1,177 @@
+package gitpipe
+
+import (
+ "bufio"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+)
+
+// CatfileInfoResult is a result for the CatfileInfo pipeline step.
+type CatfileInfoResult struct {
+ // err is an error which occurred during execution of the pipeline.
+ err error
+
+ // ObjectName is the object name as received from the revlist iterator.
+ ObjectName []byte
+ // ObjectInfo is the object info of the object.
+ ObjectInfo *catfile.ObjectInfo
+}
+
+// CatfileInfo processes revlistResults from the given channel and extracts object information via
+// `git cat-file --batch-check`. The returned channel will contain all processed catfile info
+// results. Any error received via the channel or encountered in this step will cause the pipeline
+// to fail. Context cancellation will gracefully halt the pipeline.
+func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator RevlistIterator) CatfileInfoIterator {
+ resultChan := make(chan CatfileInfoResult)
+
+ go func() {
+ defer close(resultChan)
+
+ sendResult := func(result CatfileInfoResult) bool {
+ select {
+ case resultChan <- result:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+ }
+
+ for revlistIterator.Next() {
+ revlistResult := revlistIterator.Result()
+
+ objectInfo, err := catfile.Info(ctx, revlistResult.OID.Revision())
+ if err != nil {
+ sendResult(CatfileInfoResult{
+ err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err),
+ })
+ return
+ }
+
+ if isDone := sendResult(CatfileInfoResult{
+ ObjectName: revlistResult.ObjectName,
+ ObjectInfo: objectInfo,
+ }); isDone {
+ return
+ }
+ }
+
+ if err := revlistIterator.Err(); err != nil {
+ sendResult(CatfileInfoResult{err: err})
+ return
+ }
+ }()
+
+ return &catfileInfoIterator{
+ ch: resultChan,
+ }
+}
+
+// CatfileInfoAllObjects enumerates all Git objects part of the repository's object directory and
+// extracts their object info via `git cat-file --batch-check`. The returned channel will contain
+// all processed results. Any error encountered during execution of this pipeline step will cause
+// the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this
+// pipeline step, the resulting catfileInfoResults will never have an object name.
+func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInfoIterator {
+ resultChan := make(chan CatfileInfoResult)
+
+ go func() {
+ defer close(resultChan)
+
+ sendResult := func(result CatfileInfoResult) bool {
+ select {
+ case resultChan <- result:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+ }
+
+ cmd, err := repo.Exec(ctx, git.SubCmd{
+ Name: "cat-file",
+ Flags: []git.Option{
+ git.Flag{Name: "--batch-all-objects"},
+ git.Flag{Name: "--batch-check"},
+ git.Flag{Name: "--buffer"},
+ git.Flag{Name: "--unordered"},
+ },
+ })
+ if err != nil {
+ sendResult(CatfileInfoResult{
+ err: fmt.Errorf("spawning cat-file failed: %w", err),
+ })
+ return
+ }
+
+ reader := bufio.NewReader(cmd)
+ for {
+ objectInfo, err := catfile.ParseObjectInfo(reader)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ break
+ }
+
+ sendResult(CatfileInfoResult{
+ err: fmt.Errorf("parsing object info: %w", err),
+ })
+ return
+ }
+
+ if isDone := sendResult(CatfileInfoResult{
+ ObjectInfo: objectInfo,
+ }); isDone {
+ return
+ }
+ }
+
+ if err := cmd.Wait(); err != nil {
+ sendResult(CatfileInfoResult{
+ err: fmt.Errorf("cat-file failed: %w", err),
+ })
+ return
+ }
+ }()
+
+ return &catfileInfoIterator{
+ ch: resultChan,
+ }
+}
+
+// CatfileInfoFilter filters the catfileInfoResults from the provided channel with the filter
+// function: if the filter returns `false` for a given item, then it will be dropped from the
+// pipeline. Errors cannot be filtered and will always be passed through.
+func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func(CatfileInfoResult) bool) CatfileInfoIterator {
+ resultChan := make(chan CatfileInfoResult)
+
+ go func() {
+ defer close(resultChan)
+
+ for it.Next() {
+ result := it.Result()
+ if filter(result) {
+ select {
+ case resultChan <- result:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+
+ if err := it.Err(); err != nil {
+ select {
+ case resultChan <- CatfileInfoResult{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return &catfileInfoIterator{
+ ch: resultChan,
+ }
+}
diff --git a/internal/git/gitpipe/catfile_info_iterator.go b/internal/git/gitpipe/catfile_info_iterator.go
new file mode 100644
index 000000000..54a75b5d9
--- /dev/null
+++ b/internal/git/gitpipe/catfile_info_iterator.go
@@ -0,0 +1,51 @@
+package gitpipe
+
+// CatfileInfoIterator is an iterator returned by the CatfileInfo functions.
+type CatfileInfoIterator interface {
+ // Next iterates to the next item. Returns `false` in case there are no more results left.
+ Next() bool
+ // Err returns the first error that was encountered.
+ Err() error
+ // Result returns the current item.
+ Result() CatfileInfoResult
+}
+
+// NewCatfileInfoIterator returns a new CatfileInfoIterator for the given items.
+func NewCatfileInfoIterator(items []CatfileInfoResult) CatfileInfoIterator {
+ itemChan := make(chan CatfileInfoResult, len(items))
+ for _, item := range items {
+ itemChan <- item
+ }
+ close(itemChan)
+
+ return &catfileInfoIterator{
+ ch: itemChan,
+ }
+}
+
+type catfileInfoIterator struct {
+ ch <-chan CatfileInfoResult
+ result CatfileInfoResult
+}
+
+func (it *catfileInfoIterator) Next() bool {
+ if it.result.err != nil {
+ return false
+ }
+
+ var ok bool
+ it.result, ok = <-it.ch
+ if !ok || it.result.err != nil {
+ return false
+ }
+
+ return true
+}
+
+func (it *catfileInfoIterator) Err() error {
+ return it.result.err
+}
+
+func (it *catfileInfoIterator) Result() CatfileInfoResult {
+ return it.result
+}
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
new file mode 100644
index 000000000..a57b7075d
--- /dev/null
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -0,0 +1,231 @@
+package gitpipe
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+)
+
+const (
+ lfsPointer1 = "0c304a93cb8430108629bbbcaa27db3343299bc0"
+ lfsPointer2 = "f78df813119a79bfbe0442ab92540a61d3ab7ff3"
+ lfsPointer3 = "bab31d249f78fba464d1b75799aad496cc07fa3b"
+ lfsPointer4 = "125fcc9f6e33175cb278b9b2809154d2535fe19f"
+)
+
+func TestCatfileInfo(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ repoProto, _, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name())
+ defer cleanup()
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ for _, tc := range []struct {
+ desc string
+ revlistInputs []RevlistResult
+ expectedResults []CatfileInfoResult
+ expectedErr error
+ }{
+ {
+ desc: "single blob",
+ revlistInputs: []RevlistResult{
+ {OID: lfsPointer1},
+ },
+ expectedResults: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ },
+ },
+ {
+ desc: "multiple blobs",
+ revlistInputs: []RevlistResult{
+ {OID: lfsPointer1},
+ {OID: lfsPointer2},
+ {OID: lfsPointer3},
+ {OID: lfsPointer4},
+ },
+ expectedResults: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer3, Type: "blob", Size: 127}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer4, Type: "blob", Size: 129}},
+ },
+ },
+ {
+ desc: "object name",
+ revlistInputs: []RevlistResult{
+ {OID: "b95c0fad32f4361845f91d9ce4c1721b52b82793"},
+ {OID: "93e123ac8a3e6a0b600953d7598af629dec7b735", ObjectName: []byte("branch-test.txt")},
+ },
+ expectedResults: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793", Type: "tree", Size: 43}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, ObjectName: []byte("branch-test.txt")},
+ },
+ },
+ {
+ desc: "invalid object ID",
+ revlistInputs: []RevlistResult{
+ {OID: "invalidobjectid"},
+ },
+ expectedErr: errors.New("retrieving object info for \"invalidobjectid\": object not found"),
+ },
+ {
+ desc: "mixed valid and invalid revision",
+ revlistInputs: []RevlistResult{
+ {OID: lfsPointer1},
+ {OID: "invalidobjectid"},
+ {OID: lfsPointer2},
+ },
+ expectedResults: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ },
+ expectedErr: errors.New("retrieving object info for \"invalidobjectid\": object not found"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
+ require.NoError(t, err)
+
+ it := CatfileInfo(ctx, catfileProcess, NewRevlistIterator(tc.revlistInputs))
+
+ var results []CatfileInfoResult
+ for it.Next() {
+ results = append(results, it.Result())
+ }
+
+ // We're converting the error here to a plain un-nested error such
+ // that we don't have to replicate the complete error's structure.
+ err = it.Err()
+ if err != nil {
+ err = errors.New(err.Error())
+ }
+
+ require.Equal(t, tc.expectedErr, err)
+ require.Equal(t, tc.expectedResults, results)
+ })
+ }
+}
+
+func TestCatfileInfoAllObjects(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ repoProto, repoPath, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0])
+ defer cleanup()
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ blob1 := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+ blob2 := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo"))
+ tree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+ {Path: "foobar", Mode: "100644", OID: blob1},
+ })
+ commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents())
+
+ it := CatfileInfoAllObjects(ctx, repo)
+
+ var results []CatfileInfoResult
+ for it.Next() {
+ results = append(results, it.Result())
+ }
+ require.NoError(t, it.Err())
+
+ require.ElementsMatch(t, []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: blob2, Type: "blob", Size: 6}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: tree, Type: "tree", Size: 34}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}},
+ }, results)
+}
+
+func TestCatfileInfoFilter(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ input []CatfileInfoResult
+ filter func(CatfileInfoResult) bool
+ expectedResults []CatfileInfoResult
+ expectedErr error
+ }{
+ {
+ desc: "all accepted",
+ input: []CatfileInfoResult{
+ {ObjectName: []byte{'a'}},
+ {ObjectName: []byte{'b'}},
+ {ObjectName: []byte{'c'}},
+ },
+ filter: func(CatfileInfoResult) bool {
+ return true
+ },
+ expectedResults: []CatfileInfoResult{
+ {ObjectName: []byte{'a'}},
+ {ObjectName: []byte{'b'}},
+ {ObjectName: []byte{'c'}},
+ },
+ },
+ {
+ desc: "all filtered",
+ input: []CatfileInfoResult{
+ {ObjectName: []byte{'a'}},
+ {ObjectName: []byte{'b'}},
+ {ObjectName: []byte{'c'}},
+ },
+ filter: func(CatfileInfoResult) bool {
+ return false
+ },
+ },
+ {
+ desc: "errors always get through",
+ input: []CatfileInfoResult{
+ {ObjectName: []byte{'a'}},
+ {ObjectName: []byte{'b'}},
+ {err: errors.New("foobar")},
+ {ObjectName: []byte{'c'}},
+ },
+ filter: func(CatfileInfoResult) bool {
+ return false
+ },
+ expectedErr: errors.New("foobar"),
+ },
+ {
+ desc: "subset filtered",
+ input: []CatfileInfoResult{
+ {ObjectName: []byte{'a'}},
+ {ObjectName: []byte{'b'}},
+ {ObjectName: []byte{'c'}},
+ },
+ filter: func(r CatfileInfoResult) bool {
+ return r.ObjectName[0] == 'b'
+ },
+ expectedResults: []CatfileInfoResult{
+ {ObjectName: []byte{'b'}},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ it := CatfileInfoFilter(ctx, NewCatfileInfoIterator(tc.input), tc.filter)
+
+ var results []CatfileInfoResult
+ for it.Next() {
+ results = append(results, it.Result())
+ }
+
+ require.Equal(t, tc.expectedErr, it.Err())
+ require.Equal(t, tc.expectedResults, results)
+ })
+ }
+}
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
new file mode 100644
index 000000000..5a50c1298
--- /dev/null
+++ b/internal/git/gitpipe/catfile_object.go
@@ -0,0 +1,130 @@
+package gitpipe
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "sync"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
+)
+
+// CatfileObjectResult is a result for the CatfileObject pipeline step.
+type CatfileObjectResult struct {
+ // err is an error which occurred during execution of the pipeline.
+ err error
+
+ // ObjectName is the object name as received from the preceding pipeline step.
+ ObjectName []byte
+ // ObjectInfo is the object info of the object.
+ ObjectInfo *catfile.ObjectInfo
+ // ObjectReader is the reader for the raw object data. The reader must always be consumed
+ // by the caller.
+ ObjectReader io.Reader
+}
+
+// CatfileObject processes catfileInfoResults from the given channel and reads associated objects
+// into memory via `git cat-file --batch`. The returned channel will contain all processed objects.
+// Any error received via the channel or encountered in this step will cause the pipeline to fail.
+// Context cancellation will gracefully halt the pipeline. The returned object readers must always
+// be fully consumed by the caller.
+func CatfileObject(
+ ctx context.Context,
+ catfileProcess catfile.Batch,
+ it CatfileInfoIterator,
+) CatfileObjectIterator {
+ resultChan := make(chan CatfileObjectResult)
+ go func() {
+ defer close(resultChan)
+
+ sendResult := func(result CatfileObjectResult) bool {
+ select {
+ case resultChan <- result:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+ }
+
+ var objectReader *signallingReader
+
+ for it.Next() {
+ catfileInfoResult := it.Result()
+
+ // We mustn't try to read another object before reading the previous object
+ // has concluded. Given that this is not under our control but under the
+ // control of the caller, we thus have to wait until the blocking reader has
+ // reached EOF.
+ if objectReader != nil {
+ select {
+ case <-objectReader.doneCh:
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ var object *catfile.Object
+ var err error
+
+ objectType := catfileInfoResult.ObjectInfo.Type
+ switch objectType {
+ case "tag":
+ object, err = catfileProcess.Tag(ctx, catfileInfoResult.ObjectInfo.Oid.Revision())
+ case "commit":
+ object, err = catfileProcess.Commit(ctx, catfileInfoResult.ObjectInfo.Oid.Revision())
+ case "tree":
+ object, err = catfileProcess.Tree(ctx, catfileInfoResult.ObjectInfo.Oid.Revision())
+ case "blob":
+ object, err = catfileProcess.Blob(ctx, catfileInfoResult.ObjectInfo.Oid.Revision())
+ default:
+ err = fmt.Errorf("unknown object type %q", objectType)
+ }
+
+ if err != nil {
+ sendResult(CatfileObjectResult{
+ err: fmt.Errorf("requesting object: %w", err),
+ })
+ return
+ }
+
+ objectReader = &signallingReader{
+ reader: object,
+ doneCh: make(chan interface{}),
+ }
+
+ if isDone := sendResult(CatfileObjectResult{
+ ObjectName: catfileInfoResult.ObjectName,
+ ObjectInfo: catfileInfoResult.ObjectInfo,
+ ObjectReader: objectReader,
+ }); isDone {
+ return
+ }
+ }
+
+ if err := it.Err(); err != nil {
+ sendResult(CatfileObjectResult{err: err})
+ return
+ }
+ }()
+
+ return &catfileObjectIterator{
+ ch: resultChan,
+ }
+}
+
+type signallingReader struct {
+ reader io.Reader
+ doneCh chan interface{}
+ closeOnce sync.Once
+}
+
+func (r *signallingReader) Read(p []byte) (int, error) {
+ n, err := r.reader.Read(p)
+ if errors.Is(err, io.EOF) {
+ r.closeOnce.Do(func() {
+ close(r.doneCh)
+ })
+ }
+ return n, err
+}
diff --git a/internal/git/gitpipe/catfile_object_iterator.go b/internal/git/gitpipe/catfile_object_iterator.go
new file mode 100644
index 000000000..372a94f3c
--- /dev/null
+++ b/internal/git/gitpipe/catfile_object_iterator.go
@@ -0,0 +1,51 @@
+package gitpipe
+
+// CatfileObjectIterator is an iterator returned by the CatfileObject function.
+type CatfileObjectIterator interface {
+ // Next iterates to the next item. Returns `false` in case there are no more results left.
+ Next() bool
+ // Err returns the first error that was encountered.
+ Err() error
+ // Result returns the current item.
+ Result() CatfileObjectResult
+}
+
+// NewCatfileObjectIterator returns a new CatfileObjectIterator for the given items.
+func NewCatfileObjectIterator(items []CatfileObjectResult) CatfileObjectIterator {
+ itemChan := make(chan CatfileObjectResult, len(items))
+ for _, item := range items {
+ itemChan <- item
+ }
+ close(itemChan)
+
+ return &catfileObjectIterator{
+ ch: itemChan,
+ }
+}
+
+type catfileObjectIterator struct {
+ ch <-chan CatfileObjectResult
+ result CatfileObjectResult
+}
+
+func (it *catfileObjectIterator) Next() bool {
+ if it.result.err != nil {
+ return false
+ }
+
+ var ok bool
+ it.result, ok = <-it.ch
+ if !ok || it.result.err != nil {
+ return false
+ }
+
+ return true
+}
+
+func (it *catfileObjectIterator) Err() error {
+ return it.result.err
+}
+
+func (it *catfileObjectIterator) Result() CatfileObjectResult {
+ return it.result
+}
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
new file mode 100644
index 000000000..eda42531a
--- /dev/null
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -0,0 +1,130 @@
+package gitpipe
+
+import (
+ "errors"
+ "io/ioutil"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+)
+
+func TestCatfileObject(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ repoProto, _, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name())
+ defer cleanup()
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ for _, tc := range []struct {
+ desc string
+ catfileInfoInputs []CatfileInfoResult
+ expectedResults []CatfileObjectResult
+ expectedErr error
+ }{
+ {
+ desc: "single blob",
+ catfileInfoInputs: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ },
+ },
+ {
+ desc: "multiple blobs",
+ catfileInfoInputs: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer3, Type: "blob", Size: 127}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer4, Type: "blob", Size: 129}},
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer3, Type: "blob", Size: 127}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer4, Type: "blob", Size: 129}},
+ },
+ },
+ {
+ desc: "revlist result with object names",
+ catfileInfoInputs: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793", Type: "tree", Size: 43}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, ObjectName: []byte("branch-test.txt")},
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793", Type: "tree", Size: 43}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, ObjectName: []byte("branch-test.txt")},
+ },
+ },
+ {
+ desc: "invalid object ID",
+ catfileInfoInputs: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "invalidobjectid", Type: "blob"}},
+ },
+ expectedErr: errors.New("requesting object: object not found"),
+ },
+ {
+ desc: "invalid object type",
+ catfileInfoInputs: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "foobar"}},
+ },
+ expectedErr: errors.New("requesting object: unknown object type \"foobar\""),
+ },
+ {
+ desc: "mixed valid and invalid revision",
+ catfileInfoInputs: []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "foobar"}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2}},
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ },
+ expectedErr: errors.New("requesting object: unknown object type \"foobar\""),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
+ require.NoError(t, err)
+
+ it := CatfileObject(ctx, catfileProcess, NewCatfileInfoIterator(tc.catfileInfoInputs))
+
+ var results []CatfileObjectResult
+ for it.Next() {
+ result := it.Result()
+
+ // While we could also assert object data, let's not do
+ // this: it would just be too annoying.
+ require.NotNil(t, result.ObjectReader)
+
+ objectData, err := ioutil.ReadAll(result.ObjectReader)
+ require.NoError(t, err)
+ require.Len(t, objectData, int(result.ObjectInfo.Size))
+
+ result.ObjectReader = nil
+ results = append(results, result)
+ }
+
+ // We're converting the error here to a plain un-nested error such
+ // that we don't have to replicate the complete error's structure.
+ err = it.Err()
+ if err != nil {
+ err = errors.New(err.Error())
+ }
+
+ require.Equal(t, tc.expectedErr, err)
+ require.Equal(t, tc.expectedResults, results)
+ })
+ }
+}
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
new file mode 100644
index 000000000..75b3396c3
--- /dev/null
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -0,0 +1,291 @@
+package gitpipe
+
+import (
+ "context"
+ "errors"
+ "io"
+ "io/ioutil"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+)
+
+func TestPipeline(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ repoProto, _, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name())
+ defer cleanup()
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ for _, tc := range []struct {
+ desc string
+ revisions []string
+ revlistFilter func(RevlistResult) bool
+ catfileInfoFilter func(CatfileInfoResult) bool
+ expectedResults []CatfileObjectResult
+ expectedErr error
+ }{
+ {
+ desc: "single blob",
+ revisions: []string{
+ lfsPointer1,
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ },
+ },
+ {
+ desc: "multiple blobs",
+ revisions: []string{
+ lfsPointer1,
+ lfsPointer2,
+ lfsPointer3,
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer3, Type: "blob", Size: 127}},
+ },
+ },
+ {
+ desc: "multiple blobs with filter",
+ revisions: []string{
+ lfsPointer1,
+ lfsPointer2,
+ lfsPointer3,
+ },
+ revlistFilter: func(r RevlistResult) bool {
+ return r.OID == lfsPointer2
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
+ },
+ },
+ {
+ desc: "tree",
+ revisions: []string{
+ "b95c0fad32f4361845f91d9ce4c1721b52b82793",
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793", Type: "tree", Size: 43}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, ObjectName: []byte("branch-test.txt")},
+ },
+ },
+ {
+ desc: "tree with blob filter",
+ revisions: []string{
+ "b95c0fad32f4361845f91d9ce4c1721b52b82793",
+ },
+ catfileInfoFilter: func(r CatfileInfoResult) bool {
+ return r.ObjectInfo.Type == "blob"
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, ObjectName: []byte("branch-test.txt")},
+ },
+ },
+ {
+ desc: "revision range",
+ revisions: []string{
+ "^master~",
+ "master",
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "1e292f8fedd741b75372e19097c76d327140c312", Type: "commit", Size: 388}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "07f8147e8e73aab6c935c296e8cdc5194dee729b", Type: "tree", Size: 780}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "ceb102b8d3f9a95c2eb979213e49f7cc1b23d56e", Type: "tree", Size: 258}, ObjectName: []byte("files")},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "2132d150328bd9334cc4e62a16a5d998a7e399b9", Type: "tree", Size: 31}, ObjectName: []byte("files/flat")},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "f3942dc8b824a2c9359e518d48e68f84461bd2f7", Type: "tree", Size: 34}, ObjectName: []byte("files/flat/path")},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "ea7249055466085d0a6c69951908ef47757e92f4", Type: "tree", Size: 39}, ObjectName: []byte("files/flat/path/correct")},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: "c1c67abbaf91f624347bb3ae96eabe3a1b742478", Type: "commit", Size: 326}},
+ },
+ },
+ {
+ desc: "--all with all filters",
+ revisions: []string{
+ "--all",
+ },
+ revlistFilter: func(r RevlistResult) bool {
+ // Let through two LFS pointers and a tree.
+ return r.OID == "b95c0fad32f4361845f91d9ce4c1721b52b82793" ||
+ r.OID == lfsPointer1 || r.OID == lfsPointer2
+ },
+ catfileInfoFilter: func(r CatfileInfoResult) bool {
+ // Only let through blobs, so only the two LFS pointers remain.
+ return r.ObjectInfo.Type == "blob"
+ },
+ expectedResults: []CatfileObjectResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}, ObjectName: []byte("files/lfs/lfs_object.iso")},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}, ObjectName: []byte("another.lfs")},
+ },
+ },
+ {
+ desc: "invalid revision",
+ revisions: []string{
+ "doesnotexist",
+ },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
+ },
+ {
+ desc: "mixed valid and invalid revision",
+ revisions: []string{
+ lfsPointer1,
+ "doesnotexist",
+ lfsPointer2,
+ },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
+ },
+ {
+ desc: "invalid revision with all filters",
+ revisions: []string{
+ "doesnotexist",
+ },
+ revlistFilter: func(r RevlistResult) bool {
+ require.Fail(t, "filter should not be invoked on errors")
+ return true
+ },
+ catfileInfoFilter: func(r CatfileInfoResult) bool {
+ require.Fail(t, "filter should not be invoked on errors")
+ return true
+ },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
+ require.NoError(t, err)
+
+ revlistIter := Revlist(ctx, repo, tc.revisions)
+ if tc.revlistFilter != nil {
+ revlistIter = RevlistFilter(ctx, revlistIter, tc.revlistFilter)
+ }
+
+ catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter)
+ if tc.catfileInfoFilter != nil {
+ catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, tc.catfileInfoFilter)
+ }
+
+ catfileObjectIter := CatfileObject(ctx, catfileProcess, catfileInfoIter)
+
+ var results []CatfileObjectResult
+ for catfileObjectIter.Next() {
+ result := catfileObjectIter.Result()
+
+ // While we could also assert object data, let's not do
+ // this: it would just be too annoying.
+ require.NotNil(t, result.ObjectReader)
+
+ objectData, err := ioutil.ReadAll(result.ObjectReader)
+ require.NoError(t, err)
+ require.Len(t, objectData, int(result.ObjectInfo.Size))
+
+ result.ObjectReader = nil
+
+ results = append(results, result)
+ }
+
+ // We're converting the error here to a plain un-nested error such that we
+ // don't have to replicate the complete error's structure.
+ err = catfileObjectIter.Err()
+ if err != nil {
+ err = errors.New(err.Error())
+ }
+
+ require.Equal(t, tc.expectedErr, err)
+ require.Equal(t, tc.expectedResults, results)
+ })
+ }
+
+ t.Run("context cancellation", func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
+ require.NoError(t, err)
+
+ // We need to create a separate child context because otherwise we'd kill the batch
+ // process.
+ childCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ revlistIter := Revlist(childCtx, repo, []string{"--all"})
+ revlistIter = RevlistFilter(childCtx, revlistIter, func(RevlistResult) bool { return true })
+ catfileInfoIter := CatfileInfo(childCtx, catfileProcess, revlistIter)
+ catfileInfoIter = CatfileInfoFilter(childCtx, catfileInfoIter, func(CatfileInfoResult) bool { return true })
+ catfileObjectIter := CatfileObject(childCtx, catfileProcess, catfileInfoIter)
+
+ i := 0
+ for catfileObjectIter.Next() {
+ i++
+
+ _, err := io.Copy(ioutil.Discard, catfileObjectIter.Result().ObjectReader)
+ require.NoError(t, err)
+
+ if i == 3 {
+ cancel()
+ }
+ }
+
+ require.NoError(t, catfileObjectIter.Err())
+
+ // Context cancellation is timing sensitive: at the point of cancelling the context,
+ // the last pipeline step may already have queued up an additional result. We thus
+ // cannot assert the exact number of requests, but we know that it's bounded.
+ require.LessOrEqual(t, i, 4)
+ })
+
+ t.Run("interleaving object reads", func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
+ require.NoError(t, err)
+
+ revlistIter := Revlist(ctx, repo, []string{"--all"})
+ catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter)
+ catfileObjectIter := CatfileObject(ctx, catfileProcess, catfileInfoIter)
+
+ i := 0
+ var wg sync.WaitGroup
+ for catfileObjectIter.Next() {
+ wg.Add(1)
+ i++
+
+ // With the catfile package, one mustn't ever request a new object before
+ // the old object's reader was completely consumed. We cannot reliably test
+ // this given that the object channel, if it behaves correctly, will block
+ // until we've read the old object. Chances are high though that we'd
+ // eventually hit the race here in case we didn't correctly synchronize on
+ // the object reader.
+ go func(object CatfileObjectResult) {
+ defer wg.Done()
+ _, err := io.Copy(ioutil.Discard, object.ObjectReader)
+ require.NoError(t, err)
+ }(catfileObjectIter.Result())
+ }
+
+ require.NoError(t, catfileObjectIter.Err())
+ wg.Wait()
+
+ // We could in theory assert the exact amount of objects, but this would make it
+ // harder than necessary to change the test repo's contents.
+ require.Greater(t, i, 1000)
+ })
+}
diff --git a/internal/git/gitpipe/revlist.go b/internal/git/gitpipe/revlist.go
new file mode 100644
index 000000000..f13b3336b
--- /dev/null
+++ b/internal/git/gitpipe/revlist.go
@@ -0,0 +1,195 @@
+package gitpipe
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+)
+
+// RevlistResult is a result for the revlist pipeline step.
+type RevlistResult struct {
+ // err is an error which occurred during execution of the pipeline.
+ err error
+
+ // OID is the object ID of an object printed by git-rev-list(1).
+ OID git.ObjectID
+ // ObjectName is the name of the object. This is typically the path of the object if it was
+ // traversed via either a tree or a commit. The path depends on the order in which objects
+ // are traversed: if e.g. two different trees refer to the same blob with different names,
+ // the blob's path depends on which of the trees was traversed first.
+ ObjectName []byte
+}
+
+// ObjectType is a Git object type used for filtering objects.
+type ObjectType string
+
+const (
+ // ObjectTypeCommit is the type of a Git commit.
+ ObjectTypeCommit = ObjectType("commit")
+ // ObjectTypeBlob is the type of a Git blob.
+ ObjectTypeBlob = ObjectType("blob")
+ // ObjectTypeTree is the type of a Git tree.
+ ObjectTypeTree = ObjectType("tree")
+ // ObjectTypeTag is the type of a Git tag.
+ ObjectTypeTag = ObjectType("tag")
+)
+
+// revlistConfig is configuration for the revlist pipeline step.
+type revlistConfig struct {
+ blobLimit int
+ objectType ObjectType
+}
+
+// RevlistOption is an option for the revlist pipeline step.
+type RevlistOption func(cfg *revlistConfig)
+
+// WithBlobLimit sets up a size limit for blobs. Only blobs whose size is smaller than this limit
+// will be returned by the pipeline step.
+func WithBlobLimit(limit int) RevlistOption {
+ return func(cfg *revlistConfig) {
+ cfg.blobLimit = limit
+ }
+}
+
+// WithObjectTypeFilter will set up a `--filter=object:type=` filter for git-rev-list(1). This will
+// cause it to filter out any objects which do not match the given type. Because git-rev-list(1) by
+// default never filters provided arguments, this option also sets up the `--filter-provided-objects`
+// flag. Note that this option is only supported starting with Git v2.32.0 or later.
+func WithObjectTypeFilter(t ObjectType) RevlistOption {
+ return func(cfg *revlistConfig) {
+ cfg.objectType = t
+ }
+}
+
+// Revlist runs git-rev-list(1) with objects and object names enabled. The returned iterator will
+// yield all object IDs listed by this command. Cancelling the context will cause the pipeline to
+// be cancelled, too.
+func Revlist(
+ ctx context.Context,
+ repo *localrepo.Repo,
+ revisions []string,
+ options ...RevlistOption,
+) RevlistIterator {
+ var cfg revlistConfig
+ for _, option := range options {
+ option(&cfg)
+ }
+
+ resultChan := make(chan RevlistResult)
+ go func() {
+ defer close(resultChan)
+
+ sendResult := func(result RevlistResult) bool {
+ select {
+ case resultChan <- result:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+ }
+
+ flags := []git.Option{
+ git.Flag{Name: "--in-commit-order"},
+ git.Flag{Name: "--objects"},
+ git.Flag{Name: "--object-names"},
+ }
+ if cfg.blobLimit > 0 {
+ flags = append(flags, git.Flag{
+ Name: fmt.Sprintf("--filter=blob:limit=%d", cfg.blobLimit),
+ })
+ }
+ if cfg.objectType != "" {
+ flags = append(flags,
+ git.Flag{Name: fmt.Sprintf("--filter=object:type=%s", cfg.objectType)},
+ git.Flag{Name: "--filter-provided-objects"},
+ )
+ }
+
+ revlist, err := repo.Exec(ctx, git.SubCmd{
+ Name: "rev-list",
+ Flags: flags,
+ Args: revisions,
+ })
+ if err != nil {
+ sendResult(RevlistResult{err: err})
+ return
+ }
+
+ scanner := bufio.NewScanner(revlist)
+ for scanner.Scan() {
+ // We need to copy the line here because we'll hand it over to the caller
+ // asynchronously, and the next call to `Scan()` will overwrite the buffer.
+ line := make([]byte, len(scanner.Bytes()))
+ copy(line, scanner.Bytes())
+
+ oidAndName := bytes.SplitN(line, []byte{' '}, 2)
+
+ result := RevlistResult{
+ OID: git.ObjectID(oidAndName[0]),
+ }
+ if len(oidAndName) == 2 && len(oidAndName[1]) > 0 {
+ result.ObjectName = oidAndName[1]
+ }
+
+ if isDone := sendResult(result); isDone {
+ return
+ }
+ }
+
+ if err := scanner.Err(); err != nil {
+ sendResult(RevlistResult{
+ err: fmt.Errorf("scanning rev-list output: %w", err),
+ })
+ return
+ }
+
+ if err := revlist.Wait(); err != nil {
+ sendResult(RevlistResult{
+ err: fmt.Errorf("rev-list pipeline command: %w", err),
+ })
+ return
+ }
+ }()
+
+ return &revlistIterator{
+ ch: resultChan,
+ }
+}
+
+// RevlistFilter filters the RevlistResults from the provided iterator with the filter function: if
+// the filter returns `false` for a given item, then it will be dropped from the pipeline. Errors
+// cannot be filtered and will always be passed through.
+func RevlistFilter(ctx context.Context, it RevlistIterator, filter func(RevlistResult) bool) RevlistIterator {
+ resultChan := make(chan RevlistResult)
+
+ go func() {
+ defer close(resultChan)
+
+ for it.Next() {
+ result := it.Result()
+ if filter(result) {
+ select {
+ case resultChan <- result:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+
+ if err := it.Err(); err != nil {
+ select {
+ case resultChan <- RevlistResult{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return &revlistIterator{
+ ch: resultChan,
+ }
+}
diff --git a/internal/git/gitpipe/revlist_iterator.go b/internal/git/gitpipe/revlist_iterator.go
new file mode 100644
index 000000000..a6e2b86fd
--- /dev/null
+++ b/internal/git/gitpipe/revlist_iterator.go
@@ -0,0 +1,51 @@
+package gitpipe
+
+// RevlistIterator is an iterator returned by the Revlist function.
+type RevlistIterator interface {
+ // Next iterates to the next item. Returns `false` in case there are no more results left.
+ Next() bool
+ // Err returns the first error that was encountered.
+ Err() error
+ // Result returns the current item.
+ Result() RevlistResult
+}
+
+// NewRevlistIterator returns a new RevlistIterator for the given items.
+func NewRevlistIterator(items []RevlistResult) RevlistIterator {
+ itemChan := make(chan RevlistResult, len(items))
+ for _, item := range items {
+ itemChan <- item
+ }
+ close(itemChan)
+
+ return &revlistIterator{
+ ch: itemChan,
+ }
+}
+
+type revlistIterator struct {
+ ch <-chan RevlistResult
+ result RevlistResult
+}
+
+func (it *revlistIterator) Next() bool {
+ if it.result.err != nil {
+ return false
+ }
+
+ var ok bool
+ it.result, ok = <-it.ch
+ if !ok || it.result.err != nil {
+ return false
+ }
+
+ return true
+}
+
+func (it *revlistIterator) Err() error {
+ return it.result.err
+}
+
+func (it *revlistIterator) Result() RevlistResult {
+ return it.result
+}
diff --git a/internal/git/gitpipe/revlist_test.go b/internal/git/gitpipe/revlist_test.go
new file mode 100644
index 000000000..d808927ea
--- /dev/null
+++ b/internal/git/gitpipe/revlist_test.go
@@ -0,0 +1,346 @@
+package gitpipe
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+)
+
+func TestRevlist(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ repoProto, _, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name())
+ defer cleanup()
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ needsObjectTypeFilters := func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ gitVersion, err := git.CurrentVersion(ctx, git.NewExecCommandFactory(cfg))
+ require.NoError(t, err)
+
+ if !gitVersion.SupportsObjectTypeFilter() {
+ t.Skip("Git does not support object type filters")
+ }
+ }
+
+ for _, tc := range []struct {
+ desc string
+ precondition func(t *testing.T)
+ revisions []string
+ options []RevlistOption
+ expectedResults []RevlistResult
+ expectedErr error
+ }{
+ {
+ desc: "single blob",
+ revisions: []string{
+ lfsPointer1,
+ },
+ expectedResults: []RevlistResult{
+ {OID: lfsPointer1},
+ },
+ },
+ {
+ desc: "multiple blobs",
+ revisions: []string{
+ lfsPointer1,
+ lfsPointer2,
+ lfsPointer3,
+ lfsPointer4,
+ },
+ expectedResults: []RevlistResult{
+ {OID: lfsPointer1},
+ {OID: lfsPointer2},
+ {OID: lfsPointer3},
+ {OID: lfsPointer4},
+ },
+ },
+ {
+ desc: "duplicated blob prints blob once only",
+ revisions: []string{
+ lfsPointer1,
+ lfsPointer1,
+ },
+ expectedResults: []RevlistResult{
+ {OID: lfsPointer1},
+ },
+ },
+ {
+ desc: "tree results in object names",
+ revisions: []string{
+ "b95c0fad32f4361845f91d9ce4c1721b52b82793",
+ },
+ expectedResults: []RevlistResult{
+ {OID: "b95c0fad32f4361845f91d9ce4c1721b52b82793"},
+ {OID: "93e123ac8a3e6a0b600953d7598af629dec7b735", ObjectName: []byte("branch-test.txt")},
+ },
+ },
+ {
+ desc: "revision range",
+ revisions: []string{
+ "^refs/heads/master~",
+ "refs/heads/master",
+ },
+ expectedResults: []RevlistResult{
+ {OID: "1e292f8fedd741b75372e19097c76d327140c312"},
+ {OID: "07f8147e8e73aab6c935c296e8cdc5194dee729b"},
+ {OID: "ceb102b8d3f9a95c2eb979213e49f7cc1b23d56e", ObjectName: []byte("files")},
+ {OID: "2132d150328bd9334cc4e62a16a5d998a7e399b9", ObjectName: []byte("files/flat")},
+ {OID: "f3942dc8b824a2c9359e518d48e68f84461bd2f7", ObjectName: []byte("files/flat/path")},
+ {OID: "ea7249055466085d0a6c69951908ef47757e92f4", ObjectName: []byte("files/flat/path/correct")},
+ {OID: "c1c67abbaf91f624347bb3ae96eabe3a1b742478"},
+ },
+ },
+ {
+ // This is a tree object with multiple blobs. We cannot directly filter
+ // blobs given that Git will always print whatever's been provided on the
+			// command line. While we can already fix this with Git v2.32.0 via the
+			// new `--filter-provided-objects` option, let's defer this fix to a later
+ // point. We demonstrate that this option is working by having the same test
+ // twice, once without and once with limit.
+ desc: "tree with multiple blobs without limit",
+ revisions: []string{
+ "79d5f98270ad677c86a7e1ab2baa922958565135",
+ },
+ expectedResults: []RevlistResult{
+ {OID: "79d5f98270ad677c86a7e1ab2baa922958565135"},
+ {OID: "8af7f880ce38649fc49f66e3f38857bfbec3f0b7", ObjectName: []byte("feature-1.txt")},
+ {OID: "16ca0b267f82cd2f5ca1157dd162dae98745eab8", ObjectName: []byte("feature-2.txt")},
+ {OID: "0fb47f093f769008049a0b0976ac3fa6d6125033", ObjectName: []byte("hotfix-1.txt")},
+ {OID: "4ae6c5e14452a35d04156277ae63e8356eb17cae", ObjectName: []byte("hotfix-2.txt")},
+ {OID: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", ObjectName: []byte("iso8859.txt")},
+ {OID: "570f8e1dfe8149c1d17002712310d43dfeb43159", ObjectName: []byte("russian.rb")},
+ {OID: "7a17968582c21c9153ec24c6a9d5f33592ad9103", ObjectName: []byte("test.txt")},
+ {OID: "f3064a3aa9c14277483f690250072e987e2c8356", ObjectName: []byte("\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88.txt")},
+ {OID: "3a26c18b02e843b459732e7ade7ab9a154a1002b", ObjectName: []byte("\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88.xls")},
+ },
+ },
+ {
+ // And now the second time we execute this test with a limit and see that we
+ // get less blobs as result.
+ desc: "tree with multiple blobs with limit",
+ revisions: []string{
+ "79d5f98270ad677c86a7e1ab2baa922958565135",
+ },
+ options: []RevlistOption{
+ WithBlobLimit(10),
+ },
+ expectedResults: []RevlistResult{
+ {OID: "79d5f98270ad677c86a7e1ab2baa922958565135"},
+ {OID: "0fb47f093f769008049a0b0976ac3fa6d6125033", ObjectName: []byte("hotfix-1.txt")},
+ {OID: "4ae6c5e14452a35d04156277ae63e8356eb17cae", ObjectName: []byte("hotfix-2.txt")},
+ {OID: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", ObjectName: []byte("iso8859.txt")},
+ },
+ },
+ {
+ desc: "tree with blob object type filter",
+ precondition: needsObjectTypeFilters,
+ revisions: []string{
+ "79d5f98270ad677c86a7e1ab2baa922958565135",
+ },
+ options: []RevlistOption{
+ WithObjectTypeFilter(ObjectTypeBlob),
+ },
+ expectedResults: []RevlistResult{
+ {OID: "8af7f880ce38649fc49f66e3f38857bfbec3f0b7", ObjectName: []byte("feature-1.txt")},
+ {OID: "16ca0b267f82cd2f5ca1157dd162dae98745eab8", ObjectName: []byte("feature-2.txt")},
+ {OID: "0fb47f093f769008049a0b0976ac3fa6d6125033", ObjectName: []byte("hotfix-1.txt")},
+ {OID: "4ae6c5e14452a35d04156277ae63e8356eb17cae", ObjectName: []byte("hotfix-2.txt")},
+ {OID: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", ObjectName: []byte("iso8859.txt")},
+ {OID: "570f8e1dfe8149c1d17002712310d43dfeb43159", ObjectName: []byte("russian.rb")},
+ {OID: "7a17968582c21c9153ec24c6a9d5f33592ad9103", ObjectName: []byte("test.txt")},
+ {OID: "f3064a3aa9c14277483f690250072e987e2c8356", ObjectName: []byte("\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88.txt")},
+ {OID: "3a26c18b02e843b459732e7ade7ab9a154a1002b", ObjectName: []byte("\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88.xls")},
+ },
+ },
+ {
+ desc: "tree with tag object type filter",
+ precondition: needsObjectTypeFilters,
+ revisions: []string{
+ "--all",
+ },
+ options: []RevlistOption{
+ WithObjectTypeFilter(ObjectTypeTag),
+ },
+ expectedResults: []RevlistResult{
+ {OID: "f4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8", ObjectName: []byte("v1.0.0")},
+ {OID: "8a2a6eb295bb170b34c24c76c49ed0e9b2eaf34b", ObjectName: []byte("v1.1.0")},
+ {OID: "8f03acbcd11c53d9c9468078f32a2622005a4841", ObjectName: []byte("v1.1.1")},
+ },
+ },
+		{
+			desc:         "tree with tree object type filter",
+ precondition: needsObjectTypeFilters,
+ revisions: []string{
+ "79d5f98270ad677c86a7e1ab2baa922958565135",
+ },
+ options: []RevlistOption{
+ WithObjectTypeFilter(ObjectTypeTree),
+ },
+ expectedResults: []RevlistResult{
+ {OID: "79d5f98270ad677c86a7e1ab2baa922958565135"},
+ },
+ },
+ {
+ desc: "tree with commit object type filter",
+ precondition: needsObjectTypeFilters,
+ revisions: []string{
+ "^refs/heads/master~",
+ "refs/heads/master",
+ },
+ options: []RevlistOption{
+ WithObjectTypeFilter(ObjectTypeCommit),
+ },
+ expectedResults: []RevlistResult{
+ {OID: "1e292f8fedd741b75372e19097c76d327140c312"},
+ {OID: "c1c67abbaf91f624347bb3ae96eabe3a1b742478"},
+ },
+ },
+ {
+ desc: "tree with object type and blob size filter",
+ precondition: needsObjectTypeFilters,
+ revisions: []string{
+ "79d5f98270ad677c86a7e1ab2baa922958565135",
+ },
+ options: []RevlistOption{
+ WithBlobLimit(10),
+ WithObjectTypeFilter(ObjectTypeBlob),
+ },
+ expectedResults: []RevlistResult{
+ {OID: "0fb47f093f769008049a0b0976ac3fa6d6125033", ObjectName: []byte("hotfix-1.txt")},
+ {OID: "4ae6c5e14452a35d04156277ae63e8356eb17cae", ObjectName: []byte("hotfix-2.txt")},
+ {OID: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", ObjectName: []byte("iso8859.txt")},
+ },
+ },
+ {
+ desc: "invalid revision",
+ revisions: []string{
+ "refs/heads/does-not-exist",
+ },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
+ },
+ {
+ desc: "mixed valid and invalid revision",
+ revisions: []string{
+ lfsPointer1,
+ "refs/heads/does-not-exist",
+ },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ if tc.precondition != nil {
+ tc.precondition(t)
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ it := Revlist(ctx, repo, tc.revisions, tc.options...)
+
+ var results []RevlistResult
+ for it.Next() {
+ results = append(results, it.Result())
+ }
+
+ // We're converting the error here to a plain un-nested error such that we
+ // don't have to replicate the complete error's structure.
+ err := it.Err()
+ if err != nil {
+ err = errors.New(err.Error())
+ }
+
+ require.Equal(t, tc.expectedErr, err)
+ require.Equal(t, tc.expectedResults, results)
+ })
+ }
+}
+
+func TestRevlistFilter(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ input []RevlistResult
+ filter func(RevlistResult) bool
+ expectedResults []RevlistResult
+ expectedErr error
+ }{
+ {
+ desc: "all accepted",
+ input: []RevlistResult{
+ {OID: "a"},
+ {OID: "b"},
+ {OID: "c"},
+ },
+ filter: func(RevlistResult) bool {
+ return true
+ },
+ expectedResults: []RevlistResult{
+ {OID: "a"},
+ {OID: "b"},
+ {OID: "c"},
+ },
+ },
+ {
+ desc: "all filtered",
+ input: []RevlistResult{
+ {OID: "a"},
+ {OID: "b"},
+ {OID: "c"},
+ },
+ filter: func(RevlistResult) bool {
+ return false
+ },
+ expectedResults: nil,
+ },
+ {
+ desc: "errors always get through",
+ input: []RevlistResult{
+ {OID: "a"},
+ {OID: "b"},
+ {err: errors.New("foobar")},
+ {OID: "c"},
+ },
+ filter: func(RevlistResult) bool {
+ return false
+ },
+ expectedErr: errors.New("foobar"),
+ },
+ {
+ desc: "subset filtered",
+ input: []RevlistResult{
+ {OID: "a"},
+ {OID: "b"},
+ {OID: "c"},
+ },
+ filter: func(r RevlistResult) bool {
+ return r.OID == "b"
+ },
+ expectedResults: []RevlistResult{
+ {OID: "b"},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ it := RevlistFilter(ctx, NewRevlistIterator(tc.input), tc.filter)
+
+ var results []RevlistResult
+ for it.Next() {
+ results = append(results, it.Result())
+ }
+
+ require.Equal(t, tc.expectedErr, it.Err())
+ require.Equal(t, tc.expectedResults, results)
+ })
+ }
+}
diff --git a/internal/git/gitpipe/testhelper_test.go b/internal/git/gitpipe/testhelper_test.go
new file mode 100644
index 000000000..b3683135f
--- /dev/null
+++ b/internal/git/gitpipe/testhelper_test.go
@@ -0,0 +1,21 @@
+package gitpipe
+
+import (
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+ defer testhelper.MustHaveNoChildProcess()
+
+ cleanup := testhelper.Configure()
+ defer cleanup()
+
+ return m.Run()
+}
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index ce6e1dca9..78c447079 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -9,6 +9,7 @@ import (
"github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gitpipe"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/chunk"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -58,29 +59,27 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
return helper.ErrInternalf("cannot determine Git version: %v", err)
}
- var revlistOptions []revlistOption
+ var revlistOptions []gitpipe.RevlistOption
if gitVersion.SupportsObjectTypeFilter() {
- revlistOptions = append(revlistOptions, withObjectTypeFilter(objectTypeBlob))
+ revlistOptions = append(revlistOptions, gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob))
}
- revlistChan := revlist(ctx, repo, req.GetRevisions(), revlistOptions...)
- catfileInfoChan := catfileInfo(ctx, catfileProcess, revlistChan)
- catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, func(r catfileInfoResult) bool {
- return r.objectInfo.Type == "blob"
+ revlistIter := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...)
+ catfileInfoIter := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter)
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
+ return r.ObjectInfo.Type == "blob"
})
// If we have a zero bytes limit, then the caller didn't request any blob contents at all.
// We can thus skip reading blob contents completely.
if req.GetBytesLimit() == 0 {
var i uint32
- for blob := range catfileInfoChan {
- if blob.err != nil {
- return helper.ErrInternal(blob.err)
- }
+ for catfileInfoIter.Next() {
+ blob := catfileInfoIter.Result()
if err := chunker.Send(&gitalypb.ListBlobsResponse_Blob{
- Oid: blob.objectInfo.Oid.String(),
- Size: blob.objectInfo.Size,
+ Oid: blob.ObjectInfo.Oid.String(),
+ Size: blob.ObjectInfo.Size,
}); err != nil {
return helper.ErrInternal(fmt.Errorf("sending blob chunk: %w", err))
}
@@ -90,14 +89,16 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
break
}
}
+
+ if err := catfileInfoIter.Err(); err != nil {
+ return helper.ErrInternal(err)
+ }
} else {
- catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
var i uint32
- for blob := range catfileObjectChan {
- if blob.err != nil {
- return helper.ErrInternal(blob.err)
- }
+ for catfileObjectIter.Next() {
+ blob := catfileObjectIter.Result()
headerSent := false
dataChunker := streamio.NewWriter(func(p []byte) error {
@@ -106,8 +107,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
}
if !headerSent {
- message.Oid = blob.objectInfo.Oid.String()
- message.Size = blob.objectInfo.Size
+ message.Oid = blob.ObjectInfo.Oid.String()
+ message.Size = blob.ObjectInfo.Size
headerSent = true
}
@@ -120,17 +121,17 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
readLimit := req.GetBytesLimit()
if readLimit < 0 {
- readLimit = blob.objectInfo.Size
+ readLimit = blob.ObjectInfo.Size
}
- _, err := io.CopyN(dataChunker, blob.objectReader, readLimit)
+ _, err := io.CopyN(dataChunker, blob.ObjectReader, readLimit)
if err != nil && !errors.Is(err, io.EOF) {
return helper.ErrInternal(fmt.Errorf("sending blob data: %w", err))
}
// Discard trailing blob data in case the blob is bigger than the read
// limit.
- _, err = io.Copy(ioutil.Discard, blob.objectReader)
+ _, err = io.Copy(ioutil.Discard, blob.ObjectReader)
if err != nil {
return helper.ErrInternal(fmt.Errorf("discarding blob data: %w", err))
}
@@ -140,8 +141,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
// header manually in that case.
if !headerSent {
if err := chunker.Send(&gitalypb.ListBlobsResponse_Blob{
- Oid: blob.objectInfo.Oid.String(),
- Size: blob.objectInfo.Size,
+ Oid: blob.ObjectInfo.Oid.String(),
+ Size: blob.ObjectInfo.Size,
}); err != nil {
return helper.ErrInternal(fmt.Errorf("sending blob chunk: %w", err))
}
@@ -152,6 +153,10 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
break
}
}
+
+ if err := catfileObjectIter.Err(); err != nil {
+ return helper.ErrInternal(err)
+ }
}
if err := chunker.Flush(); err != nil {
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index 200377d77..46442f546 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -13,6 +13,7 @@ import (
gitaly_errors "gitlab.com/gitlab-org/gitaly/v14/internal/errors"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gitpipe"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/chunk"
@@ -78,19 +79,19 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
return helper.ErrInternalf("cannot determine Git version: %v", err)
}
- revlistOptions := []revlistOption{withBlobLimit(lfsPointerMaxSize)}
+ revlistOptions := []gitpipe.RevlistOption{gitpipe.WithBlobLimit(lfsPointerMaxSize)}
if gitVersion.SupportsObjectTypeFilter() {
- revlistOptions = append(revlistOptions, withObjectTypeFilter(objectTypeBlob))
+ revlistOptions = append(revlistOptions, gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob))
}
- revlistChan := revlist(ctx, repo, in.GetRevisions(), revlistOptions...)
- catfileInfoChan := catfileInfo(ctx, catfileProcess, revlistChan)
- catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, func(r catfileInfoResult) bool {
- return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize
+ revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...)
+ catfileInfoIter := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter)
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
+ return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
- if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
+ if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
}
}
@@ -146,13 +147,13 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err))
}
- catfileInfoChan := catfileInfoAllObjects(ctx, repo)
- catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, func(r catfileInfoResult) bool {
- return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize
+ catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo)
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
+ return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
- if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
+ if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
}
}
@@ -194,19 +195,18 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err))
}
- objectChan := make(chan revlistResult, len(req.GetBlobIds()))
- for _, blobID := range req.GetBlobIds() {
- objectChan <- revlistResult{oid: git.ObjectID(blobID)}
+ blobs := make([]gitpipe.RevlistResult, len(req.GetBlobIds()))
+ for i, blobID := range req.GetBlobIds() {
+ blobs[i] = gitpipe.RevlistResult{OID: git.ObjectID(blobID)}
}
- close(objectChan)
- catfileInfoChan := catfileInfo(ctx, catfileProcess, objectChan)
- catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, func(r catfileInfoResult) bool {
- return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize
+ catfileInfoIter := gitpipe.CatfileInfo(ctx, catfileProcess, gitpipe.NewRevlistIterator(blobs))
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
+ return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
- if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil {
+ if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil {
return err
}
}
@@ -378,14 +378,12 @@ func (t *lfsPointerSender) Send() error {
return t.send(t.pointers)
}
-func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan catfileObjectResult, limit int) error {
+func sendLFSPointers(chunker *chunk.Chunker, iter gitpipe.CatfileObjectIterator, limit int) error {
buffer := bytes.NewBuffer(make([]byte, 0, lfsPointerMaxSize))
var i int
- for lfsPointer := range lfsPointers {
- if lfsPointer.err != nil {
- return helper.ErrInternal(lfsPointer.err)
- }
+ for iter.Next() {
+ lfsPointer := iter.Result()
// Avoid allocating bytes for an LFS pointer until we know the current blob really
// is an LFS pointer.
@@ -394,7 +392,7 @@ func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan catfileObjectRes
// Given that we filter pipeline objects by size, the biggest object we may see here
// is 200 bytes in size. So it's not much of a problem to read this into memory
// completely.
- if _, err := io.Copy(buffer, lfsPointer.objectReader); err != nil {
+ if _, err := io.Copy(buffer, lfsPointer.ObjectReader); err != nil {
return helper.ErrInternal(fmt.Errorf("reading LFS pointer data: %w", err))
}
@@ -408,7 +406,7 @@ func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan catfileObjectRes
if err := chunker.Send(&gitalypb.LFSPointer{
Data: objectData,
Size: int64(len(objectData)),
- Oid: lfsPointer.objectInfo.Oid.String(),
+ Oid: lfsPointer.ObjectInfo.Oid.String(),
}); err != nil {
return helper.ErrInternal(fmt.Errorf("sending LFS pointer chunk: %w", err))
}
@@ -419,6 +417,10 @@ func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan catfileObjectRes
}
}
+ if err := iter.Err(); err != nil {
+ return helper.ErrInternal(err)
+ }
+
if err := chunker.Flush(); err != nil {
return helper.ErrInternal(err)
}
diff --git a/internal/gitaly/service/blob/pipeline.go b/internal/gitaly/service/blob/pipeline.go
deleted file mode 100644
index 52500b401..000000000
--- a/internal/gitaly/service/blob/pipeline.go
+++ /dev/null
@@ -1,439 +0,0 @@
-package blob
-
-import (
- "bufio"
- "bytes"
- "context"
- "errors"
- "fmt"
- "io"
- "sync"
-
- "gitlab.com/gitlab-org/gitaly/v14/internal/git"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
-)
-
-// revlistResult is a result for the revlist pipeline step.
-type revlistResult struct {
- // err is an error which occurred during execution of the pipeline.
- err error
-
- // oid is the object ID of an object printed by git-rev-list(1).
- oid git.ObjectID
- // objectName is the name of the object. This is typically the path of the object if it was
- // traversed via either a tree or a commit. The path depends on the order in which objects
- // are traversed: if e.g. two different trees refer to the same blob with different names,
- // the blob's path depends on which of the trees was traversed first.
- objectName []byte
-}
-
-type objectType string
-
-const (
- objectTypeCommit = objectType("commit")
- objectTypeBlob = objectType("blob")
- objectTypeTree = objectType("tree")
- objectTypeTag = objectType("tag")
-)
-
-// revlistConfig is configuration for the revlist pipeline step.
-type revlistConfig struct {
- blobLimit int
- objectType objectType
-}
-
-// revlistOption is an option for the revlist pipeline step.
-type revlistOption func(cfg *revlistConfig)
-
-// withBlobLimit sets up a size limit for blobs. Only blobs whose size is smaller than this limit
-// will be returned by the pipeline step.
-func withBlobLimit(limit int) revlistOption {
- return func(cfg *revlistConfig) {
- cfg.blobLimit = limit
- }
-}
-
-// withObjectTypeFilter will set up a `--filter=object:type=` filter for git-rev-list(1). This will
-// cause it to filter out any objects which do not match the given type. Because git-rev-list(1) by
-// default never filters provided arguments, this option also sets up the `--filter-provided` flag.
-// Note that this option is only supported starting with Git v2.32.0 or later.
-func withObjectTypeFilter(t objectType) revlistOption {
- return func(cfg *revlistConfig) {
- cfg.objectType = t
- }
-}
-
-// revlist runs git-rev-list(1) with objects and object names enabled. The returned channel will
-// contain all object IDs listed by this command. Cancelling the context will cause the pipeline to
-// be cancelled, too.
-func revlist(
- ctx context.Context,
- repo *localrepo.Repo,
- revisions []string,
- options ...revlistOption,
-) <-chan revlistResult {
- var cfg revlistConfig
- for _, option := range options {
- option(&cfg)
- }
-
- resultChan := make(chan revlistResult)
- go func() {
- defer close(resultChan)
-
- sendResult := func(result revlistResult) bool {
- select {
- case resultChan <- result:
- return false
- case <-ctx.Done():
- return true
- }
- }
-
- flags := []git.Option{
- git.Flag{Name: "--in-commit-order"},
- git.Flag{Name: "--objects"},
- git.Flag{Name: "--object-names"},
- }
- if cfg.blobLimit > 0 {
- flags = append(flags, git.Flag{
- Name: fmt.Sprintf("--filter=blob:limit=%d", cfg.blobLimit),
- })
- }
- if cfg.objectType != "" {
- flags = append(flags,
- git.Flag{Name: fmt.Sprintf("--filter=object:type=%s", cfg.objectType)},
- git.Flag{Name: "--filter-provided-objects"},
- )
- }
-
- revlist, err := repo.Exec(ctx, git.SubCmd{
- Name: "rev-list",
- Flags: flags,
- Args: revisions,
- })
- if err != nil {
- sendResult(revlistResult{err: err})
- return
- }
-
- scanner := bufio.NewScanner(revlist)
- for scanner.Scan() {
- // We need to copy the line here because we'll hand it over to the caller
- // asynchronously, and the next call to `Scan()` will overwrite the buffer.
- line := make([]byte, len(scanner.Bytes()))
- copy(line, scanner.Bytes())
-
- oidAndName := bytes.SplitN(line, []byte{' '}, 2)
-
- result := revlistResult{
- oid: git.ObjectID(oidAndName[0]),
- }
- if len(oidAndName) == 2 && len(oidAndName[1]) > 0 {
- result.objectName = oidAndName[1]
- }
-
- if isDone := sendResult(result); isDone {
- return
- }
- }
-
- if err := scanner.Err(); err != nil {
- sendResult(revlistResult{
- err: fmt.Errorf("scanning rev-list output: %w", err),
- })
- return
- }
-
- if err := revlist.Wait(); err != nil {
- sendResult(revlistResult{
- err: fmt.Errorf("rev-list pipeline command: %w", err),
- })
- return
- }
- }()
-
- return resultChan
-}
-
-// revlistFilter filters the revlistResults from the provided channel with the filter function: if
-// the filter returns `false` for a given item, then it will be dropped from the pipeline. Errors
-// cannot be filtered and will always be passed through.
-func revlistFilter(ctx context.Context, c <-chan revlistResult, filter func(revlistResult) bool) <-chan revlistResult {
- resultChan := make(chan revlistResult)
- go func() {
- defer close(resultChan)
-
- for result := range c {
- if result.err != nil || filter(result) {
- select {
- case resultChan <- result:
- case <-ctx.Done():
- return
- }
- }
- }
- }()
- return resultChan
-}
-
-// catfileInfoResult is a result for the catfileInfo pipeline step.
-type catfileInfoResult struct {
- // err is an error which occurred during execution of the pipeline.
- err error
-
- // objectName is the object name as received from the revlistResultChan.
- objectName []byte
- // objectInfo is the object info of the object.
- objectInfo *catfile.ObjectInfo
-}
-
-// catfileInfo processes revlistResults from the given channel and extracts object information via
-// `git cat-file --batch-check`. The returned channel will contain all processed catfile info
-// results. Any error received via the channel or encountered in this step will cause the pipeline
-// to fail. Context cancellation will gracefully halt the pipeline.
-func catfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan <-chan revlistResult) <-chan catfileInfoResult {
- resultChan := make(chan catfileInfoResult)
-
- go func() {
- defer close(resultChan)
-
- sendResult := func(result catfileInfoResult) bool {
- select {
- case resultChan <- result:
- return false
- case <-ctx.Done():
- return true
- }
- }
-
- for revlistResult := range revlistResultChan {
- if revlistResult.err != nil {
- sendResult(catfileInfoResult{err: revlistResult.err})
- return
- }
-
- objectInfo, err := catfile.Info(ctx, revlistResult.oid.Revision())
- if err != nil {
- sendResult(catfileInfoResult{
- err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.oid, err),
- })
- return
- }
-
- if isDone := sendResult(catfileInfoResult{
- objectName: revlistResult.objectName,
- objectInfo: objectInfo,
- }); isDone {
- return
- }
- }
- }()
-
- return resultChan
-}
-
-// catfileInfoAllObjects enumerates all Git objects part of the repository's object directory and
-// extracts their object info via `git cat-file --batch-check`. The returned channel will contain
-// all processed results. Any error encountered during execution of this pipeline step will cause
-// the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this
-// pipeline step, the resulting catfileInfoResults will never have an object name.
-func catfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan catfileInfoResult {
- resultChan := make(chan catfileInfoResult)
-
- go func() {
- defer close(resultChan)
-
- sendResult := func(result catfileInfoResult) bool {
- select {
- case resultChan <- result:
- return false
- case <-ctx.Done():
- return true
- }
- }
-
- cmd, err := repo.Exec(ctx, git.SubCmd{
- Name: "cat-file",
- Flags: []git.Option{
- git.Flag{Name: "--batch-all-objects"},
- git.Flag{Name: "--batch-check"},
- git.Flag{Name: "--buffer"},
- git.Flag{Name: "--unordered"},
- },
- })
- if err != nil {
- sendResult(catfileInfoResult{
- err: fmt.Errorf("spawning cat-file failed: %w", err),
- })
- return
- }
-
- reader := bufio.NewReader(cmd)
- for {
- objectInfo, err := catfile.ParseObjectInfo(reader)
- if err != nil {
- if errors.Is(err, io.EOF) {
- break
- }
-
- sendResult(catfileInfoResult{
- err: fmt.Errorf("parsing object info: %w", err),
- })
- return
- }
-
- if isDone := sendResult(catfileInfoResult{
- objectInfo: objectInfo,
- }); isDone {
- return
- }
- }
-
- if err := cmd.Wait(); err != nil {
- sendResult(catfileInfoResult{
- err: fmt.Errorf("cat-file failed: %w", err),
- })
- return
- }
- }()
-
- return resultChan
-}
-
-// catfileInfoFilter filters the catfileInfoResults from the provided channel with the filter
-// function: if the filter returns `false` for a given item, then it will be dropped from the
-// pipeline. Errors cannot be filtered and will always be passed through.
-func catfileInfoFilter(ctx context.Context, c <-chan catfileInfoResult, filter func(catfileInfoResult) bool) <-chan catfileInfoResult {
- resultChan := make(chan catfileInfoResult)
- go func() {
- defer close(resultChan)
-
- for result := range c {
- if result.err != nil || filter(result) {
- select {
- case resultChan <- result:
- case <-ctx.Done():
- return
- }
- }
- }
- }()
- return resultChan
-}
-
-// catfileObjectResult is a result for the catfileObject pipeline step.
-type catfileObjectResult struct {
- // err is an error which occurred during execution of the pipeline.
- err error
-
- // objectName is the object name as received from the revlistResultChan.
- objectName []byte
- // objectInfo is the object info of the object.
- objectInfo *catfile.ObjectInfo
- // obbjectReader is the reader for the raw object data. The reader must always be consumed
- // by the caller.
- objectReader io.Reader
-}
-
-type signallingReader struct {
- reader io.Reader
- doneCh chan interface{}
- closeOnce sync.Once
-}
-
-func (r *signallingReader) Read(p []byte) (int, error) {
- n, err := r.reader.Read(p)
- if errors.Is(err, io.EOF) {
- r.closeOnce.Do(func() {
- close(r.doneCh)
- })
- }
- return n, err
-}
-
-// catfileObject processes catfileInfoResults from the given channel and reads associated objects
-// into memory via `git cat-file --batch`. The returned channel will contain all processed objects.
-// Any error received via the channel or encountered in this step will cause the pipeline to fail.
-// Context cancellation will gracefully halt the pipeline. The returned object readers must always
-// be fully consumed by the caller.
-func catfileObject(
- ctx context.Context,
- catfileProcess catfile.Batch,
- catfileInfoResultChan <-chan catfileInfoResult,
-) <-chan catfileObjectResult {
- resultChan := make(chan catfileObjectResult)
- go func() {
- defer close(resultChan)
-
- sendResult := func(result catfileObjectResult) bool {
- select {
- case resultChan <- result:
- return false
- case <-ctx.Done():
- return true
- }
- }
-
- var objectReader *signallingReader
-
- for catfileInfoResult := range catfileInfoResultChan {
- if catfileInfoResult.err != nil {
- sendResult(catfileObjectResult{err: catfileInfoResult.err})
- return
- }
-
- // We mustn't try to read another object before reading the previous object
- // has concluded. Given that this is not under our control but under the
- // control of the caller, we thus have to wait until the blocking reader has
- // reached EOF.
- if objectReader != nil {
- select {
- case <-objectReader.doneCh:
- case <-ctx.Done():
- return
- }
- }
-
- var object *catfile.Object
- var err error
-
- objectType := catfileInfoResult.objectInfo.Type
- switch objectType {
- case "tag":
- object, err = catfileProcess.Tag(ctx, catfileInfoResult.objectInfo.Oid.Revision())
- case "commit":
- object, err = catfileProcess.Commit(ctx, catfileInfoResult.objectInfo.Oid.Revision())
- case "tree":
- object, err = catfileProcess.Tree(ctx, catfileInfoResult.objectInfo.Oid.Revision())
- case "blob":
- object, err = catfileProcess.Blob(ctx, catfileInfoResult.objectInfo.Oid.Revision())
- default:
- err = fmt.Errorf("unknown object type %q", objectType)
- }
-
- if err != nil {
- sendResult(catfileObjectResult{
- err: fmt.Errorf("requesting object: %w", err),
- })
- return
- }
-
- objectReader = &signallingReader{
- reader: object,
- doneCh: make(chan interface{}),
- }
-
- if isDone := sendResult(catfileObjectResult{
- objectName: catfileInfoResult.objectName,
- objectInfo: catfileInfoResult.objectInfo,
- objectReader: objectReader,
- }); isDone {
- return
- }
- }
- }()
-
- return resultChan
-}
diff --git a/internal/gitaly/service/blob/pipeline_test.go b/internal/gitaly/service/blob/pipeline_test.go
deleted file mode 100644
index 96c071d13..000000000
--- a/internal/gitaly/service/blob/pipeline_test.go
+++ /dev/null
@@ -1,977 +0,0 @@
-package blob
-
-import (
- "context"
- "errors"
- "io"
- "io/ioutil"
- "sync"
- "testing"
-
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
-)
-
-func TestRevlist(t *testing.T) {
- cfg := testcfg.Build(t)
-
- repoProto, _, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name())
- defer cleanup()
- repo := localrepo.NewTestRepo(t, cfg, repoProto)
-
- needsObjectTypeFilters := func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- gitVersion, err := git.CurrentVersion(ctx, git.NewExecCommandFactory(cfg))
- require.NoError(t, err)
-
- if !gitVersion.SupportsObjectTypeFilter() {
- t.Skip("Git does not support object type filters")
- }
- }
-
- for _, tc := range []struct {
- desc string
- precondition func(t *testing.T)
- revisions []string
- options []revlistOption
- expectedResults []revlistResult
- }{
- {
- desc: "single blob",
- revisions: []string{
- lfsPointer1,
- },
- expectedResults: []revlistResult{
- {oid: lfsPointer1},
- },
- },
- {
- desc: "multiple blobs",
- revisions: []string{
- lfsPointer1,
- lfsPointer2,
- lfsPointer3,
- lfsPointer4,
- },
- expectedResults: []revlistResult{
- {oid: lfsPointer1},
- {oid: lfsPointer2},
- {oid: lfsPointer3},
- {oid: lfsPointer4},
- },
- },
- {
- desc: "duplicated blob prints blob once only",
- revisions: []string{
- lfsPointer1,
- lfsPointer1,
- },
- expectedResults: []revlistResult{
- {oid: lfsPointer1},
- },
- },
- {
- desc: "tree results in object names",
- revisions: []string{
- "b95c0fad32f4361845f91d9ce4c1721b52b82793",
- },
- expectedResults: []revlistResult{
- {oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793"},
- {oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", objectName: []byte("branch-test.txt")},
- },
- },
- {
- desc: "revision range",
- revisions: []string{
- "^refs/heads/master~",
- "refs/heads/master",
- },
- expectedResults: []revlistResult{
- {oid: "1e292f8fedd741b75372e19097c76d327140c312"},
- {oid: "07f8147e8e73aab6c935c296e8cdc5194dee729b"},
- {oid: "ceb102b8d3f9a95c2eb979213e49f7cc1b23d56e", objectName: []byte("files")},
- {oid: "2132d150328bd9334cc4e62a16a5d998a7e399b9", objectName: []byte("files/flat")},
- {oid: "f3942dc8b824a2c9359e518d48e68f84461bd2f7", objectName: []byte("files/flat/path")},
- {oid: "ea7249055466085d0a6c69951908ef47757e92f4", objectName: []byte("files/flat/path/correct")},
- {oid: "c1c67abbaf91f624347bb3ae96eabe3a1b742478"},
- },
- },
- {
- // This is a tree object with multiple blobs. We cannot directly filter
- // blobs given that Git will always print whatever's been provided on the
- // command line. While we can already fix this with Git v2.32.0 via
- // the new `--filter-provided` option, let's defer this fix to a later
- // point. We demonstrate that this option is working by having the same test
- // twice, once without and once with limit.
- desc: "tree with multiple blobs without limit",
- revisions: []string{
- "79d5f98270ad677c86a7e1ab2baa922958565135",
- },
- expectedResults: []revlistResult{
- {oid: "79d5f98270ad677c86a7e1ab2baa922958565135"},
- {oid: "8af7f880ce38649fc49f66e3f38857bfbec3f0b7", objectName: []byte("feature-1.txt")},
- {oid: "16ca0b267f82cd2f5ca1157dd162dae98745eab8", objectName: []byte("feature-2.txt")},
- {oid: "0fb47f093f769008049a0b0976ac3fa6d6125033", objectName: []byte("hotfix-1.txt")},
- {oid: "4ae6c5e14452a35d04156277ae63e8356eb17cae", objectName: []byte("hotfix-2.txt")},
- {oid: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", objectName: []byte("iso8859.txt")},
- {oid: "570f8e1dfe8149c1d17002712310d43dfeb43159", objectName: []byte("russian.rb")},
- {oid: "7a17968582c21c9153ec24c6a9d5f33592ad9103", objectName: []byte("test.txt")},
- {oid: "f3064a3aa9c14277483f690250072e987e2c8356", objectName: []byte("\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88.txt")},
- {oid: "3a26c18b02e843b459732e7ade7ab9a154a1002b", objectName: []byte("\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88.xls")},
- },
- },
- {
- // And now the second time we execute this test with a limit and see that we
- // get less blobs as result.
- desc: "tree with multiple blobs with limit",
- revisions: []string{
- "79d5f98270ad677c86a7e1ab2baa922958565135",
- },
- options: []revlistOption{
- withBlobLimit(10),
- },
- expectedResults: []revlistResult{
- {oid: "79d5f98270ad677c86a7e1ab2baa922958565135"},
- {oid: "0fb47f093f769008049a0b0976ac3fa6d6125033", objectName: []byte("hotfix-1.txt")},
- {oid: "4ae6c5e14452a35d04156277ae63e8356eb17cae", objectName: []byte("hotfix-2.txt")},
- {oid: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", objectName: []byte("iso8859.txt")},
- },
- },
- {
- desc: "tree with blob object type filter",
- precondition: needsObjectTypeFilters,
- revisions: []string{
- "79d5f98270ad677c86a7e1ab2baa922958565135",
- },
- options: []revlistOption{
- withObjectTypeFilter(objectTypeBlob),
- },
- expectedResults: []revlistResult{
- {oid: "8af7f880ce38649fc49f66e3f38857bfbec3f0b7", objectName: []byte("feature-1.txt")},
- {oid: "16ca0b267f82cd2f5ca1157dd162dae98745eab8", objectName: []byte("feature-2.txt")},
- {oid: "0fb47f093f769008049a0b0976ac3fa6d6125033", objectName: []byte("hotfix-1.txt")},
- {oid: "4ae6c5e14452a35d04156277ae63e8356eb17cae", objectName: []byte("hotfix-2.txt")},
- {oid: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", objectName: []byte("iso8859.txt")},
- {oid: "570f8e1dfe8149c1d17002712310d43dfeb43159", objectName: []byte("russian.rb")},
- {oid: "7a17968582c21c9153ec24c6a9d5f33592ad9103", objectName: []byte("test.txt")},
- {oid: "f3064a3aa9c14277483f690250072e987e2c8356", objectName: []byte("\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88.txt")},
- {oid: "3a26c18b02e843b459732e7ade7ab9a154a1002b", objectName: []byte("\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88.xls")},
- },
- },
- {
- desc: "tree with tag object type filter",
- precondition: needsObjectTypeFilters,
- revisions: []string{
- "--all",
- },
- options: []revlistOption{
- withObjectTypeFilter(objectTypeTag),
- },
- expectedResults: []revlistResult{
- {oid: "f4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8", objectName: []byte("v1.0.0")},
- {oid: "8a2a6eb295bb170b34c24c76c49ed0e9b2eaf34b", objectName: []byte("v1.1.0")},
- {oid: "8f03acbcd11c53d9c9468078f32a2622005a4841", objectName: []byte("v1.1.1")},
- },
- },
- {
- desc: "tree with commit object type filter",
- precondition: needsObjectTypeFilters,
- revisions: []string{
- "79d5f98270ad677c86a7e1ab2baa922958565135",
- },
- options: []revlistOption{
- withObjectTypeFilter(objectTypeTree),
- },
- expectedResults: []revlistResult{
- {oid: "79d5f98270ad677c86a7e1ab2baa922958565135"},
- },
- },
- {
- desc: "tree with commit object type filter",
- precondition: needsObjectTypeFilters,
- revisions: []string{
- "^refs/heads/master~",
- "refs/heads/master",
- },
- options: []revlistOption{
- withObjectTypeFilter(objectTypeCommit),
- },
- expectedResults: []revlistResult{
- {oid: "1e292f8fedd741b75372e19097c76d327140c312"},
- {oid: "c1c67abbaf91f624347bb3ae96eabe3a1b742478"},
- },
- },
- {
- desc: "tree with object type and blob size filter",
- precondition: needsObjectTypeFilters,
- revisions: []string{
- "79d5f98270ad677c86a7e1ab2baa922958565135",
- },
- options: []revlistOption{
- withBlobLimit(10),
- withObjectTypeFilter(objectTypeBlob),
- },
- expectedResults: []revlistResult{
- {oid: "0fb47f093f769008049a0b0976ac3fa6d6125033", objectName: []byte("hotfix-1.txt")},
- {oid: "4ae6c5e14452a35d04156277ae63e8356eb17cae", objectName: []byte("hotfix-2.txt")},
- {oid: "b988ffed90cb6a9b7f98a3686a933edb3c5d70c0", objectName: []byte("iso8859.txt")},
- },
- },
- {
- desc: "invalid revision",
- revisions: []string{
- "refs/heads/does-not-exist",
- },
- expectedResults: []revlistResult{
- {err: errors.New("rev-list pipeline command: exit status 128")},
- },
- },
- {
- desc: "mixed valid and invalid revision",
- revisions: []string{
- lfsPointer1,
- "refs/heads/does-not-exist",
- },
- expectedResults: []revlistResult{
- {err: errors.New("rev-list pipeline command: exit status 128")},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- if tc.precondition != nil {
- tc.precondition(t)
- }
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- resultChan := revlist(ctx, repo, tc.revisions, tc.options...)
-
- var results []revlistResult
- for result := range resultChan {
- // We're converting the error here to a plain un-nested error such
- // that we don't have to replicate the complete error's structure.
- if result.err != nil {
- result.err = errors.New(result.err.Error())
- }
-
- results = append(results, result)
- }
-
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
-
-func TestRevlistFilter(t *testing.T) {
- for _, tc := range []struct {
- desc string
- input []revlistResult
- filter func(revlistResult) bool
- expectedResults []revlistResult
- }{
- {
- desc: "all accepted",
- input: []revlistResult{
- {oid: "a"},
- {oid: "b"},
- {oid: "c"},
- },
- filter: func(revlistResult) bool {
- return true
- },
- expectedResults: []revlistResult{
- {oid: "a"},
- {oid: "b"},
- {oid: "c"},
- },
- },
- {
- desc: "all filtered",
- input: []revlistResult{
- {oid: "a"},
- {oid: "b"},
- {oid: "c"},
- },
- filter: func(revlistResult) bool {
- return false
- },
- expectedResults: nil,
- },
- {
- desc: "errors always get through",
- input: []revlistResult{
- {oid: "a"},
- {oid: "b"},
- {err: errors.New("foobar")},
- {oid: "c"},
- },
- filter: func(revlistResult) bool {
- return false
- },
- expectedResults: []revlistResult{
- {err: errors.New("foobar")},
- },
- },
- {
- desc: "subset filtered",
- input: []revlistResult{
- {oid: "a"},
- {oid: "b"},
- {oid: "c"},
- },
- filter: func(r revlistResult) bool {
- return r.oid == "b"
- },
- expectedResults: []revlistResult{
- {oid: "b"},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- inputChan := make(chan revlistResult, len(tc.input))
- for _, input := range tc.input {
- inputChan <- input
- }
- close(inputChan)
-
- var results []revlistResult
- for result := range revlistFilter(ctx, inputChan, tc.filter) {
- results = append(results, result)
- }
-
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
-
-func TestCatfileInfo(t *testing.T) {
- cfg := testcfg.Build(t)
-
- repoProto, _, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name())
- defer cleanup()
- repo := localrepo.NewTestRepo(t, cfg, repoProto)
-
- for _, tc := range []struct {
- desc string
- revlistInputs []revlistResult
- expectedResults []catfileInfoResult
- }{
- {
- desc: "single blob",
- revlistInputs: []revlistResult{
- {oid: lfsPointer1},
- },
- expectedResults: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- },
- },
- {
- desc: "multiple blobs",
- revlistInputs: []revlistResult{
- {oid: lfsPointer1},
- {oid: lfsPointer2},
- {oid: lfsPointer3},
- {oid: lfsPointer4},
- },
- expectedResults: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer3, Type: "blob", Size: 127}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer4, Type: "blob", Size: 129}},
- },
- },
- {
- desc: "object name",
- revlistInputs: []revlistResult{
- {oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793"},
- {oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", objectName: []byte("branch-test.txt")},
- },
- expectedResults: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793", Type: "tree", Size: 43}},
- {objectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, objectName: []byte("branch-test.txt")},
- },
- },
- {
- desc: "invalid object ID",
- revlistInputs: []revlistResult{
- {oid: "invalidobjectid"},
- },
- expectedResults: []catfileInfoResult{
- {err: errors.New("retrieving object info for \"invalidobjectid\": object not found")},
- },
- },
- {
- desc: "mixed valid and invalid revision",
- revlistInputs: []revlistResult{
- {oid: lfsPointer1},
- {oid: "invalidobjectid"},
- {oid: lfsPointer2},
- },
- expectedResults: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {err: errors.New("retrieving object info for \"invalidobjectid\": object not found")},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- catfileCache := catfile.NewCache(cfg)
- defer catfileCache.Stop()
-
- catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
- require.NoError(t, err)
-
- revlistResultChan := make(chan revlistResult, len(tc.revlistInputs))
- for _, input := range tc.revlistInputs {
- revlistResultChan <- input
- }
- close(revlistResultChan)
-
- resultChan := catfileInfo(ctx, catfileProcess, revlistResultChan)
-
- var results []catfileInfoResult
- for result := range resultChan {
- // We're converting the error here to a plain un-nested error such
- // that we don't have to replicate the complete error's structure.
- if result.err != nil {
- result.err = errors.New(result.err.Error())
- }
-
- results = append(results, result)
- }
-
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
-
-func TestCatfileAllObjects(t *testing.T) {
- cfg := testcfg.Build(t)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- repoProto, repoPath, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0])
- defer cleanup()
- repo := localrepo.NewTestRepo(t, cfg, repoProto)
-
- blob1 := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
- blob2 := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo"))
- tree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
- {Path: "foobar", Mode: "100644", OID: blob1},
- })
- commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents())
-
- resultChan := catfileInfoAllObjects(ctx, repo)
-
- var results []catfileInfoResult
- for result := range resultChan {
- require.NoError(t, result.err)
- results = append(results, result)
- }
-
- require.ElementsMatch(t, []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}},
- {objectInfo: &catfile.ObjectInfo{Oid: blob2, Type: "blob", Size: 6}},
- {objectInfo: &catfile.ObjectInfo{Oid: tree, Type: "tree", Size: 34}},
- {objectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}},
- }, results)
-}
-
-func TestCatfileInfoFilter(t *testing.T) {
- for _, tc := range []struct {
- desc string
- input []catfileInfoResult
- filter func(catfileInfoResult) bool
- expectedResults []catfileInfoResult
- }{
- {
- desc: "all accepted",
- input: []catfileInfoResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {objectName: []byte{'c'}},
- },
- filter: func(catfileInfoResult) bool {
- return true
- },
- expectedResults: []catfileInfoResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {objectName: []byte{'c'}},
- },
- },
- {
- desc: "all filtered",
- input: []catfileInfoResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {objectName: []byte{'c'}},
- },
- filter: func(catfileInfoResult) bool {
- return false
- },
- },
- {
- desc: "errors always get through",
- input: []catfileInfoResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {err: errors.New("foobar")},
- {objectName: []byte{'c'}},
- },
- filter: func(catfileInfoResult) bool {
- return false
- },
- expectedResults: []catfileInfoResult{
- {err: errors.New("foobar")},
- },
- },
- {
- desc: "subset filtered",
- input: []catfileInfoResult{
- {objectName: []byte{'a'}},
- {objectName: []byte{'b'}},
- {objectName: []byte{'c'}},
- },
- filter: func(r catfileInfoResult) bool {
- return r.objectName[0] == 'b'
- },
- expectedResults: []catfileInfoResult{
- {objectName: []byte{'b'}},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- inputChan := make(chan catfileInfoResult, len(tc.input))
- for _, input := range tc.input {
- inputChan <- input
- }
- close(inputChan)
-
- var results []catfileInfoResult
- for result := range catfileInfoFilter(ctx, inputChan, tc.filter) {
- results = append(results, result)
- }
-
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
-
-func TestCatfileObject(t *testing.T) {
- cfg := testcfg.Build(t)
-
- repoProto, _, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name())
- defer cleanup()
- repo := localrepo.NewTestRepo(t, cfg, repoProto)
-
- for _, tc := range []struct {
- desc string
- catfileInfoInputs []catfileInfoResult
- expectedResults []catfileObjectResult
- }{
- {
- desc: "single blob",
- catfileInfoInputs: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- },
- },
- {
- desc: "multiple blobs",
- catfileInfoInputs: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer3, Type: "blob", Size: 127}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer4, Type: "blob", Size: 129}},
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer3, Type: "blob", Size: 127}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer4, Type: "blob", Size: 129}},
- },
- },
- {
- desc: "revlist result with object names",
- catfileInfoInputs: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793", Type: "tree", Size: 43}},
- {objectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, objectName: []byte("branch-test.txt")},
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793", Type: "tree", Size: 43}},
- {objectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, objectName: []byte("branch-test.txt")},
- },
- },
- {
- desc: "invalid object ID",
- catfileInfoInputs: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: "invalidobjectid", Type: "blob"}},
- },
- expectedResults: []catfileObjectResult{
- {err: errors.New("requesting object: object not found")},
- },
- },
- {
- desc: "invalid object type",
- catfileInfoInputs: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "foobar"}},
- },
- expectedResults: []catfileObjectResult{
- {err: errors.New("requesting object: unknown object type \"foobar\"")},
- },
- },
- {
- desc: "mixed valid and invalid revision",
- catfileInfoInputs: []catfileInfoResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "foobar"}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2}},
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {err: errors.New("requesting object: unknown object type \"foobar\"")},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- catfileCache := catfile.NewCache(cfg)
- defer catfileCache.Stop()
-
- catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
- require.NoError(t, err)
-
- catfileInfoResultChan := make(chan catfileInfoResult, len(tc.catfileInfoInputs))
- for _, input := range tc.catfileInfoInputs {
- catfileInfoResultChan <- input
- }
- close(catfileInfoResultChan)
-
- resultChan := catfileObject(ctx, catfileProcess, catfileInfoResultChan)
-
- var results []catfileObjectResult
- for result := range resultChan {
- // We're converting the error here to a plain un-nested error such
- // that we don't have to replicate the complete error's structure.
- if result.err != nil {
- result.err = errors.New(result.err.Error())
- }
-
- if result.err == nil {
- // While we could also assert object data, let's not do
- // this: it would just be too annoying.
- require.NotNil(t, result.objectReader)
-
- objectData, err := ioutil.ReadAll(result.objectReader)
- require.NoError(t, err)
- require.Len(t, objectData, int(result.objectInfo.Size))
-
- result.objectReader = nil
- }
-
- results = append(results, result)
- }
-
- require.Equal(t, tc.expectedResults, results)
- })
- }
-}
-
-func TestPipeline(t *testing.T) {
- cfg := testcfg.Build(t)
-
- repoProto, _, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name())
- defer cleanup()
- repo := localrepo.NewTestRepo(t, cfg, repoProto)
-
- for _, tc := range []struct {
- desc string
- revisions []string
- revlistFilter func(revlistResult) bool
- catfileInfoFilter func(catfileInfoResult) bool
- expectedResults []catfileObjectResult
- }{
- {
- desc: "single blob",
- revisions: []string{
- lfsPointer1,
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- },
- },
- {
- desc: "multiple blobs",
- revisions: []string{
- lfsPointer1,
- lfsPointer2,
- lfsPointer3,
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer3, Type: "blob", Size: 127}},
- },
- },
- {
- desc: "multiple blobs with filter",
- revisions: []string{
- lfsPointer1,
- lfsPointer2,
- lfsPointer3,
- },
- revlistFilter: func(r revlistResult) bool {
- return r.oid == lfsPointer2
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
- },
- },
- {
- desc: "tree",
- revisions: []string{
- "b95c0fad32f4361845f91d9ce4c1721b52b82793",
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: "b95c0fad32f4361845f91d9ce4c1721b52b82793", Type: "tree", Size: 43}},
- {objectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, objectName: []byte("branch-test.txt")},
- },
- },
- {
- desc: "tree with blob filter",
- revisions: []string{
- "b95c0fad32f4361845f91d9ce4c1721b52b82793",
- },
- catfileInfoFilter: func(r catfileInfoResult) bool {
- return r.objectInfo.Type == "blob"
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: "93e123ac8a3e6a0b600953d7598af629dec7b735", Type: "blob", Size: 59}, objectName: []byte("branch-test.txt")},
- },
- },
- {
- desc: "revision range",
- revisions: []string{
- "^master~",
- "master",
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: "1e292f8fedd741b75372e19097c76d327140c312", Type: "commit", Size: 388}},
- {objectInfo: &catfile.ObjectInfo{Oid: "07f8147e8e73aab6c935c296e8cdc5194dee729b", Type: "tree", Size: 780}},
- {objectInfo: &catfile.ObjectInfo{Oid: "ceb102b8d3f9a95c2eb979213e49f7cc1b23d56e", Type: "tree", Size: 258}, objectName: []byte("files")},
- {objectInfo: &catfile.ObjectInfo{Oid: "2132d150328bd9334cc4e62a16a5d998a7e399b9", Type: "tree", Size: 31}, objectName: []byte("files/flat")},
- {objectInfo: &catfile.ObjectInfo{Oid: "f3942dc8b824a2c9359e518d48e68f84461bd2f7", Type: "tree", Size: 34}, objectName: []byte("files/flat/path")},
- {objectInfo: &catfile.ObjectInfo{Oid: "ea7249055466085d0a6c69951908ef47757e92f4", Type: "tree", Size: 39}, objectName: []byte("files/flat/path/correct")},
- {objectInfo: &catfile.ObjectInfo{Oid: "c1c67abbaf91f624347bb3ae96eabe3a1b742478", Type: "commit", Size: 326}},
- },
- },
- {
- desc: "--all with all filters",
- revisions: []string{
- "--all",
- },
- revlistFilter: func(r revlistResult) bool {
- // Let through two LFS pointers and a tree.
- return r.oid == "b95c0fad32f4361845f91d9ce4c1721b52b82793" ||
- r.oid == lfsPointer1 || r.oid == lfsPointer2
- },
- catfileInfoFilter: func(r catfileInfoResult) bool {
- // Only let through blobs, so only the two LFS pointers remain.
- return r.objectInfo.Type == "blob"
- },
- expectedResults: []catfileObjectResult{
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}, objectName: []byte("files/lfs/lfs_object.iso")},
- {objectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}, objectName: []byte("another.lfs")},
- },
- },
- {
- desc: "invalid revision",
- revisions: []string{
- "doesnotexist",
- },
- expectedResults: []catfileObjectResult{
- {err: errors.New("rev-list pipeline command: exit status 128")},
- },
- },
- {
- desc: "mixed valid and invalid revision",
- revisions: []string{
- lfsPointer1,
- "doesnotexist",
- lfsPointer2,
- },
- expectedResults: []catfileObjectResult{
- {err: errors.New("rev-list pipeline command: exit status 128")},
- },
- },
- {
- desc: "invalid revision with all filters",
- revisions: []string{
- "doesnotexist",
- },
- revlistFilter: func(r revlistResult) bool {
- require.Fail(t, "filter should not be invoked on errors")
- return true
- },
- catfileInfoFilter: func(r catfileInfoResult) bool {
- require.Fail(t, "filter should not be invoked on errors")
- return true
- },
- expectedResults: []catfileObjectResult{
- {err: errors.New("rev-list pipeline command: exit status 128")},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- catfileCache := catfile.NewCache(cfg)
- defer catfileCache.Stop()
-
- catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
- require.NoError(t, err)
-
- revlistChan := revlist(ctx, repo, tc.revisions)
- if tc.revlistFilter != nil {
- revlistChan = revlistFilter(ctx, revlistChan, tc.revlistFilter)
- }
-
- catfileInfoChan := catfileInfo(ctx, catfileProcess, revlistChan)
- if tc.catfileInfoFilter != nil {
- catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, tc.catfileInfoFilter)
- }
-
- catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
-
- var results []catfileObjectResult
- for result := range catfileObjectChan {
- // We're converting the error here to a plain un-nested error such
- // that we don't have to replicate the complete error's structure.
- if result.err != nil {
- result.err = errors.New(result.err.Error())
- }
-
- if result.err == nil {
- // While we could also assert object data, let's not do
- // this: it would just be too annoying.
- require.NotNil(t, result.objectReader)
-
- objectData, err := ioutil.ReadAll(result.objectReader)
- require.NoError(t, err)
- require.Len(t, objectData, int(result.objectInfo.Size))
-
- result.objectReader = nil
- }
-
- results = append(results, result)
- }
-
- require.Equal(t, tc.expectedResults, results)
- })
- }
-
- t.Run("context cancellation", func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- catfileCache := catfile.NewCache(cfg)
- defer catfileCache.Stop()
-
- catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
- require.NoError(t, err)
-
- // We need to create a separate child context because otherwise we'd kill the batch
- // process.
- childCtx, cancel := context.WithCancel(ctx)
- defer cancel()
-
- revlistChan := revlist(childCtx, repo, []string{"--all"})
- revlistChan = revlistFilter(childCtx, revlistChan, func(revlistResult) bool { return true })
- catfileInfoChan := catfileInfo(childCtx, catfileProcess, revlistChan)
- catfileInfoChan = catfileInfoFilter(childCtx, catfileInfoChan, func(catfileInfoResult) bool { return true })
- catfileObjectChan := catfileObject(childCtx, catfileProcess, catfileInfoChan)
-
- i := 0
- for result := range catfileObjectChan {
- require.NoError(t, result.err)
- i++
-
- _, err := io.Copy(ioutil.Discard, result.objectReader)
- require.NoError(t, err)
-
- if i == 3 {
- cancel()
- }
- }
-
- // Context cancellation is timing sensitive: at the point of cancelling the context,
- // the last pipeline step may already have queued up an additional result. We thus
- // cannot assert the exact number of requests, but we know that it's bounded.
- require.LessOrEqual(t, i, 4)
- })
-
- t.Run("interleaving object reads", func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- catfileCache := catfile.NewCache(cfg)
- defer catfileCache.Stop()
-
- catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
- require.NoError(t, err)
-
- revlistChan := revlist(ctx, repo, []string{"--all"})
- catfileInfoChan := catfileInfo(ctx, catfileProcess, revlistChan)
- catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
-
- i := 0
- var wg sync.WaitGroup
- for result := range catfileObjectChan {
- require.NoError(t, result.err)
-
- wg.Add(1)
- i++
-
- // With the catfile package, one mustn't ever request a new object before
- // the old object's reader was completely consumed. We cannot reliably test
- // this given that the object channel, if it behaves correctly, will block
- // until we've read the old object. Chances are high though that we'd
- // eventually hit the race here in case we didn't correctly synchronize on
- // the object reader.
- go func(object catfileObjectResult) {
- defer wg.Done()
- _, err := io.Copy(ioutil.Discard, object.objectReader)
- require.NoError(t, err)
- }(result)
- }
-
- wg.Wait()
-
- // We could in theory assert the exact amount of objects, but this would make it
- // harder than necessary to change the test repo's contents.
- require.Greater(t, i, 1000)
- })
-}