diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-05-04 15:51:49 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-05-09 10:30:47 +0300 |
commit | 50ad532fcd1e7cdde65ea68b23ea58ad864da537 (patch) | |
tree | f8a6dd438e2eb9e0c08c8761bff9a2a9826947c7 | |
parent | c22a1ec007b34012f6e774fe2b772a99716314fe (diff) |
gitpipe: Propagate context cancellation in revisions pipeline
When the context gets cancelled while we're iterating over results from
the revisions pipeline, then the iterator doesn't return the context
cancellation error to the caller when calling `iter.Err()`. It is thus
easy to assume at the calling side that the iterator has just finished
successfully and that there are no more results, while in reality we
only got a partial set of results.
Fix this issue by propagating context cancellation errors to the caller.
This fixes RPCs based on this pipeline to not return `OK` when there
indeed was an error.
Changelog: fixed
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 4 | ||||
-rw-r--r-- | internal/git/gitpipe/revision.go | 6 | ||||
-rw-r--r-- | internal/git/gitpipe/revision_iterator.go | 38 | ||||
-rw-r--r-- | internal/git/gitpipe/revision_test.go | 14 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 2 |
5 files changed, 46 insertions, 18 deletions
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index f2226207d..8df257213 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -137,7 +137,7 @@ func TestCatfileInfo(t *testing.T) { require.NoError(t, err) defer cancel() - it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...) + it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, tc.revlistInputs), tc.opts...) require.NoError(t, err) var results []CatfileInfoResult @@ -167,7 +167,7 @@ func TestCatfileInfo(t *testing.T) { require.NoError(t, err) defer objectInfoReaderCancel() - it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator([]RevisionResult{ + it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, []RevisionResult{ {OID: lfsPointer1}, {OID: lfsPointer1}, })) diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go index 68fcecbba..b79f7a664 100644 --- a/internal/git/gitpipe/revision.go +++ b/internal/git/gitpipe/revision.go @@ -314,7 +314,8 @@ func Revlist( }() return &revisionIterator{ - ch: resultChan, + ctx: ctx, + ch: resultChan, } } @@ -443,7 +444,8 @@ func ForEachRef( }() return &revisionIterator{ - ch: resultChan, + ctx: ctx, + ch: resultChan, } } diff --git a/internal/git/gitpipe/revision_iterator.go b/internal/git/gitpipe/revision_iterator.go index 8d949e648..9f686d463 100644 --- a/internal/git/gitpipe/revision_iterator.go +++ b/internal/git/gitpipe/revision_iterator.go @@ -1,6 +1,10 @@ package gitpipe -import "gitlab.com/gitlab-org/gitaly/v14/internal/git" +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git" +) // RevisionIterator is an iterator returned by the Revlist function. type RevisionIterator interface { @@ -10,7 +14,7 @@ type RevisionIterator interface { } // NewRevisionIterator returns a new RevisionIterator for the given items. -func NewRevisionIterator(items []RevisionResult) RevisionIterator { +func NewRevisionIterator(ctx context.Context, items []RevisionResult) RevisionIterator { itemChan := make(chan RevisionResult, len(items)) for _, item := range items { itemChan <- item @@ -18,11 +22,13 @@ func NewRevisionIterator(items []RevisionResult) RevisionIterator { close(itemChan) return &revisionIterator{ - ch: itemChan, + ctx: ctx, + ch: itemChan, } } type revisionIterator struct { + ctx context.Context ch <-chan RevisionResult result RevisionResult } @@ -32,13 +38,31 @@ func (it *revisionIterator) Next() bool { return false } - var ok bool - it.result, ok = <-it.ch - if !ok || it.result.err != nil { + // Prioritize context cancellation errors so that we don't try to fetch results anymore when + // the context is done. + select { + case <-it.ctx.Done(): + it.result = RevisionResult{err: it.ctx.Err()} return false + default: } - return true + select { + case <-it.ctx.Done(): + it.result = RevisionResult{err: it.ctx.Err()} + return false + case result, ok := <-it.ch: + if !ok { + return false + } + + it.result = result + if result.err != nil { + return false + } + + return true + } } func (it *revisionIterator) Err() error { diff --git a/internal/git/gitpipe/revision_test.go b/internal/git/gitpipe/revision_test.go index c28fa39dc..031325515 100644 --- a/internal/git/gitpipe/revision_test.go +++ b/internal/git/gitpipe/revision_test.go @@ -520,9 +520,10 @@ func TestRevlist(t *testing.T) { cancel() require.False(t, it.Next()) - // This is a bug: we expect to get the cancelled context here. - require.NoError(t, it.Err()) - require.Equal(t, RevisionResult{}, it.Result()) + require.Equal(t, context.Canceled, it.Err()) + require.Equal(t, RevisionResult{ + err: context.Canceled, + }, it.Result()) }) } @@ -639,9 +640,10 @@ func TestForEachRef(t *testing.T) { cancel() require.False(t, it.Next()) - // This is a bug: we expect to get the cancelled context here. - require.NoError(t, it.Err()) - require.Equal(t, RevisionResult{}, it.Result()) + require.Equal(t, context.Canceled, it.Err()) + require.Equal(t, RevisionResult{ + err: context.Canceled, + }, it.Result()) }) } diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index 57027d4c8..18b850156 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -156,7 +156,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita blobs[i] = gitpipe.RevisionResult{OID: git.ObjectID(blobID)} } - catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs), + catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(ctx, blobs), gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize }), |