diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-28 09:51:06 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-28 10:02:14 +0300 |
commit | 6c99d927a1c7a1cfc07f97cbb60059c2dbc8e022 (patch) | |
tree | e33fbaf3526aac794f740c06e9c2afede3c3ca7c | |
parent | f9ef4d6508e5d13745664cea2e944b65e69be14d (diff) |
gitpipe: Implement iterator for catfile info step
The pipeline steps are currently easy to misuse: the structures returned
on the channel all have an `Err` field, which MUST be checked by the
caller. This check is easy to forget if one isn't aware that it is required.
Refactor the catfile info step to instead return a CatfileInfoIterator.
This iterator follows the typical Go idiom of iterators with `Next()`,
`Err()` and `Result()` functions. Given that the iterator is an
interface, this also makes it easy for callers to replace it
with their own adapter.
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 47 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_iterator.go | 51 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 53 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 14 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_test.go | 8 | ||||
-rw-r--r-- | internal/git/gitpipe/pipeline_test.go | 16 | ||||
-rw-r--r-- | internal/gitaly/service/blob/blobs.go | 16 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 18 |
8 files changed, 143 insertions, 80 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 9778692d8..5d928ef51 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -14,8 +14,8 @@ import ( // CatfileInfoResult is a result for the CatfileInfo pipeline step. type CatfileInfoResult struct { - // Err is an error which occurred during execution of the pipeline. - Err error + // err is an error which occurred during execution of the pipeline. + err error // ObjectName is the object name as received from the revlistResultChan. ObjectName []byte @@ -27,7 +27,7 @@ type CatfileInfoResult struct { // `git cat-file --batch-check`. The returned channel will contain all processed catfile info // results. Any error received via the channel or encountered in this step will cause the pipeline // to fail. Context cancellation will gracefully halt the pipeline. -func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator RevlistIterator) <-chan CatfileInfoResult { +func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator RevlistIterator) CatfileInfoIterator { resultChan := make(chan CatfileInfoResult) go func() { @@ -48,7 +48,7 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator Rev objectInfo, err := catfile.Info(ctx, revlistResult.OID.Revision()) if err != nil { sendResult(CatfileInfoResult{ - Err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err), + err: fmt.Errorf("retrieving object info for %q: %w", revlistResult.OID, err), }) return } @@ -62,12 +62,14 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator Rev } if err := revlistIterator.Err(); err != nil { - sendResult(CatfileInfoResult{Err: err}) + sendResult(CatfileInfoResult{err: err}) return } }() - return resultChan + return &catfileInfoIterator{ + ch: resultChan, + } } // CatfileInfoAllObjects enumerates all Git objects part of the repository's object directory and 
@@ -75,7 +77,7 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator Rev // all processed results. Any error encountered during execution of this pipeline step will cause // the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this // pipeline step, the resulting catfileInfoResults will never have an object name. -func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan CatfileInfoResult { +func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) CatfileInfoIterator { resultChan := make(chan CatfileInfoResult) go func() { @@ -101,7 +103,7 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan Cat }) if err != nil { sendResult(CatfileInfoResult{ - Err: fmt.Errorf("spawning cat-file failed: %w", err), + err: fmt.Errorf("spawning cat-file failed: %w", err), }) return } @@ -115,7 +117,7 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan Cat } sendResult(CatfileInfoResult{ - Err: fmt.Errorf("parsing object info: %w", err), + err: fmt.Errorf("parsing object info: %w", err), }) return } @@ -129,25 +131,29 @@ func CatfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan Cat if err := cmd.Wait(); err != nil { sendResult(CatfileInfoResult{ - Err: fmt.Errorf("cat-file failed: %w", err), + err: fmt.Errorf("cat-file failed: %w", err), }) return } }() - return resultChan + return &catfileInfoIterator{ + ch: resultChan, + } } // CatfileInfoFilter filters the catfileInfoResults from the provided channel with the filter // function: if the filter returns `false` for a given item, then it will be dropped from the // pipeline. Errors cannot be filtered and will always be passed through. 
-func CatfileInfoFilter(ctx context.Context, c <-chan CatfileInfoResult, filter func(CatfileInfoResult) bool) <-chan CatfileInfoResult { +func CatfileInfoFilter(ctx context.Context, it CatfileInfoIterator, filter func(CatfileInfoResult) bool) CatfileInfoIterator { resultChan := make(chan CatfileInfoResult) + go func() { defer close(resultChan) - for result := range c { - if result.Err != nil || filter(result) { + for it.Next() { + result := it.Result() + if filter(result) { select { case resultChan <- result: case <-ctx.Done(): @@ -155,6 +161,17 @@ func CatfileInfoFilter(ctx context.Context, c <-chan CatfileInfoResult, filter f } } } + + if err := it.Err(); err != nil { + select { + case resultChan <- CatfileInfoResult{err: err}: + case <-ctx.Done(): + return + } + } }() - return resultChan + + return &catfileInfoIterator{ + ch: resultChan, + } } diff --git a/internal/git/gitpipe/catfile_info_iterator.go b/internal/git/gitpipe/catfile_info_iterator.go new file mode 100644 index 000000000..54a75b5d9 --- /dev/null +++ b/internal/git/gitpipe/catfile_info_iterator.go @@ -0,0 +1,51 @@ +package gitpipe + +// CatfileInfoIterator is an iterator returned by the Revlist function. +type CatfileInfoIterator interface { + // Next iterates to the next item. Returns `false` in case there are no more results left. + Next() bool + // Err returns the first error that was encountered. + Err() error + // Result returns the current item. + Result() CatfileInfoResult +} + +// NewCatfileInfoIterator returns a new CatfileInfoIterator for the given items. 
+func NewCatfileInfoIterator(items []CatfileInfoResult) CatfileInfoIterator { + itemChan := make(chan CatfileInfoResult, len(items)) + for _, item := range items { + itemChan <- item + } + close(itemChan) + + return &catfileInfoIterator{ + ch: itemChan, + } +} + +type catfileInfoIterator struct { + ch <-chan CatfileInfoResult + result CatfileInfoResult +} + +func (it *catfileInfoIterator) Next() bool { + if it.result.err != nil { + return false + } + + var ok bool + it.result, ok = <-it.ch + if !ok || it.result.err != nil { + return false + } + + return true +} + +func (it *catfileInfoIterator) Err() error { + return it.result.err +} + +func (it *catfileInfoIterator) Result() CatfileInfoResult { + return it.result +} diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 80fa40dd9..a57b7075d 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -30,6 +30,7 @@ func TestCatfileInfo(t *testing.T) { desc string revlistInputs []RevlistResult expectedResults []CatfileInfoResult + expectedErr error }{ { desc: "single blob", @@ -71,9 +72,7 @@ func TestCatfileInfo(t *testing.T) { revlistInputs: []RevlistResult{ {OID: "invalidobjectid"}, }, - expectedResults: []CatfileInfoResult{ - {Err: errors.New("retrieving object info for \"invalidobjectid\": object not found")}, - }, + expectedErr: errors.New("retrieving object info for \"invalidobjectid\": object not found"), }, { desc: "mixed valid and invalid revision", @@ -84,8 +83,8 @@ func TestCatfileInfo(t *testing.T) { }, expectedResults: []CatfileInfoResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}}, - {Err: errors.New("retrieving object info for \"invalidobjectid\": object not found")}, }, + expectedErr: errors.New("retrieving object info for \"invalidobjectid\": object not found"), }, } { t.Run(tc.desc, func(t *testing.T) { @@ -98,19 +97,21 @@ func TestCatfileInfo(t *testing.T) { 
catfileProcess, err := catfileCache.BatchProcess(ctx, repo) require.NoError(t, err) - resultChan := CatfileInfo(ctx, catfileProcess, NewRevlistIterator(tc.revlistInputs)) + it := CatfileInfo(ctx, catfileProcess, NewRevlistIterator(tc.revlistInputs)) var results []CatfileInfoResult - for result := range resultChan { - // We're converting the error here to a plain un-nested error such - // that we don't have to replicate the complete error's structure. - if result.Err != nil { - result.Err = errors.New(result.Err.Error()) - } - - results = append(results, result) + for it.Next() { + results = append(results, it.Result()) + } + + // We're converting the error here to a plain un-nested error such + // that we don't have to replicate the complete error's structure. + err = it.Err() + if err != nil { + err = errors.New(err.Error()) } + require.Equal(t, tc.expectedErr, err) require.Equal(t, tc.expectedResults, results) }) } @@ -133,13 +134,13 @@ func TestCatfileInfoAllObjects(t *testing.T) { }) commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents()) - resultChan := CatfileInfoAllObjects(ctx, repo) + it := CatfileInfoAllObjects(ctx, repo) var results []CatfileInfoResult - for result := range resultChan { - require.NoError(t, result.Err) - results = append(results, result) + for it.Next() { + results = append(results, it.Result()) } + require.NoError(t, it.Err()) require.ElementsMatch(t, []CatfileInfoResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}}, @@ -155,6 +156,7 @@ func TestCatfileInfoFilter(t *testing.T) { input []CatfileInfoResult filter func(CatfileInfoResult) bool expectedResults []CatfileInfoResult + expectedErr error }{ { desc: "all accepted", @@ -188,15 +190,13 @@ func TestCatfileInfoFilter(t *testing.T) { input: []CatfileInfoResult{ {ObjectName: []byte{'a'}}, {ObjectName: []byte{'b'}}, - {Err: errors.New("foobar")}, + {err: errors.New("foobar")}, {ObjectName: []byte{'c'}}, }, filter: func(CatfileInfoResult) bool { 
return false }, - expectedResults: []CatfileInfoResult{ - {Err: errors.New("foobar")}, - }, + expectedErr: errors.New("foobar"), }, { desc: "subset filtered", @@ -217,17 +217,14 @@ func TestCatfileInfoFilter(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - inputChan := make(chan CatfileInfoResult, len(tc.input)) - for _, input := range tc.input { - inputChan <- input - } - close(inputChan) + it := CatfileInfoFilter(ctx, NewCatfileInfoIterator(tc.input), tc.filter) var results []CatfileInfoResult - for result := range CatfileInfoFilter(ctx, inputChan, tc.filter) { - results = append(results, result) + for it.Next() { + results = append(results, it.Result()) } + require.Equal(t, tc.expectedErr, it.Err()) require.Equal(t, tc.expectedResults, results) }) } diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 069fea22f..627653f6d 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -32,7 +32,7 @@ type CatfileObjectResult struct { func CatfileObject( ctx context.Context, catfileProcess catfile.Batch, - catfileInfoResultChan <-chan CatfileInfoResult, + it CatfileInfoIterator, ) <-chan CatfileObjectResult { resultChan := make(chan CatfileObjectResult) go func() { @@ -49,11 +49,8 @@ func CatfileObject( var objectReader *signallingReader - for catfileInfoResult := range catfileInfoResultChan { - if catfileInfoResult.Err != nil { - sendResult(CatfileObjectResult{Err: catfileInfoResult.Err}) - return - } + for it.Next() { + catfileInfoResult := it.Result() // We mustn't try to read another object before reading the previous object // has concluded. 
Given that this is not under our control but under the @@ -104,6 +101,11 @@ func CatfileObject( return } } + + if err := it.Err(); err != nil { + sendResult(CatfileObjectResult{Err: err}) + return + } }() return resultChan diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index 928c4866a..1dcd3a8b3 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -101,13 +101,7 @@ func TestCatfileObject(t *testing.T) { catfileProcess, err := catfileCache.BatchProcess(ctx, repo) require.NoError(t, err) - catfileInfoResultChan := make(chan CatfileInfoResult, len(tc.catfileInfoInputs)) - for _, input := range tc.catfileInfoInputs { - catfileInfoResultChan <- input - } - close(catfileInfoResultChan) - - resultChan := CatfileObject(ctx, catfileProcess, catfileInfoResultChan) + resultChan := CatfileObject(ctx, catfileProcess, NewCatfileInfoIterator(tc.catfileInfoInputs)) var results []CatfileObjectResult for result := range resultChan { diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index e76d371a8..7f0d612af 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -176,12 +176,12 @@ func TestPipeline(t *testing.T) { revlistIter = RevlistFilter(ctx, revlistIter, tc.revlistFilter) } - catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistIter) + catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter) if tc.catfileInfoFilter != nil { - catfileInfoChan = CatfileInfoFilter(ctx, catfileInfoChan, tc.catfileInfoFilter) + catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, tc.catfileInfoFilter) } - catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoChan) + catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoIter) var results []CatfileObjectResult for result := range catfileObjectChan { @@ -227,9 +227,9 @@ func TestPipeline(t *testing.T) { revlistIter 
:= Revlist(childCtx, repo, []string{"--all"}) revlistIter = RevlistFilter(childCtx, revlistIter, func(RevlistResult) bool { return true }) - catfileInfoChan := CatfileInfo(childCtx, catfileProcess, revlistIter) - catfileInfoChan = CatfileInfoFilter(childCtx, catfileInfoChan, func(CatfileInfoResult) bool { return true }) - catfileObjectChan := CatfileObject(childCtx, catfileProcess, catfileInfoChan) + catfileInfoIter := CatfileInfo(childCtx, catfileProcess, revlistIter) + catfileInfoIter = CatfileInfoFilter(childCtx, catfileInfoIter, func(CatfileInfoResult) bool { return true }) + catfileObjectChan := CatfileObject(childCtx, catfileProcess, catfileInfoIter) i := 0 for result := range catfileObjectChan { @@ -261,8 +261,8 @@ func TestPipeline(t *testing.T) { require.NoError(t, err) revlistIter := Revlist(ctx, repo, []string{"--all"}) - catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistIter) - catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoChan) + catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter) + catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoIter) i := 0 var wg sync.WaitGroup diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go index bb1f42bf3..73805c24e 100644 --- a/internal/gitaly/service/blob/blobs.go +++ b/internal/gitaly/service/blob/blobs.go @@ -65,8 +65,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS } revlistIter := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...) 
- catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter) - catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool { + catfileInfoIter := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter) + catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" }) @@ -74,10 +74,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS // We can thus skip reading blob contents completely. if req.GetBytesLimit() == 0 { var i uint32 - for blob := range catfileInfoChan { - if blob.Err != nil { - return helper.ErrInternal(blob.Err) - } + for catfileInfoIter.Next() { + blob := catfileInfoIter.Result() if err := chunker.Send(&gitalypb.ListBlobsResponse_Blob{ Oid: blob.ObjectInfo.Oid.String(), @@ -91,8 +89,12 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS break } } + + if err := catfileInfoIter.Err(); err != nil { + return helper.ErrInternal(err) + } } else { - catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoChan) + catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) var i uint32 for blob := range catfileObjectChan { diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index 451190c76..1357c501a 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -85,11 +85,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git } revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...) 
- catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter) - catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool { + catfileInfoIter := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter) + catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize }) - catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoChan) + catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil { return err @@ -147,11 +147,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err)) } - catfileInfoChan := gitpipe.CatfileInfoAllObjects(ctx, repo) - catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool { + catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo) + catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize }) - catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoChan) + catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil { return err @@ -200,11 +200,11 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita blobs[i] = gitpipe.RevlistResult{OID: git.ObjectID(blobID)} } - catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, gitpipe.NewRevlistIterator(blobs)) - catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool { + catfileInfoIter := gitpipe.CatfileInfo(ctx, 
catfileProcess, gitpipe.NewRevlistIterator(blobs)) + catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize }) - catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoChan) + catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil { return err |