Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com> 2021-06-28 09:51:06 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com> 2021-06-28 10:02:14 +0300
commit: 6c99d927a1c7a1cfc07f97cbb60059c2dbc8e022 (patch)
tree: e33fbaf3526aac794f740c06e9c2afede3c3ca7c
parent: f9ef4d6508e5d13745664cea2e944b65e69be14d (diff)
gitpipe: Implement iterator for catfile info step
The pipeline steps are currently easy to misuse: the structures returned on the channel all have an `Err` field, which MUST be checked by the caller, and that check is easy to forget for a caller who isn't expecting it. Refactor the catfile info step to instead return a CatfileInfoIterator. This iterator follows the typical Go iterator idiom with `Next()`, `Err()` and `Result()` functions. Because the iterator is an interface, callers can also easily replace it with their own adapter.
-rw-r--r-- internal/git/gitpipe/catfile_info.go | 47
-rw-r--r-- internal/git/gitpipe/catfile_info_iterator.go | 51
-rw-r--r-- internal/git/gitpipe/catfile_info_test.go | 53
-rw-r--r-- internal/git/gitpipe/catfile_object.go | 14
-rw-r--r-- internal/git/gitpipe/catfile_object_test.go | 8
-rw-r--r-- internal/git/gitpipe/pipeline_test.go | 16
-rw-r--r-- internal/gitaly/service/blob/blobs.go | 16
-rw-r--r-- internal/gitaly/service/blob/lfs_pointers.go | 18
8 files changed, 143 insertions, 80 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 9778692d8..5d928ef51 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -14,8 +14,8 @@ import (
// CatfileInfoResult is a result for the CatfileInfo pipeline step.
type CatfileInfoResult struct {
- // Err is an error which occurred during execution of the pipeline.
- Err error
+ // err is an error which occurred during execution of the pipeline.
+ err error
// ObjectName is the object name as received from the revlistResultChan.
ObjectName []byte
@@ -27,7 +27,7 @@ type CatfileInfoResult struct {
// `git cat-file --batch-check`. The returned channel will contain all processed catfile info
// results. Any error received via the channel or encountered in this step will cause the pipeline
// to fail. Context cancellation will gracefully halt the pipeline.
-func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator RevlistIterator) <-chan CatfileInfoResult {
+func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator RevlistIterator) CatfileInfoIterator {
resultChan := make(chan CatfileInfoResult)
go func() {
@@ -48,7 +48,7 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator Rev
objectInfo, err := catfile.Info(ctx, revlistResult.OID.Revision())
if err != nil {
sendResult(CatfileInfoResult{
- Err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err),
+ err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err),
})
return
}
@@ -62,12 +62,14 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator Rev
}
if err := revlistIterator.Err(); err != nil {
- sendResult(CatfileInfoResult{Err: err})
+ sendResult(CatfileInfoResult{err: err})
return
}
}()
- return resultChan
+ return &catfileInfoIterator{
+ ch: resultChan,
+ }
}
// CatfileInfoAllObjects enumerates all Git objects part of the repository's object directory and
@@ -75,7 +77,7 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator Rev
// all processed results. Any error encountered during execution of this pipeline step will cause
// the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this
// pipeline step, the resulting catfileInfoResults will never have an object name.
-func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan CatfileInfoResult {
+func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInfoIterator {
resultChan := make(chan CatfileInfoResult)
go func() {
@@ -101,7 +103,7 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan Cat
})
if err != nil {
sendResult(CatfileInfoResult{
- Err: fmt.Errorf("spawning cat-file failed: %w", err),
+ err: fmt.Errorf("spawning cat-file failed: %w", err),
})
return
}
@@ -115,7 +117,7 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan Cat
}
sendResult(CatfileInfoResult{
- Err: fmt.Errorf("parsing object info: %w", err),
+ err: fmt.Errorf("parsing object info: %w", err),
})
return
}
@@ -129,25 +131,29 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan Cat
if err := cmd.Wait(); err != nil {
sendResult(CatfileInfoResult{
- Err: fmt.Errorf("cat-file failed: %w", err),
+ err: fmt.Errorf("cat-file failed: %w", err),
})
return
}
}()
- return resultChan
+ return &catfileInfoIterator{
+ ch: resultChan,
+ }
}
// CatfileInfoFilter filters the catfileInfoResults from the provided channel with the filter
// function: if the filter returns `false` for a given item, then it will be dropped from the
// pipeline. Errors cannot be filtered and will always be passed through.
-func CatfileInfoFilter(ctx context.Context, c <-chan CatfileInfoResult, filter func(CatfileInfoResult) bool) <-chan CatfileInfoResult {
+func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func(CatfileInfoResult) bool) CatfileInfoIterator {
resultChan := make(chan CatfileInfoResult)
+
go func() {
defer close(resultChan)
- for result := range c {
- if result.Err != nil || filter(result) {
+ for it.Next() {
+ result := it.Result()
+ if filter(result) {
select {
case resultChan <- result:
case <-ctx.Done():
@@ -155,6 +161,17 @@ func CatfileInfoFilter(ctx context.Context, c <-chan CatfileInfoResult, filter f
}
}
}
+
+ if err := it.Err(); err != nil {
+ select {
+ case resultChan <- CatfileInfoResult{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
}()
- return resultChan
+
+ return &catfileInfoIterator{
+ ch: resultChan,
+ }
}
diff --git a/internal/git/gitpipe/catfile_info_iterator.go b/internal/git/gitpipe/catfile_info_iterator.go
new file mode 100644
index 000000000..54a75b5d9
--- /dev/null
+++ b/internal/git/gitpipe/catfile_info_iterator.go
@@ -0,0 +1,51 @@
+package gitpipe
+
+// CatfileInfoIterator is an iterator returned by the CatfileInfo, CatfileInfoAllObjects and CatfileInfoFilter functions.
+type CatfileInfoIterator interface {
+ // Next iterates to the next item. Returns `false` in case there are no more results left.
+ Next() bool
+ // Err returns the first error that was encountered.
+ Err() error
+ // Result returns the current item.
+ Result() CatfileInfoResult
+}
+
+// NewCatfileInfoIterator returns a new CatfileInfoIterator for the given items.
+func NewCatfileInfoIterator(items []CatfileInfoResult) CatfileInfoIterator {
+ itemChan := make(chan CatfileInfoResult, len(items))
+ for _, item := range items {
+ itemChan <- item
+ }
+ close(itemChan)
+
+ return &catfileInfoIterator{
+ ch: itemChan,
+ }
+}
+
+type catfileInfoIterator struct {
+ ch <-chan CatfileInfoResult
+ result CatfileInfoResult
+}
+
+func (it *catfileInfoIterator) Next() bool {
+ if it.result.err != nil {
+ return false
+ }
+
+ var ok bool
+ it.result, ok = <-it.ch
+ if !ok || it.result.err != nil {
+ return false
+ }
+
+ return true
+}
+
+func (it *catfileInfoIterator) Err() error {
+ return it.result.err
+}
+
+func (it *catfileInfoIterator) Result() CatfileInfoResult {
+ return it.result
+}
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 80fa40dd9..a57b7075d 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -30,6 +30,7 @@ func TestCatfileInfo(t *testing.T) {
desc string
revlistInputs []RevlistResult
expectedResults []CatfileInfoResult
+ expectedErr error
}{
{
desc: "single blob",
@@ -71,9 +72,7 @@ func TestCatfileInfo(t *testing.T) {
revlistInputs: []RevlistResult{
{OID: "invalidobjectid"},
},
- expectedResults: []CatfileInfoResult{
- {Err: errors.New("retrieving object info for \"invalidobjectid\": object not found")},
- },
+ expectedErr: errors.New("retrieving object info for \"invalidobjectid\": object not found"),
},
{
desc: "mixed valid and invalid revision",
@@ -84,8 +83,8 @@ func TestCatfileInfo(t *testing.T) {
},
expectedResults: []CatfileInfoResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {Err: errors.New("retrieving object info for \"invalidobjectid\": object not found")},
},
+ expectedErr: errors.New("retrieving object info for \"invalidobjectid\": object not found"),
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -98,19 +97,21 @@ func TestCatfileInfo(t *testing.T) {
catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
require.NoError(t, err)
- resultChan := CatfileInfo(ctx, catfileProcess, NewRevlistIterator(tc.revlistInputs))
+ it := CatfileInfo(ctx, catfileProcess, NewRevlistIterator(tc.revlistInputs))
var results []CatfileInfoResult
- for result := range resultChan {
- // We're converting the error here to a plain un-nested error such
- // that we don't have to replicate the complete error's structure.
- if result.Err != nil {
- result.Err = errors.New(result.Err.Error())
- }
-
- results = append(results, result)
+ for it.Next() {
+ results = append(results, it.Result())
+ }
+
+ // We're converting the error here to a plain un-nested error such
+ // that we don't have to replicate the complete error's structure.
+ err = it.Err()
+ if err != nil {
+ err = errors.New(err.Error())
}
+ require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedResults, results)
})
}
@@ -133,13 +134,13 @@ func TestCatfileInfoAllObjects(t *testing.T) {
})
commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents())
- resultChan := CatfileInfoAllObjects(ctx, repo)
+ it := CatfileInfoAllObjects(ctx, repo)
var results []CatfileInfoResult
- for result := range resultChan {
- require.NoError(t, result.Err)
- results = append(results, result)
+ for it.Next() {
+ results = append(results, it.Result())
}
+ require.NoError(t, it.Err())
require.ElementsMatch(t, []CatfileInfoResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}},
@@ -155,6 +156,7 @@ func TestCatfileInfoFilter(t *testing.T) {
input []CatfileInfoResult
filter func(CatfileInfoResult) bool
expectedResults []CatfileInfoResult
+ expectedErr error
}{
{
desc: "all accepted",
@@ -188,15 +190,13 @@ func TestCatfileInfoFilter(t *testing.T) {
input: []CatfileInfoResult{
{ObjectName: []byte{'a'}},
{ObjectName: []byte{'b'}},
- {Err: errors.New("foobar")},
+ {err: errors.New("foobar")},
{ObjectName: []byte{'c'}},
},
filter: func(CatfileInfoResult) bool {
return false
},
- expectedResults: []CatfileInfoResult{
- {Err: errors.New("foobar")},
- },
+ expectedErr: errors.New("foobar"),
},
{
desc: "subset filtered",
@@ -217,17 +217,14 @@ func TestCatfileInfoFilter(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- inputChan := make(chan CatfileInfoResult, len(tc.input))
- for _, input := range tc.input {
- inputChan <- input
- }
- close(inputChan)
+ it := CatfileInfoFilter(ctx, NewCatfileInfoIterator(tc.input), tc.filter)
var results []CatfileInfoResult
- for result := range CatfileInfoFilter(ctx, inputChan, tc.filter) {
- results = append(results, result)
+ for it.Next() {
+ results = append(results, it.Result())
}
+ require.Equal(t, tc.expectedErr, it.Err())
require.Equal(t, tc.expectedResults, results)
})
}
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 069fea22f..627653f6d 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -32,7 +32,7 @@ type CatfileObjectResult struct {
func CatfileObject(
ctx context.Context,
catfileProcess catfile.Batch,
- catfileInfoResultChan <-chan CatfileInfoResult,
+ it CatfileInfoIterator,
) <-chan CatfileObjectResult {
resultChan := make(chan CatfileObjectResult)
go func() {
@@ -49,11 +49,8 @@ func CatfileObject(
var objectReader *signallingReader
- for catfileInfoResult := range catfileInfoResultChan {
- if catfileInfoResult.Err != nil {
- sendResult(CatfileObjectResult{Err: catfileInfoResult.Err})
- return
- }
+ for it.Next() {
+ catfileInfoResult := it.Result()
// We mustn't try to read another object before reading the previous object
// has concluded. Given that this is not under our control but under the
@@ -104,6 +101,11 @@ func CatfileObject(
return
}
}
+
+ if err := it.Err(); err != nil {
+ sendResult(CatfileObjectResult{Err: err})
+ return
+ }
}()
return resultChan
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index 928c4866a..1dcd3a8b3 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -101,13 +101,7 @@ func TestCatfileObject(t *testing.T) {
catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
require.NoError(t, err)
- catfileInfoResultChan := make(chan CatfileInfoResult, len(tc.catfileInfoInputs))
- for _, input := range tc.catfileInfoInputs {
- catfileInfoResultChan <- input
- }
- close(catfileInfoResultChan)
-
- resultChan := CatfileObject(ctx, catfileProcess, catfileInfoResultChan)
+ resultChan := CatfileObject(ctx, catfileProcess, NewCatfileInfoIterator(tc.catfileInfoInputs))
var results []CatfileObjectResult
for result := range resultChan {
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index e76d371a8..7f0d612af 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -176,12 +176,12 @@ func TestPipeline(t *testing.T) {
revlistIter = RevlistFilter(ctx, revlistIter, tc.revlistFilter)
}
- catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistIter)
+ catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter)
if tc.catfileInfoFilter != nil {
- catfileInfoChan = CatfileInfoFilter(ctx, catfileInfoChan, tc.catfileInfoFilter)
+ catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, tc.catfileInfoFilter)
}
- catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoIter)
var results []CatfileObjectResult
for result := range catfileObjectChan {
@@ -227,9 +227,9 @@ func TestPipeline(t *testing.T) {
revlistIter := Revlist(childCtx, repo, []string{"--all"})
revlistIter = RevlistFilter(childCtx, revlistIter, func(RevlistResult) bool { return true })
- catfileInfoChan := CatfileInfo(childCtx, catfileProcess, revlistIter)
- catfileInfoChan = CatfileInfoFilter(childCtx, catfileInfoChan, func(CatfileInfoResult) bool { return true })
- catfileObjectChan := CatfileObject(childCtx, catfileProcess, catfileInfoChan)
+ catfileInfoIter := CatfileInfo(childCtx, catfileProcess, revlistIter)
+ catfileInfoIter = CatfileInfoFilter(childCtx, catfileInfoIter, func(CatfileInfoResult) bool { return true })
+ catfileObjectChan := CatfileObject(childCtx, catfileProcess, catfileInfoIter)
i := 0
for result := range catfileObjectChan {
@@ -261,8 +261,8 @@ func TestPipeline(t *testing.T) {
require.NoError(t, err)
revlistIter := Revlist(ctx, repo, []string{"--all"})
- catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistIter)
- catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter)
+ catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoIter)
i := 0
var wg sync.WaitGroup
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index bb1f42bf3..73805c24e 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -65,8 +65,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
}
revlistIter := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...)
- catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter)
- catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool {
+ catfileInfoIter := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter)
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob"
})
@@ -74,10 +74,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
// We can thus skip reading blob contents completely.
if req.GetBytesLimit() == 0 {
var i uint32
- for blob := range catfileInfoChan {
- if blob.Err != nil {
- return helper.ErrInternal(blob.Err)
- }
+ for catfileInfoIter.Next() {
+ blob := catfileInfoIter.Result()
if err := chunker.Send(&gitalypb.ListBlobsResponse_Blob{
Oid: blob.ObjectInfo.Oid.String(),
@@ -91,8 +89,12 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
break
}
}
+
+ if err := catfileInfoIter.Err(); err != nil {
+ return helper.ErrInternal(err)
+ }
} else {
- catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
var i uint32
for blob := range catfileObjectChan {
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index 451190c76..1357c501a 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -85,11 +85,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
}
revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...)
- catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter)
- catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool {
+ catfileInfoIter := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter)
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
return err
@@ -147,11 +147,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err))
}
- catfileInfoChan := gitpipe.CatfileInfoAllObjects(ctx, repo)
- catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool {
+ catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo)
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
return err
@@ -200,11 +200,11 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
blobs[i] = gitpipe.RevlistResult{OID: git.ObjectID(blobID)}
}
- catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, gitpipe.NewRevlistIterator(blobs))
- catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool {
+ catfileInfoIter := gitpipe.CatfileInfo(ctx, catfileProcess, gitpipe.NewRevlistIterator(blobs))
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil {
return err