diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-24 15:16:13 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-28 10:02:14 +0300 |
commit | 39119388e82bccaf1de432a7a23b989f311d6b13 (patch) | |
tree | e48cb2a2c1612b4c6aa4a7d560f76786e6f4bc96 | |
parent | 6c99d927a1c7a1cfc07f97cbb60059c2dbc8e022 (diff) |
gitpipe: Implement iterator for catfile object step
The pipeline steps are currently easy to misuse: the structures returned
on the channel all have an `Err` field, which MUST be checked by the
caller. This check is easy to forget if one isn't aware that it is required.
Refactor the catfile object step to instead return a
CatfileObjectIterator. This iterator follows the typical Go idiom of
iterators with `Next()`, `Err()` and `Result()` methods. Given that
the iterator is an interface, this also makes it easy for callers to
replace it with their own adapter.
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 14 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_iterator.go | 51 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_test.go | 46 | ||||
-rw-r--r-- | internal/git/gitpipe/pipeline_test.go | 65 | ||||
-rw-r--r-- | internal/gitaly/service/blob/blobs.go | 12 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 24 |
6 files changed, 132 insertions, 80 deletions
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 627653f6d..5a50c1298 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -12,8 +12,8 @@ import ( // CatfileObjectResult is a result for the CatfileObject pipeline step. type CatfileObjectResult struct { - // Err is an error which occurred during execution of the pipeline. - Err error + // err is an error which occurred during execution of the pipeline. + err error // ObjectName is the object name as received from the revlistResultChan. ObjectName []byte @@ -33,7 +33,7 @@ func CatfileObject( ctx context.Context, catfileProcess catfile.Batch, it CatfileInfoIterator, -) <-chan CatfileObjectResult { +) CatfileObjectIterator { resultChan := make(chan CatfileObjectResult) go func() { defer close(resultChan) @@ -83,7 +83,7 @@ func CatfileObject( if err != nil { sendResult(CatfileObjectResult{ - Err: fmt.Errorf("requesting object: %w", err), + err: fmt.Errorf("requesting object: %w", err), }) return } @@ -103,12 +103,14 @@ func CatfileObject( } if err := it.Err(); err != nil { - sendResult(CatfileObjectResult{Err: err}) + sendResult(CatfileObjectResult{err: err}) return } }() - return resultChan + return &catfileObjectIterator{ + ch: resultChan, + } } type signallingReader struct { diff --git a/internal/git/gitpipe/catfile_object_iterator.go b/internal/git/gitpipe/catfile_object_iterator.go new file mode 100644 index 000000000..372a94f3c --- /dev/null +++ b/internal/git/gitpipe/catfile_object_iterator.go @@ -0,0 +1,51 @@ +package gitpipe + +// CatfileObjectIterator is an iterator returned by the CatfileObject function. +type CatfileObjectIterator interface { + // Next iterates to the next item. Returns `false` in case there are no more results left. + Next() bool + // Err returns the first error that was encountered. + Err() error + // Result returns the current item. 
+ Result() CatfileObjectResult +} + +// NewCatfileObjectIterator returns a new CatfileObjectIterator for the given items. +func NewCatfileObjectIterator(items []CatfileObjectResult) CatfileObjectIterator { + itemChan := make(chan CatfileObjectResult, len(items)) + for _, item := range items { + itemChan <- item + } + close(itemChan) + + return &catfileObjectIterator{ + ch: itemChan, + } +} + +type catfileObjectIterator struct { + ch <-chan CatfileObjectResult + result CatfileObjectResult +} + +func (it *catfileObjectIterator) Next() bool { + if it.result.err != nil { + return false + } + + var ok bool + it.result, ok = <-it.ch + if !ok || it.result.err != nil { + return false + } + + return true +} + +func (it *catfileObjectIterator) Err() error { + return it.result.err +} + +func (it *catfileObjectIterator) Result() CatfileObjectResult { + return it.result +} diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index 1dcd3a8b3..eda42531a 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -24,6 +24,7 @@ func TestCatfileObject(t *testing.T) { desc string catfileInfoInputs []CatfileInfoResult expectedResults []CatfileObjectResult + expectedErr error }{ { desc: "single blob", @@ -65,18 +66,14 @@ func TestCatfileObject(t *testing.T) { catfileInfoInputs: []CatfileInfoResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: "invalidobjectid", Type: "blob"}}, }, - expectedResults: []CatfileObjectResult{ - {Err: errors.New("requesting object: object not found")}, - }, + expectedErr: errors.New("requesting object: object not found"), }, { desc: "invalid object type", catfileInfoInputs: []CatfileInfoResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "foobar"}}, }, - expectedResults: []CatfileObjectResult{ - {Err: errors.New("requesting object: unknown object type \"foobar\"")}, - }, + expectedErr: errors.New("requesting object: unknown object type \"foobar\""), }, 
{ desc: "mixed valid and invalid revision", @@ -87,8 +84,8 @@ func TestCatfileObject(t *testing.T) { }, expectedResults: []CatfileObjectResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}}, - {Err: errors.New("requesting object: unknown object type \"foobar\"")}, }, + expectedErr: errors.New("requesting object: unknown object type \"foobar\""), }, } { t.Run(tc.desc, func(t *testing.T) { @@ -101,31 +98,32 @@ func TestCatfileObject(t *testing.T) { catfileProcess, err := catfileCache.BatchProcess(ctx, repo) require.NoError(t, err) - resultChan := CatfileObject(ctx, catfileProcess, NewCatfileInfoIterator(tc.catfileInfoInputs)) + it := CatfileObject(ctx, catfileProcess, NewCatfileInfoIterator(tc.catfileInfoInputs)) var results []CatfileObjectResult - for result := range resultChan { - // We're converting the error here to a plain un-nested error such - // that we don't have to replicate the complete error's structure. - if result.Err != nil { - result.Err = errors.New(result.Err.Error()) - } + for it.Next() { + result := it.Result() - if result.Err == nil { - // While we could also assert object data, let's not do - // this: it would just be too annoying. - require.NotNil(t, result.ObjectReader) + // While we could also assert object data, let's not do + // this: it would just be too annoying. + require.NotNil(t, result.ObjectReader) - objectData, err := ioutil.ReadAll(result.ObjectReader) - require.NoError(t, err) - require.Len(t, objectData, int(result.ObjectInfo.Size)) - - result.ObjectReader = nil - } + objectData, err := ioutil.ReadAll(result.ObjectReader) + require.NoError(t, err) + require.Len(t, objectData, int(result.ObjectInfo.Size)) + result.ObjectReader = nil results = append(results, result) } + // We're converting the error here to a plain un-nested error such + // that we don't have to replicate the complete error's structure. 
+ err = it.Err() + if err != nil { + err = errors.New(err.Error()) + } + + require.Equal(t, tc.expectedErr, err) require.Equal(t, tc.expectedResults, results) }) } diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index 7f0d612af..75b3396c3 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -29,6 +29,7 @@ func TestPipeline(t *testing.T) { revlistFilter func(RevlistResult) bool catfileInfoFilter func(CatfileInfoResult) bool expectedResults []CatfileObjectResult + expectedErr error }{ { desc: "single blob", @@ -128,9 +129,7 @@ func TestPipeline(t *testing.T) { revisions: []string{ "doesnotexist", }, - expectedResults: []CatfileObjectResult{ - {Err: errors.New("rev-list pipeline command: exit status 128")}, - }, + expectedErr: errors.New("rev-list pipeline command: exit status 128"), }, { desc: "mixed valid and invalid revision", @@ -139,9 +138,7 @@ func TestPipeline(t *testing.T) { "doesnotexist", lfsPointer2, }, - expectedResults: []CatfileObjectResult{ - {Err: errors.New("rev-list pipeline command: exit status 128")}, - }, + expectedErr: errors.New("rev-list pipeline command: exit status 128"), }, { desc: "invalid revision with all filters", @@ -156,9 +153,7 @@ func TestPipeline(t *testing.T) { require.Fail(t, "filter should not be invoked on errors") return true }, - expectedResults: []CatfileObjectResult{ - {Err: errors.New("rev-list pipeline command: exit status 128")}, - }, + expectedErr: errors.New("rev-list pipeline command: exit status 128"), }, } { t.Run(tc.desc, func(t *testing.T) { @@ -181,31 +176,33 @@ func TestPipeline(t *testing.T) { catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, tc.catfileInfoFilter) } - catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoIter) + catfileObjectIter := CatfileObject(ctx, catfileProcess, catfileInfoIter) var results []CatfileObjectResult - for result := range catfileObjectChan { - // We're converting the 
error here to a plain un-nested error such - // that we don't have to replicate the complete error's structure. - if result.Err != nil { - result.Err = errors.New(result.Err.Error()) - } + for catfileObjectIter.Next() { + result := catfileObjectIter.Result() - if result.Err == nil { - // While we could also assert object data, let's not do - // this: it would just be too annoying. - require.NotNil(t, result.ObjectReader) + // While we could also assert object data, let's not do + // this: it would just be too annoying. + require.NotNil(t, result.ObjectReader) - objectData, err := ioutil.ReadAll(result.ObjectReader) - require.NoError(t, err) - require.Len(t, objectData, int(result.ObjectInfo.Size)) + objectData, err := ioutil.ReadAll(result.ObjectReader) + require.NoError(t, err) + require.Len(t, objectData, int(result.ObjectInfo.Size)) - result.ObjectReader = nil - } + result.ObjectReader = nil results = append(results, result) } + // We're converting the error here to a plain un-nested error such that we + // don't have to replicate the complete error's structure. 
+ err = catfileObjectIter.Err() + if err != nil { + err = errors.New(err.Error()) + } + + require.Equal(t, tc.expectedErr, err) require.Equal(t, tc.expectedResults, results) }) } @@ -229,14 +226,13 @@ func TestPipeline(t *testing.T) { revlistIter = RevlistFilter(childCtx, revlistIter, func(RevlistResult) bool { return true }) catfileInfoIter := CatfileInfo(childCtx, catfileProcess, revlistIter) catfileInfoIter = CatfileInfoFilter(childCtx, catfileInfoIter, func(CatfileInfoResult) bool { return true }) - catfileObjectChan := CatfileObject(childCtx, catfileProcess, catfileInfoIter) + catfileObjectIter := CatfileObject(childCtx, catfileProcess, catfileInfoIter) i := 0 - for result := range catfileObjectChan { - require.NoError(t, result.Err) + for catfileObjectIter.Next() { i++ - _, err := io.Copy(ioutil.Discard, result.ObjectReader) + _, err := io.Copy(ioutil.Discard, catfileObjectIter.Result().ObjectReader) require.NoError(t, err) if i == 3 { @@ -244,6 +240,8 @@ func TestPipeline(t *testing.T) { } } + require.NoError(t, catfileObjectIter.Err()) + // Context cancellation is timing sensitive: at the point of cancelling the context, // the last pipeline step may already have queued up an additional result. We thus // cannot assert the exact number of requests, but we know that it's bounded. 
@@ -262,13 +260,11 @@ func TestPipeline(t *testing.T) { revlistIter := Revlist(ctx, repo, []string{"--all"}) catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter) - catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoIter) + catfileObjectIter := CatfileObject(ctx, catfileProcess, catfileInfoIter) i := 0 var wg sync.WaitGroup - for result := range catfileObjectChan { - require.NoError(t, result.Err) - + for catfileObjectIter.Next() { wg.Add(1) i++ @@ -282,9 +278,10 @@ func TestPipeline(t *testing.T) { defer wg.Done() _, err := io.Copy(ioutil.Discard, object.ObjectReader) require.NoError(t, err) - }(result) + }(catfileObjectIter.Result()) } + require.NoError(t, catfileObjectIter.Err()) wg.Wait() // We could in theory assert the exact amount of objects, but this would make it diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go index 73805c24e..78c447079 100644 --- a/internal/gitaly/service/blob/blobs.go +++ b/internal/gitaly/service/blob/blobs.go @@ -94,13 +94,11 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS return helper.ErrInternal(err) } } else { - catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) + catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) var i uint32 - for blob := range catfileObjectChan { - if blob.Err != nil { - return helper.ErrInternal(blob.Err) - } + for catfileObjectIter.Next() { + blob := catfileObjectIter.Result() headerSent := false dataChunker := streamio.NewWriter(func(p []byte) error { @@ -155,6 +153,10 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS break } } + + if err := catfileObjectIter.Err(); err != nil { + return helper.ErrInternal(err) + } } if err := chunker.Flush(); err != nil { diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index 1357c501a..46442f546 100644 --- 
a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -89,9 +89,9 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize }) - catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) + catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) - if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil { + if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil { return err } } @@ -151,9 +151,9 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize }) - catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) + catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) - if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil { + if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil { return err } } @@ -204,9 +204,9 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize }) - catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) + catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter) - if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil { + if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil { return err } } @@ -378,14 
+378,12 @@ func (t *lfsPointerSender) Send() error { return t.send(t.pointers) } -func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan gitpipe.CatfileObjectResult, limit int) error { +func sendLFSPointers(chunker *chunk.Chunker, iter gitpipe.CatfileObjectIterator, limit int) error { buffer := bytes.NewBuffer(make([]byte, 0, lfsPointerMaxSize)) var i int - for lfsPointer := range lfsPointers { - if lfsPointer.Err != nil { - return helper.ErrInternal(lfsPointer.Err) - } + for iter.Next() { + lfsPointer := iter.Result() // Avoid allocating bytes for an LFS pointer until we know the current blob really // is an LFS pointer. @@ -419,6 +417,10 @@ func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan gitpipe.CatfileO } } + if err := iter.Err(); err != nil { + return helper.ErrInternal(err) + } + if err := chunker.Flush(); err != nil { return helper.ErrInternal(err) } |