Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Patrick Steinhardt <psteinhardt@gitlab.com> 2021-06-24 15:16:13 +0300
committer Patrick Steinhardt <psteinhardt@gitlab.com> 2021-06-28 10:02:14 +0300
commit 39119388e82bccaf1de432a7a23b989f311d6b13 (patch)
tree e48cb2a2c1612b4c6aa4a7d560f76786e6f4bc96
parent 6c99d927a1c7a1cfc07f97cbb60059c2dbc8e022 (diff)
gitpipe: Implement iterator for catfile object step
The pipeline steps are currently easy to misuse: the structures returned on the channel all have an `Err` field, which MUST be checked by the caller. It's easy enough to forget if one isn't prepared to do so. Refactor the catfile object step to instead return a CatfileObjectIterator. This iterator follows the typical Go idiom of iterators with `Next()`, `Err()` and `Result()` functions. Given that the iterator is an interface, this also makes it easily possible for callers to replace it with their own adapter.
-rw-r--r-- internal/git/gitpipe/catfile_object.go 14
-rw-r--r-- internal/git/gitpipe/catfile_object_iterator.go 51
-rw-r--r-- internal/git/gitpipe/catfile_object_test.go 46
-rw-r--r-- internal/git/gitpipe/pipeline_test.go 65
-rw-r--r-- internal/gitaly/service/blob/blobs.go 12
-rw-r--r-- internal/gitaly/service/blob/lfs_pointers.go 24
6 files changed, 132 insertions, 80 deletions
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 627653f6d..5a50c1298 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -12,8 +12,8 @@ import (
// CatfileObjectResult is a result for the CatfileObject pipeline step.
type CatfileObjectResult struct {
- // Err is an error which occurred during execution of the pipeline.
- Err error
+ // err is an error which occurred during execution of the pipeline.
+ err error
// ObjectName is the object name as received from the revlistResultChan.
ObjectName []byte
@@ -33,7 +33,7 @@ func CatfileObject(
ctx context.Context,
catfileProcess catfile.Batch,
it CatfileInfoIterator,
-) <-chan CatfileObjectResult {
+) CatfileObjectIterator {
resultChan := make(chan CatfileObjectResult)
go func() {
defer close(resultChan)
@@ -83,7 +83,7 @@ func CatfileObject(
if err != nil {
sendResult(CatfileObjectResult{
- Err: fmt.Errorf("requesting object: %w", err),
+ err: fmt.Errorf("requesting object: %w", err),
})
return
}
@@ -103,12 +103,14 @@ func CatfileObject(
}
if err := it.Err(); err != nil {
- sendResult(CatfileObjectResult{Err: err})
+ sendResult(CatfileObjectResult{err: err})
return
}
}()
- return resultChan
+ return &catfileObjectIterator{
+ ch: resultChan,
+ }
}
type signallingReader struct {
diff --git a/internal/git/gitpipe/catfile_object_iterator.go b/internal/git/gitpipe/catfile_object_iterator.go
new file mode 100644
index 000000000..372a94f3c
--- /dev/null
+++ b/internal/git/gitpipe/catfile_object_iterator.go
@@ -0,0 +1,51 @@
+package gitpipe
+
+// CatfileObjectIterator is an iterator returned by the CatfileObject function.
+type CatfileObjectIterator interface {
+ // Next iterates to the next item. Returns `false` in case there are no more results left.
+ Next() bool
+ // Err returns the first error that was encountered.
+ Err() error
+ // Result returns the current item.
+ Result() CatfileObjectResult
+}
+
+// NewCatfileObjectIterator returns a new CatfileObjectIterator for the given items.
+func NewCatfileObjectIterator(items []CatfileObjectResult) CatfileObjectIterator {
+ itemChan := make(chan CatfileObjectResult, len(items))
+ for _, item := range items {
+ itemChan <- item
+ }
+ close(itemChan)
+
+ return &catfileObjectIterator{
+ ch: itemChan,
+ }
+}
+
+type catfileObjectIterator struct {
+ ch <-chan CatfileObjectResult
+ result CatfileObjectResult
+}
+
+func (it *catfileObjectIterator) Next() bool {
+ if it.result.err != nil {
+ return false
+ }
+
+ var ok bool
+ it.result, ok = <-it.ch
+ if !ok || it.result.err != nil {
+ return false
+ }
+
+ return true
+}
+
+func (it *catfileObjectIterator) Err() error {
+ return it.result.err
+}
+
+func (it *catfileObjectIterator) Result() CatfileObjectResult {
+ return it.result
+}
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index 1dcd3a8b3..eda42531a 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -24,6 +24,7 @@ func TestCatfileObject(t *testing.T) {
desc string
catfileInfoInputs []CatfileInfoResult
expectedResults []CatfileObjectResult
+ expectedErr error
}{
{
desc: "single blob",
@@ -65,18 +66,14 @@ func TestCatfileObject(t *testing.T) {
catfileInfoInputs: []CatfileInfoResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: "invalidobjectid", Type: "blob"}},
},
- expectedResults: []CatfileObjectResult{
- {Err: errors.New("requesting object: object not found")},
- },
+ expectedErr: errors.New("requesting object: object not found"),
},
{
desc: "invalid object type",
catfileInfoInputs: []CatfileInfoResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "foobar"}},
},
- expectedResults: []CatfileObjectResult{
- {Err: errors.New("requesting object: unknown object type \"foobar\"")},
- },
+ expectedErr: errors.New("requesting object: unknown object type \"foobar\""),
},
{
desc: "mixed valid and invalid revision",
@@ -87,8 +84,8 @@ func TestCatfileObject(t *testing.T) {
},
expectedResults: []CatfileObjectResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
- {Err: errors.New("requesting object: unknown object type \"foobar\"")},
},
+ expectedErr: errors.New("requesting object: unknown object type \"foobar\""),
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -101,31 +98,32 @@ func TestCatfileObject(t *testing.T) {
catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
require.NoError(t, err)
- resultChan := CatfileObject(ctx, catfileProcess, NewCatfileInfoIterator(tc.catfileInfoInputs))
+ it := CatfileObject(ctx, catfileProcess, NewCatfileInfoIterator(tc.catfileInfoInputs))
var results []CatfileObjectResult
- for result := range resultChan {
- // We're converting the error here to a plain un-nested error such
- // that we don't have to replicate the complete error's structure.
- if result.Err != nil {
- result.Err = errors.New(result.Err.Error())
- }
+ for it.Next() {
+ result := it.Result()
- if result.Err == nil {
- // While we could also assert object data, let's not do
- // this: it would just be too annoying.
- require.NotNil(t, result.ObjectReader)
+ // While we could also assert object data, let's not do
+ // this: it would just be too annoying.
+ require.NotNil(t, result.ObjectReader)
- objectData, err := ioutil.ReadAll(result.ObjectReader)
- require.NoError(t, err)
- require.Len(t, objectData, int(result.ObjectInfo.Size))
-
- result.ObjectReader = nil
- }
+ objectData, err := ioutil.ReadAll(result.ObjectReader)
+ require.NoError(t, err)
+ require.Len(t, objectData, int(result.ObjectInfo.Size))
+ result.ObjectReader = nil
results = append(results, result)
}
+ // We're converting the error here to a plain un-nested error such
+ // that we don't have to replicate the complete error's structure.
+ err = it.Err()
+ if err != nil {
+ err = errors.New(err.Error())
+ }
+
+ require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedResults, results)
})
}
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index 7f0d612af..75b3396c3 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -29,6 +29,7 @@ func TestPipeline(t *testing.T) {
revlistFilter func(RevlistResult) bool
catfileInfoFilter func(CatfileInfoResult) bool
expectedResults []CatfileObjectResult
+ expectedErr error
}{
{
desc: "single blob",
@@ -128,9 +129,7 @@ func TestPipeline(t *testing.T) {
revisions: []string{
"doesnotexist",
},
- expectedResults: []CatfileObjectResult{
- {Err: errors.New("rev-list pipeline command: exit status 128")},
- },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
},
{
desc: "mixed valid and invalid revision",
@@ -139,9 +138,7 @@ func TestPipeline(t *testing.T) {
"doesnotexist",
lfsPointer2,
},
- expectedResults: []CatfileObjectResult{
- {Err: errors.New("rev-list pipeline command: exit status 128")},
- },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
},
{
desc: "invalid revision with all filters",
@@ -156,9 +153,7 @@ func TestPipeline(t *testing.T) {
require.Fail(t, "filter should not be invoked on errors")
return true
},
- expectedResults: []CatfileObjectResult{
- {Err: errors.New("rev-list pipeline command: exit status 128")},
- },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -181,31 +176,33 @@ func TestPipeline(t *testing.T) {
catfileInfoIter = CatfileInfoFilter(ctx, catfileInfoIter, tc.catfileInfoFilter)
}
- catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoIter)
+ catfileObjectIter := CatfileObject(ctx, catfileProcess, catfileInfoIter)
var results []CatfileObjectResult
- for result := range catfileObjectChan {
- // We're converting the error here to a plain un-nested error such
- // that we don't have to replicate the complete error's structure.
- if result.Err != nil {
- result.Err = errors.New(result.Err.Error())
- }
+ for catfileObjectIter.Next() {
+ result := catfileObjectIter.Result()
- if result.Err == nil {
- // While we could also assert object data, let's not do
- // this: it would just be too annoying.
- require.NotNil(t, result.ObjectReader)
+ // While we could also assert object data, let's not do
+ // this: it would just be too annoying.
+ require.NotNil(t, result.ObjectReader)
- objectData, err := ioutil.ReadAll(result.ObjectReader)
- require.NoError(t, err)
- require.Len(t, objectData, int(result.ObjectInfo.Size))
+ objectData, err := ioutil.ReadAll(result.ObjectReader)
+ require.NoError(t, err)
+ require.Len(t, objectData, int(result.ObjectInfo.Size))
- result.ObjectReader = nil
- }
+ result.ObjectReader = nil
results = append(results, result)
}
+ // We're converting the error here to a plain un-nested error such that we
+ // don't have to replicate the complete error's structure.
+ err = catfileObjectIter.Err()
+ if err != nil {
+ err = errors.New(err.Error())
+ }
+
+ require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedResults, results)
})
}
@@ -229,14 +226,13 @@ func TestPipeline(t *testing.T) {
revlistIter = RevlistFilter(childCtx, revlistIter, func(RevlistResult) bool { return true })
catfileInfoIter := CatfileInfo(childCtx, catfileProcess, revlistIter)
catfileInfoIter = CatfileInfoFilter(childCtx, catfileInfoIter, func(CatfileInfoResult) bool { return true })
- catfileObjectChan := CatfileObject(childCtx, catfileProcess, catfileInfoIter)
+ catfileObjectIter := CatfileObject(childCtx, catfileProcess, catfileInfoIter)
i := 0
- for result := range catfileObjectChan {
- require.NoError(t, result.Err)
+ for catfileObjectIter.Next() {
i++
- _, err := io.Copy(ioutil.Discard, result.ObjectReader)
+ _, err := io.Copy(ioutil.Discard, catfileObjectIter.Result().ObjectReader)
require.NoError(t, err)
if i == 3 {
@@ -244,6 +240,8 @@ func TestPipeline(t *testing.T) {
}
}
+ require.NoError(t, catfileObjectIter.Err())
+
// Context cancellation is timing sensitive: at the point of cancelling the context,
// the last pipeline step may already have queued up an additional result. We thus
// cannot assert the exact number of requests, but we know that it's bounded.
@@ -262,13 +260,11 @@ func TestPipeline(t *testing.T) {
revlistIter := Revlist(ctx, repo, []string{"--all"})
catfileInfoIter := CatfileInfo(ctx, catfileProcess, revlistIter)
- catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoIter)
+ catfileObjectIter := CatfileObject(ctx, catfileProcess, catfileInfoIter)
i := 0
var wg sync.WaitGroup
- for result := range catfileObjectChan {
- require.NoError(t, result.Err)
-
+ for catfileObjectIter.Next() {
wg.Add(1)
i++
@@ -282,9 +278,10 @@ func TestPipeline(t *testing.T) {
defer wg.Done()
_, err := io.Copy(ioutil.Discard, object.ObjectReader)
require.NoError(t, err)
- }(result)
+ }(catfileObjectIter.Result())
}
+ require.NoError(t, catfileObjectIter.Err())
wg.Wait()
// We could in theory assert the exact amount of objects, but this would make it
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index 73805c24e..78c447079 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -94,13 +94,11 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
return helper.ErrInternal(err)
}
} else {
- catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
var i uint32
- for blob := range catfileObjectChan {
- if blob.Err != nil {
- return helper.ErrInternal(blob.Err)
- }
+ for catfileObjectIter.Next() {
+ blob := catfileObjectIter.Result()
headerSent := false
dataChunker := streamio.NewWriter(func(p []byte) error {
@@ -155,6 +153,10 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
break
}
}
+
+ if err := catfileObjectIter.Err(); err != nil {
+ return helper.ErrInternal(err)
+ }
}
if err := chunker.Flush(); err != nil {
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index 1357c501a..46442f546 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -89,9 +89,9 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
- if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
+ if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
}
}
@@ -151,9 +151,9 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
- if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
+ if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
}
}
@@ -204,9 +204,9 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
- catfileObjectChan := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
+ catfileObjectIter := gitpipe.CatfileObject(ctx, catfileProcess, catfileInfoIter)
- if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil {
+ if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil {
return err
}
}
@@ -378,14 +378,12 @@ func (t *lfsPointerSender) Send() error {
return t.send(t.pointers)
}
-func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan gitpipe.CatfileObjectResult, limit int) error {
+func sendLFSPointers(chunker *chunk.Chunker, iter gitpipe.CatfileObjectIterator, limit int) error {
buffer := bytes.NewBuffer(make([]byte, 0, lfsPointerMaxSize))
var i int
- for lfsPointer := range lfsPointers {
- if lfsPointer.Err != nil {
- return helper.ErrInternal(lfsPointer.Err)
- }
+ for iter.Next() {
+ lfsPointer := iter.Result()
// Avoid allocating bytes for an LFS pointer until we know the current blob really
// is an LFS pointer.
@@ -419,6 +417,10 @@ func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan gitpipe.CatfileO
}
}
+ if err := iter.Err(); err != nil {
+ return helper.ErrInternal(err)
+ }
+
if err := chunker.Flush(); err != nil {
return helper.ErrInternal(err)
}