diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-28 09:50:38 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-28 10:02:14 +0300 |
commit | f9ef4d6508e5d13745664cea2e944b65e69be14d (patch) | |
tree | bd7c6e83056d2c72f0015ad82c289462abd4b708 | |
parent | 73c756d1211653b0105fe1fd45d7b4191bf95c82 (diff) |
gitpipe: Implement iterator for revlist step
The pipeline steps are currently easy to misuse: the structures returned
on the channel all have an `Err` field, which MUST be checked by the
caller. It's easy enough to forget if one isn't prepared to do so.
Refactor the revlist step to instead return a RevlistIterator. This
iterator follows the typical Go idiom of iterators with `Next()`,
`Err()` and `Result()` functions. Given that the iterator is an
interface, this also makes it easily possible for callers to replace it
with their own adapter.
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 14 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 8 | ||||
-rw-r--r-- | internal/git/gitpipe/pipeline_test.go | 16 | ||||
-rw-r--r-- | internal/git/gitpipe/revlist.go | 39 | ||||
-rw-r--r-- | internal/git/gitpipe/revlist_iterator.go | 51 | ||||
-rw-r--r-- | internal/git/gitpipe/revlist_test.go | 45 | ||||
-rw-r--r-- | internal/gitaly/service/blob/blobs.go | 4 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 13 |
8 files changed, 123 insertions, 67 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index eacfa59e0..9778692d8 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -27,7 +27,7 @@ type CatfileInfoResult struct { // `git cat-file --batch-check`. The returned channel will contain all processed catfile info // results. Any error received via the channel or encountered in this step will cause the pipeline // to fail. Context cancellation will gracefully halt the pipeline. -func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan <-chan RevlistResult) <-chan CatfileInfoResult { +func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistIterator RevlistIterator) <-chan CatfileInfoResult { resultChan := make(chan CatfileInfoResult) go func() { @@ -42,11 +42,8 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan < } } - for revlistResult := range revlistResultChan { - if revlistResult.Err != nil { - sendResult(CatfileInfoResult{Err: revlistResult.Err}) - return - } + for revlistIterator.Next() { + revlistResult := revlistIterator.Result() objectInfo, err := catfile.Info(ctx, revlistResult.OID.Revision()) if err != nil { @@ -63,6 +60,11 @@ func CatfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan < return } } + + if err := revlistIterator.Err(); err != nil { + sendResult(CatfileInfoResult{Err: err}) + return + } }() return resultChan diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index de1a3da24..80fa40dd9 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -98,13 +98,7 @@ func TestCatfileInfo(t *testing.T) { catfileProcess, err := catfileCache.BatchProcess(ctx, repo) require.NoError(t, err) - revlistResultChan := make(chan RevlistResult, len(tc.revlistInputs)) - for _, input := range tc.revlistInputs { - revlistResultChan <- input - } - close(revlistResultChan) - - resultChan := CatfileInfo(ctx, catfileProcess, revlistResultChan) + resultChan := CatfileInfo(ctx, catfileProcess, NewRevlistIterator(tc.revlistInputs)) var results []CatfileInfoResult for result := range resultChan { diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index 2f18e3884..e76d371a8 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -171,12 +171,12 @@ func TestPipeline(t *testing.T) { catfileProcess, err := catfileCache.BatchProcess(ctx, repo) require.NoError(t, err) - revlistChan := Revlist(ctx, repo, tc.revisions) + revlistIter := Revlist(ctx, repo, tc.revisions) if tc.revlistFilter != nil { - revlistChan = RevlistFilter(ctx, revlistChan, tc.revlistFilter) + revlistIter = RevlistFilter(ctx, revlistIter, tc.revlistFilter) } - catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistChan) + catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistIter) if tc.catfileInfoFilter != nil { catfileInfoChan = CatfileInfoFilter(ctx, catfileInfoChan, tc.catfileInfoFilter) } @@ -225,9 +225,9 @@ func TestPipeline(t *testing.T) { childCtx, cancel := context.WithCancel(ctx) defer cancel() - revlistChan := Revlist(childCtx, repo, []string{"--all"}) - revlistChan = RevlistFilter(childCtx, revlistChan, func(RevlistResult) bool { return true }) - catfileInfoChan := CatfileInfo(childCtx, catfileProcess, revlistChan) + revlistIter := Revlist(childCtx, repo, []string{"--all"}) + revlistIter = RevlistFilter(childCtx, revlistIter, func(RevlistResult) bool { return true }) + catfileInfoChan := CatfileInfo(childCtx, catfileProcess, revlistIter) catfileInfoChan = CatfileInfoFilter(childCtx, catfileInfoChan, func(CatfileInfoResult) bool { return true }) catfileObjectChan := CatfileObject(childCtx, catfileProcess, catfileInfoChan) @@ -260,8 +260,8 @@ func TestPipeline(t *testing.T) { catfileProcess, err := catfileCache.BatchProcess(ctx, repo) require.NoError(t, err) - revlistChan := Revlist(ctx, repo, []string{"--all"}) - catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistChan) + revlistIter := Revlist(ctx, repo, []string{"--all"}) + catfileInfoChan := CatfileInfo(ctx, catfileProcess, revlistIter) catfileObjectChan := CatfileObject(ctx, catfileProcess, catfileInfoChan) i := 0 diff --git a/internal/git/gitpipe/revlist.go b/internal/git/gitpipe/revlist.go index e768d951f..f13b3336b 100644 --- a/internal/git/gitpipe/revlist.go +++ b/internal/git/gitpipe/revlist.go @@ -12,8 +12,8 @@ import ( // RevlistResult is a result for the revlist pipeline step. type RevlistResult struct { - // Err is an error which occurred during execution of the pipeline. - Err error + // err is an error which occurred during execution of the pipeline. + err error // OID is the object ID of an object printed by git-rev-list(1). OID git.ObjectID @@ -73,7 +73,7 @@ func Revlist( repo *localrepo.Repo, revisions []string, options ...RevlistOption, -) <-chan RevlistResult { +) RevlistIterator { var cfg revlistConfig for _, option := range options { option(&cfg) @@ -115,7 +115,7 @@ func Revlist( Args: revisions, }) if err != nil { - sendResult(RevlistResult{Err: err}) + sendResult(RevlistResult{err: err}) return } @@ -142,32 +142,36 @@ func Revlist( if err := scanner.Err(); err != nil { sendResult(RevlistResult{ - Err: fmt.Errorf("scanning rev-list output: %w", err), + err: fmt.Errorf("scanning rev-list output: %w", err), }) return } if err := revlist.Wait(); err != nil { sendResult(RevlistResult{ - Err: fmt.Errorf("rev-list pipeline command: %w", err), + err: fmt.Errorf("rev-list pipeline command: %w", err), }) return } }() - return resultChan + return &revlistIterator{ + ch: resultChan, + } } -// RevlistFilter filters the revlistResults from the provided channel with the filter function: if +// RevlistFilter filters the RevlistResults from the provided iterator with the filter function: if // the filter returns `false` for a given item, then it will be dropped from the pipeline. Errors // cannot be filtered and will always be passed through. -func RevlistFilter(ctx context.Context, c <-chan RevlistResult, filter func(RevlistResult) bool) <-chan RevlistResult { +func RevlistFilter(ctx context.Context, it RevlistIterator, filter func(RevlistResult) bool) RevlistIterator { resultChan := make(chan RevlistResult) + go func() { defer close(resultChan) - for result := range c { - if result.Err != nil || filter(result) { + for it.Next() { + result := it.Result() + if filter(result) { select { case resultChan <- result: case <-ctx.Done(): @@ -175,6 +179,17 @@ func RevlistFilter(ctx context.Context, c <-chan RevlistResult, filter func(Revl } } } + + if err := it.Err(); err != nil { + select { + case resultChan <- RevlistResult{err: err}: + case <-ctx.Done(): + return + } + } }() - return resultChan + + return &revlistIterator{ + ch: resultChan, + } } diff --git a/internal/git/gitpipe/revlist_iterator.go b/internal/git/gitpipe/revlist_iterator.go new file mode 100644 index 000000000..a6e2b86fd --- /dev/null +++ b/internal/git/gitpipe/revlist_iterator.go @@ -0,0 +1,51 @@ +package gitpipe + +// RevlistIterator is an iterator returned by the Revlist function. +type RevlistIterator interface { + // Next iterates to the next item. Returns `false` in case there are no more results left. + Next() bool + // Err returns the first error that was encountered. + Err() error + // Result returns the current item. + Result() RevlistResult +} + +// NewRevlistIterator returns a new RevlistIterator for the given items. +func NewRevlistIterator(items []RevlistResult) RevlistIterator { + itemChan := make(chan RevlistResult, len(items)) + for _, item := range items { + itemChan <- item + } + close(itemChan) + + return &revlistIterator{ + ch: itemChan, + } +} + +type revlistIterator struct { + ch <-chan RevlistResult + result RevlistResult +} + +func (it *revlistIterator) Next() bool { + if it.result.err != nil { + return false + } + + var ok bool + it.result, ok = <-it.ch + if !ok || it.result.err != nil { + return false + } + + return true +} + +func (it *revlistIterator) Err() error { + return it.result.err +} + +func (it *revlistIterator) Result() RevlistResult { + return it.result +} diff --git a/internal/git/gitpipe/revlist_test.go b/internal/git/gitpipe/revlist_test.go index 2fbfb8556..d808927ea 100644 --- a/internal/git/gitpipe/revlist_test.go +++ b/internal/git/gitpipe/revlist_test.go @@ -37,6 +37,7 @@ func TestRevlist(t *testing.T) { revisions []string options []RevlistOption expectedResults []RevlistResult + expectedErr error }{ { desc: "single blob", @@ -224,9 +225,7 @@ func TestRevlist(t *testing.T) { revisions: []string{ "refs/heads/does-not-exist", }, - expectedResults: []RevlistResult{ - {Err: errors.New("rev-list pipeline command: exit status 128")}, - }, + expectedErr: errors.New("rev-list pipeline command: exit status 128"), }, { desc: "mixed valid and invalid revision", @@ -234,9 +233,7 @@ func TestRevlist(t *testing.T) { lfsPointer1, "refs/heads/does-not-exist", }, - expectedResults: []RevlistResult{ - {Err: errors.New("rev-list pipeline command: exit status 128")}, - }, + expectedErr: errors.New("rev-list pipeline command: exit status 128"), }, } { t.Run(tc.desc, func(t *testing.T) { @@ -247,19 +244,21 @@ func TestRevlist(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - resultChan := Revlist(ctx, repo, tc.revisions, tc.options...) + it := Revlist(ctx, repo, tc.revisions, tc.options...) var results []RevlistResult - for result := range resultChan { - // We're converting the error here to a plain un-nested error such - // that we don't have to replicate the complete error's structure. - if result.Err != nil { - result.Err = errors.New(result.Err.Error()) - } + for it.Next() { + results = append(results, it.Result()) + } - results = append(results, result) + // We're converting the error here to a plain un-nested error such that we + // don't have to replicate the complete error's structure. + err := it.Err() + if err != nil { + err = errors.New(err.Error()) } + require.Equal(t, tc.expectedErr, err) require.Equal(t, tc.expectedResults, results) }) } @@ -271,6 +270,7 @@ func TestRevlistFilter(t *testing.T) { input []RevlistResult filter func(RevlistResult) bool expectedResults []RevlistResult + expectedErr error }{ { desc: "all accepted", @@ -305,15 +305,13 @@ func TestRevlistFilter(t *testing.T) { input: []RevlistResult{ {OID: "a"}, {OID: "b"}, - {Err: errors.New("foobar")}, + {err: errors.New("foobar")}, {OID: "c"}, }, filter: func(RevlistResult) bool { return false }, - expectedResults: []RevlistResult{ - {Err: errors.New("foobar")}, - }, + expectedErr: errors.New("foobar"), }, { desc: "subset filtered", @@ -334,17 +332,14 @@ func TestRevlistFilter(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - inputChan := make(chan RevlistResult, len(tc.input)) - for _, input := range tc.input { - inputChan <- input - } - close(inputChan) + it := RevlistFilter(ctx, NewRevlistIterator(tc.input), tc.filter) var results []RevlistResult - for result := range RevlistFilter(ctx, inputChan, tc.filter) { - results = append(results, result) + for it.Next() { + results = append(results, it.Result()) } + require.Equal(t, tc.expectedErr, it.Err()) require.Equal(t, tc.expectedResults, results) }) } diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go index d352da1d6..bb1f42bf3 100644 --- a/internal/gitaly/service/blob/blobs.go +++ b/internal/gitaly/service/blob/blobs.go @@ -64,8 +64,8 @@ func (s *server) ListBlobs(req *gitalypb.ListBlobsRequest, stream gitalypb.BlobS revlistOptions = append(revlistOptions, gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob)) } - revlistChan := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...) - catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistChan) + revlistIter := gitpipe.Revlist(ctx, repo, req.GetRevisions(), revlistOptions...) + catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter) catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" }) diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index 6f8e80435..451190c76 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -84,8 +84,8 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git revlistOptions = append(revlistOptions, gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob)) } - revlistChan := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...) - catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistChan) + revlistIter := gitpipe.Revlist(ctx, repo, in.GetRevisions(), revlistOptions...) + catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, revlistIter) catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize }) @@ -195,13 +195,12 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err)) } - objectChan := make(chan gitpipe.RevlistResult, len(req.GetBlobIds())) - for _, blobID := range req.GetBlobIds() { - objectChan <- gitpipe.RevlistResult{OID: git.ObjectID(blobID)} + blobs := make([]gitpipe.RevlistResult, len(req.GetBlobIds())) + for i, blobID := range req.GetBlobIds() { + blobs[i] = gitpipe.RevlistResult{OID: git.ObjectID(blobID)} } - close(objectChan) - catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, objectChan) + catfileInfoChan := gitpipe.CatfileInfo(ctx, catfileProcess, gitpipe.NewRevlistIterator(blobs)) catfileInfoChan = gitpipe.CatfileInfoFilter(ctx, catfileInfoChan, func(r gitpipe.CatfileInfoResult) bool { return r.ObjectInfo.Type == "blob" && r.ObjectInfo.Size <= lfsPointerMaxSize }) |