Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-06-28 09:50:38 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-06-28 10:02:14 +0300
commitf9ef4d6508e5d13745664cea2e944b65e69be14d (patch)
treebd7c6e83056d2c72f0015ad82c289462abd4b708
parent73c756d1211653b0105fe1fd45d7b4191bf95c82 (diff)
gitpipe: Implement iterator for revlist step
The pipeline steps are currently easy to misuse: the structures returned on the channel all have an `Err` field, which MUST be checked by the caller. It's easy enough to forget if one isn't prepared to do so. Refactor the revlist step to instead return a RevlistIterator. This iterator follows the typical Go idiom of iterators with `Next()`, `Err()` and `Result()` functions. Given that the iterator is an interface, this also makes it easily possible for callers to replace it with their own adapter.
-rw-r--r--internal/git/gitpipe/catfile_info.go14
-rw-r--r--internal/git/gitpipe/catfile_info_test.go8
-rw-r--r--internal/git/gitpipe/pipeline_test.go16
-rw-r--r--internal/git/gitpipe/revlist.go39
-rw-r--r--internal/git/gitpipe/revlist_iterator.go51
-rw-r--r--internal/git/gitpipe/revlist_test.go45
-rw-r--r--internal/gitaly/service/blob/blobs.go4
-rw-r--r--internal/gitaly/service/blob/lfs_pointers.go13
8 files changed, 123 insertions, 67 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index eacfa59e0..9778692d8 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -27,7 +27,7 @@ type CatfileInfoResult struct {
// `git cat-file --batch-check`. The returned channel will contain all processed catfile info
// results. Any error received via the channel or encountered in this step will cause the pipeline
// to fail. Context cancellation will gracefully halt the pipeline.
-func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan <-chan RevlistResult) <-chan CatfileInfoResult {
+func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator RevlistIterator) <-chan CatfileInfoResult {
resultChan := make(chan CatfileInfoResult)
go func() {
@@ -42,11 +42,8 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan <
}
}
- for revlistResult := range revlistResultChan {
- if revlistResult.Err != nil {
- sendResult(CatfileInfoResult{Err: revlistResult.Err})
- return
- }
+ for revlistIterator.Next() {
+ revlistResult := revlistIterator.Result()
objectInfo, err := catfile.Info(ctx, revlistResult.OID.Revision())
if err != nil {
@@ -63,6 +60,11 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan <
return
}
}
+
+ if err := revlistIterator.Err(); err != nil {
+ sendResult(CatfileInfoResult{Err: err})
+ return
+ }
}()
return resultChan
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index de1a3da24..80fa40dd9 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -98,13 +98,7 @@ func TestCatfileInfo(t *testing.T) {
catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
require.NoError(t, err)
- revlistResultChan := make(chan RevlistResult, len(tc.revlistInputs))
- for _, input := range tc.revlistInputs {
- revlistResultChan <- input
- }
- close(revlistResultChan)
-
- resultChan := CatfileInfo(ctx, catfileProcess, revlistResultChan)
+ resultChan := CatfileInfo(ctx, catfileProcess, NewRevlistIterator(tc.revlistInputs))
var results []CatfileInfoResult
for result := range resultChan {
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index 2f18e3884..e76d371a8 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -171,12 +171,12 @@ func TestPipeline(t *testing.T) {
catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
require.NoError(t, err)
- revlistChan := Revlist(ctx, repo, tc.revisions)
+ revlistIter := Revlist(ctx, repo, tc.revisions)
if tc.revlistFilter != nil {
- revlistChan = RevlistFilter(ctx, revlistChan, tc.revlistFilter)
+ revlistIter = RevlistFilter(ctx, revlistIter, tc.revlistFilter)
}
- catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistChan)
+ catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistIter)
if tc.catfileInfoFilter != nil {
catfileInfoChan = CatfileInfoFilter(ctx, catfileInfoChan, tc.catfileInfoFilter)
}
@@ -225,9 +225,9 @@ func TestPipeline(t *testing.T) {
childCtx, cancel := context.WithCancel(ctx)
defer cancel()
- revlistChan := Revlist(childCtx, repo, []string{"--all"})
- revlistChan = RevlistFilter(childCtx, revlistChan, func(RevlistResult) bool { return true })
- catfileInfoChan := CatfileInfo(childCtx, catfileProcess, revlistChan)
+ revlistIter := Revlist(childCtx, repo, []string{"--all"})
+ revlistIter = RevlistFilter(childCtx, revlistIter, func(RevlistResult) bool { return true })
+ catfileInfoChan := CatfileInfo(childCtx, catfileProcess, revlistIter)
catfileInfoChan = CatfileInfoFilter(childCtx, catfileInfoChan, func(CatfileInfoResult) bool { return true })
catfileObjectChan := CatfileObject(childCtx, catfileProcess, catfileInfoChan)
@@ -260,8 +260,8 @@ func TestPipeline(t *testing.T) {
catfileProcess, err := catfileCache.BatchProcess(ctx, repo)
require.NoError(t, err)
- revlistChan := Revlist(ctx, repo, []string{"--all"})
- catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistChan)
+ revlistIter := Revlist(ctx, repo, []string{"--all"})
+ catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistIter)
catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoChan)
i := 0
diff --git a/internal/git/gitpipe/revlist.go b/internal/git/gitpipe/revlist.go
index e768d951f..f13b3336b 100644
--- a/internal/git/gitpipe/revlist.go
+++ b/internal/git/gitpipe/revlist.go
@@ -12,8 +12,8 @@ import (
// RevlistResult is a result for the revlist pipeline step.
type RevlistResult struct {
- // Err is an error which occurred during execution of the pipeline.
- Err error
+ // err is an error which occurred during execution of the pipeline.
+ err error
// OID is the object ID of an object printed by git-rev-list(1).
OID git.ObjectID
@@ -73,7 +73,7 @@ func Revlist(
repo *localrepo.Repo,
revisions []string,
options ...RevlistOption,
-) <-chan RevlistResult {
+) RevlistIterator {
var cfg revlistConfig
for _, option := range options {
option(&cfg)
@@ -115,7 +115,7 @@ func Revlist(
Args: revisions,
})
if err != nil {
- sendResult(RevlistResult{Err: err})
+ sendResult(RevlistResult{err: err})
return
}
@@ -142,32 +142,36 @@ func Revlist(
if err := scanner.Err(); err != nil {
sendResult(RevlistResult{
- Err: fmt.Errorf("scanning rev-list output: %w", err),
+ err: fmt.Errorf("scanning rev-list output: %w", err),
})
return
}
if err := revlist.Wait(); err != nil {
sendResult(RevlistResult{
- Err: fmt.Errorf("rev-list pipeline command: %w", err),
+ err: fmt.Errorf("rev-list pipeline command: %w", err),
})
return
}
}()
- return resultChan
+ return &revlistIterator{
+ ch: resultChan,
+ }
}
-// RevlistFilter filters the revlistResults from the provided channel with the filter function: if
+// RevlistFilter filters the RevlistResults from the provided iterator with the filter function: if
// the filter returns `false` for a given item, then it will be dropped from the pipeline. Errors
// cannot be filtered and will always be passed through.
-func RevlistFilter(ctx context.Context, c <-chan RevlistResult, filter func(RevlistResult) bool) <-chan RevlistResult {
+func RevlistFilter(ctx context.Context, it RevlistIterator, filter func(RevlistResult) bool) RevlistIterator {
resultChan := make(chan RevlistResult)
+
go func() {
defer close(resultChan)
- for result := range c {
- if result.Err != nil || filter(result) {
+ for it.Next() {
+ result := it.Result()
+ if filter(result) {
select {
case resultChan <- result:
case <-ctx.Done():
@@ -175,6 +179,17 @@ func RevlistFilter(ctx context.Context, c <-chan RevlistResult, filter func(Revl
}
}
}
+
+ if err := it.Err(); err != nil {
+ select {
+ case resultChan <- RevlistResult{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
}()
- return resultChan
+
+ return &revlistIterator{
+ ch: resultChan,
+ }
}
diff --git a/internal/git/gitpipe/revlist_iterator.go b/internal/git/gitpipe/revlist_iterator.go
new file mode 100644
index 000000000..a6e2b86fd
--- /dev/null
+++ b/internal/git/gitpipe/revlist_iterator.go
@@ -0,0 +1,51 @@
+package gitpipe
+
+// RevlistIterator is an iterator returned by the Revlist function.
+type RevlistIterator interface {
+ // Next iterates to the next item. Returns `false` in case there are no more results left.
+ Next() bool
+ // Err returns the first error that was encountered.
+ Err() error
+ // Result returns the current item.
+ Result() RevlistResult
+}
+
+// NewRevlistIterator returns a new RevlistIterator for the given items.
+func NewRevlistIterator(items []RevlistResult) RevlistIterator {
+ itemChan := make(chan RevlistResult, len(items))
+ for _, item := range items {
+ itemChan <- item
+ }
+ close(itemChan)
+
+ return &revlistIterator{
+ ch: itemChan,
+ }
+}
+
+type revlistIterator struct {
+ ch <-chan RevlistResult
+ result RevlistResult
+}
+
+func (it *revlistIterator) Next() bool {
+ if it.result.err != nil {
+ return false
+ }
+
+ var ok bool
+ it.result, ok = <-it.ch
+ if !ok || it.result.err != nil {
+ return false
+ }
+
+ return true
+}
+
+func (it *revlistIterator) Err() error {
+ return it.result.err
+}
+
+func (it *revlistIterator) Result() RevlistResult {
+ return it.result
+}
diff --git a/internal/git/gitpipe/revlist_test.go b/internal/git/gitpipe/revlist_test.go
index 2fbfb8556..d808927ea 100644
--- a/internal/git/gitpipe/revlist_test.go
+++ b/internal/git/gitpipe/revlist_test.go
@@ -37,6 +37,7 @@ func TestRevlist(t *testing.T) {
revisions []string
options []RevlistOption
expectedResults []RevlistResult
+ expectedErr error
}{
{
desc: "single blob",
@@ -224,9 +225,7 @@ func TestRevlist(t *testing.T) {
revisions: []string{
"refs/heads/does-not-exist",
},
- expectedResults: []RevlistResult{
- {Err: errors.New("rev-list pipeline command: exit status 128")},
- },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
},
{
desc: "mixed valid and invalid revision",
@@ -234,9 +233,7 @@ func TestRevlist(t *testing.T) {
lfsPointer1,
"refs/heads/does-not-exist",
},
- expectedResults: []RevlistResult{
- {Err: errors.New("rev-list pipeline command: exit status 128")},
- },
+ expectedErr: errors.New("rev-list pipeline command: exit status 128"),
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -247,19 +244,21 @@ func TestRevlist(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- resultChan := Revlist(ctx, repo, tc.revisions, tc.options...)
+ it := Revlist(ctx, repo, tc.revisions, tc.options...)
var results []RevlistResult
- for result := range resultChan {
- // We're converting the error here to a plain un-nested error such
- // that we don't have to replicate the complete error's structure.
- if result.Err != nil {
- result.Err = errors.New(result.Err.Error())
- }
+ for it.Next() {
+ results = append(results, it.Result())
+ }
- results = append(results, result)
+ // We're converting the error here to a plain un-nested error such that we
+ // don't have to replicate the complete error's structure.
+ err := it.Err()
+ if err != nil {
+ err = errors.New(err.Error())
}
+ require.Equal(t, tc.expectedErr, err)
require.Equal(t, tc.expectedResults, results)
})
}
@@ -271,6 +270,7 @@ func TestRevlistFilter(t *testing.T) {
input []RevlistResult
filter func(RevlistResult) bool
expectedResults []RevlistResult
+ expectedErr error
}{
{
desc: "all accepted",
@@ -305,15 +305,13 @@ func TestRevlistFilter(t *testing.T) {
input: []RevlistResult{
{OID: "a"},
{OID: "b"},
- {Err: errors.New("foobar")},
+ {err: errors.New("foobar")},
{OID: "c"},
},
filter: func(RevlistResult) bool {
return false
},
- expectedResults: []RevlistResult{
- {Err: errors.New("foobar")},
- },
+ expectedErr: errors.New("foobar"),
},
{
desc: "subset filtered",
@@ -334,17 +332,14 @@ func TestRevlistFilter(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- inputChan := make(chan RevlistResult, len(tc.input))
- for _, input := range tc.input {
- inputChan <- input
- }
- close(inputChan)
+ it := RevlistFilter(ctx, NewRevlistIterator(tc.input), tc.filter)
var results []RevlistResult
- for result := range RevlistFilter(ctx, inputChan, tc.filter) {
- results = append(results, result)
+ for it.Next() {
+ results = append(results, it.Result())
}
+ require.Equal(t, tc.expectedErr, it.Err())
require.Equal(t, tc.expectedResults, results)
})
}
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index d352da1d6..bb1f42bf3 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -64,8 +64,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS
revlistOptions = append(revlistOptions, gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob))
}
- revlistChan := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...)
- catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistChan)
+ revlistIter := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...)
+ catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter)
catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob"
})
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index 6f8e80435..451190c76 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -84,8 +84,8 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
revlistOptions = append(revlistOptions, gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob))
}
- revlistChan := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...)
- catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistChan)
+ revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...)
+ catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter)
catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})
@@ -195,13 +195,12 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err))
}
- objectChan := make(chan gitpipe.RevlistResult, len(req.GetBlobIds()))
- for _, blobID := range req.GetBlobIds() {
- objectChan <- gitpipe.RevlistResult{OID: git.ObjectID(blobID)}
+ blobs := make([]gitpipe.RevlistResult, len(req.GetBlobIds()))
+ for i, blobID := range req.GetBlobIds() {
+ blobs[i] = gitpipe.RevlistResult{OID: git.ObjectID(blobID)}
}
- close(objectChan)
- catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, objectChan)
+ catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, gitpipe.NewRevlistIterator(blobs))
catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool {
return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize
})