diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-05-09 11:31:02 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-05-09 11:31:02 +0300 |
commit | f93c64a5a41446b82850cc47d44c6960596f9e25 (patch) | |
tree | e57a82dfb683f59a9fabc0752453bf2e247dfaa0 | |
parent | 2e30abfa61112d353f2474ab41837882b78e5d1a (diff) | |
parent | 8773480fd0eb9740c96f32e992697600477a5b63 (diff) |
Merge branch 'pks-gitpipe-context-cancellation-errors' into 'master'
gitpipe: Fix propagation of context cancellation errors
Closes #4072
See merge request gitlab-org/gitaly!4524
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 56 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_iterator.go | 38 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 76 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 53 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_iterator.go | 38 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_test.go | 36 | ||||
-rw-r--r-- | internal/git/gitpipe/pipeline_test.go | 2 | ||||
-rw-r--r-- | internal/git/gitpipe/revision.go | 6 | ||||
-rw-r--r-- | internal/git/gitpipe/revision_iterator.go | 38 | ||||
-rw-r--r-- | internal/git/gitpipe/revision_test.go | 42 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 2 |
11 files changed, 300 insertions, 87 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index b756e08f3..eaa15f517 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -74,48 +74,34 @@ func CatfileInfo( var i int64 for it.Next() { if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { - select { - case requestChan <- catfileInfoRequest{err: err}: - case <-ctx.Done(): - return - } + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) + return } - select { - case requestChan <- catfileInfoRequest{ + if isDone := sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{ objectID: it.ObjectID(), objectName: it.ObjectName(), - }: - case <-ctx.Done(): + }); isDone { return } i++ if i%int64(cap(requestChan)) == 0 { if err := queue.Flush(); err != nil { - select { - case requestChan <- catfileInfoRequest{err: err}: - case <-ctx.Done(): - return - } + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) + return } } } if err := it.Err(); err != nil { - select { - case requestChan <- catfileInfoRequest{err: err}: - case <-ctx.Done(): - return - } + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) + return } if err := queue.Flush(); err != nil { - select { - case requestChan <- catfileInfoRequest{err: err}: - case <-ctx.Done(): - return - } + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) + return } }() @@ -154,7 +140,8 @@ func CatfileInfo( }() return &catfileInfoIterator{ - ch: resultChan, + ctx: ctx, + ch: resultChan, }, nil } @@ -229,7 +216,8 @@ func CatfileInfoAllObjects( }() return &catfileInfoIterator{ - ch: resultChan, + ctx: ctx, + ch: resultChan, } } @@ -252,3 +240,19 @@ func sendCatfileInfoResult(ctx context.Context, ch chan<- CatfileInfoResult, res return true } } + +func sendCatfileInfoRequest(ctx context.Context, ch chan<- catfileInfoRequest, request catfileInfoRequest) bool { + // Please refer to `sendCatfileInfoResult()` for why we treat the context specially. + select { + case <-ctx.Done(): + return true + default: + } + + select { + case ch <- request: + return false + case <-ctx.Done(): + return true + } +} diff --git a/internal/git/gitpipe/catfile_info_iterator.go b/internal/git/gitpipe/catfile_info_iterator.go index 35c782699..c4403af5c 100644 --- a/internal/git/gitpipe/catfile_info_iterator.go +++ b/internal/git/gitpipe/catfile_info_iterator.go @@ -1,6 +1,10 @@ package gitpipe -import "gitlab.com/gitlab-org/gitaly/v14/internal/git" +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git" +) // CatfileInfoIterator is an iterator returned by the Revlist function. type CatfileInfoIterator interface { @@ -10,7 +14,7 @@ type CatfileInfoIterator interface { } // NewCatfileInfoIterator returns a new CatfileInfoIterator for the given items. -func NewCatfileInfoIterator(items []CatfileInfoResult) CatfileInfoIterator { +func NewCatfileInfoIterator(ctx context.Context, items []CatfileInfoResult) CatfileInfoIterator { itemChan := make(chan CatfileInfoResult, len(items)) for _, item := range items { itemChan <- item @@ -18,11 +22,13 @@ func NewCatfileInfoIterator(items []CatfileInfoResult) CatfileInfoIterator { close(itemChan) return &catfileInfoIterator{ - ch: itemChan, + ctx: ctx, + ch: itemChan, } } type catfileInfoIterator struct { + ctx context.Context ch <-chan CatfileInfoResult result CatfileInfoResult } @@ -32,13 +38,31 @@ func (it *catfileInfoIterator) Next() bool { return false } - var ok bool - it.result, ok = <-it.ch - if !ok || it.result.err != nil { + // Prioritize context cancellation errors so that we don't try to fetch results anymore when + // the context is done. + select { + case <-it.ctx.Done(): + it.result = CatfileInfoResult{err: it.ctx.Err()} return false + default: } - return true + select { + case <-it.ctx.Done(): + it.result = CatfileInfoResult{err: it.ctx.Err()} + return false + case result, ok := <-it.ch: + if !ok { + return false + } + + it.result = result + if result.err != nil { + return false + } + + return true + } } func (it *catfileInfoIterator) Err() error { diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 80a1810d9..d2f7c2b37 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -1,6 +1,7 @@ package gitpipe import ( + "context" "errors" "testing" @@ -136,7 +137,7 @@ func TestCatfileInfo(t *testing.T) { require.NoError(t, err) defer cancel() - it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...) + it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, tc.revlistInputs), tc.opts...) require.NoError(t, err) var results []CatfileInfoResult @@ -155,6 +156,37 @@ func TestCatfileInfo(t *testing.T) { require.Equal(t, tc.expectedResults, results) }) } + + t.Run("context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectInfoReader, objectInfoReaderCancel, err := catfileCache.ObjectInfoReader(ctx, repo) + require.NoError(t, err) + defer objectInfoReaderCancel() + + it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, []RevisionResult{ + {OID: lfsPointer1}, + {OID: lfsPointer1}, + })) + require.NoError(t, err) + + require.True(t, it.Next()) + require.NoError(t, it.Err()) + require.Equal(t, CatfileInfoResult{ + ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}, + }, it.Result()) + + cancel() + + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + require.Equal(t, CatfileInfoResult{ + err: context.Canceled, + }, it.Result()) + }) } func TestCatfileInfoAllObjects(t *testing.T) { @@ -171,18 +203,40 @@ func TestCatfileInfoAllObjects(t *testing.T) { }) commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents()) - it := CatfileInfoAllObjects(ctx, repo) - - var results []CatfileInfoResult - for it.Next() { - results = append(results, it.Result()) - } - require.NoError(t, it.Err()) - - require.ElementsMatch(t, []CatfileInfoResult{ + actualObjects := []CatfileInfoResult{ {ObjectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}}, {ObjectInfo: &catfile.ObjectInfo{Oid: blob2, Type: "blob", Size: 6}}, {ObjectInfo: &catfile.ObjectInfo{Oid: tree, Type: "tree", Size: 34}}, {ObjectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}}, - }, results) + } + + t.Run("successful", func(t *testing.T) { + it := CatfileInfoAllObjects(ctx, repo) + + var results []CatfileInfoResult + for it.Next() { + results = append(results, it.Result()) + } + require.NoError(t, it.Err()) + + require.ElementsMatch(t, actualObjects, results) + }) + + t.Run("context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + + it := CatfileInfoAllObjects(ctx, repo) + + require.True(t, it.Next()) + require.NoError(t, it.Err()) + require.Contains(t, actualObjects, it.Result()) + + cancel() + + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + require.Equal(t, CatfileInfoResult{ + err: context.Canceled, + }, it.Result()) + }) } diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 49e33b859..f23496158 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -48,48 +48,52 @@ func CatfileObject( go func() { defer close(requestChan) + sendRequest := func(request catfileObjectRequest) bool { + // Please refer to `sendResult()` for why we treat the context specially. + select { + case <-ctx.Done(): + return true + default: + } + + select { + case requestChan <- request: + return false + case <-ctx.Done(): + return true + } + } + var i int64 for it.Next() { if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { - select { - case requestChan <- catfileObjectRequest{err: err}: - case <-ctx.Done(): - return - } + sendRequest(catfileObjectRequest{err: err}) + return } - select { - case requestChan <- catfileObjectRequest{objectName: it.ObjectName()}: - case <-ctx.Done(): + if isDone := sendRequest(catfileObjectRequest{ + objectName: it.ObjectName(), + }); isDone { return } i++ if i%int64(cap(requestChan)) == 0 { if err := queue.Flush(); err != nil { - select { - case requestChan <- catfileObjectRequest{err: err}: - case <-ctx.Done(): - return - } + sendRequest(catfileObjectRequest{err: err}) + return } } } if err := it.Err(); err != nil { - select { - case requestChan <- catfileObjectRequest{err: err}: - case <-ctx.Done(): - return - } + sendRequest(catfileObjectRequest{err: err}) + return } if err := queue.Flush(); err != nil { - select { - case requestChan <- catfileObjectRequest{err: err}: - case <-ctx.Done(): - return - } + sendRequest(catfileObjectRequest{err: err}) + return } }() @@ -164,7 +168,8 @@ func CatfileObject( }() return &catfileObjectIterator{ - ch: resultChan, + ctx: ctx, + ch: resultChan, }, nil } diff --git a/internal/git/gitpipe/catfile_object_iterator.go b/internal/git/gitpipe/catfile_object_iterator.go index c457088e0..05b00e3e8 100644 --- a/internal/git/gitpipe/catfile_object_iterator.go +++ b/internal/git/gitpipe/catfile_object_iterator.go @@ -1,6 +1,10 @@ package gitpipe -import "gitlab.com/gitlab-org/gitaly/v14/internal/git" +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git" +) // CatfileObjectIterator is an iterator returned by the Revlist function. type CatfileObjectIterator interface { @@ -10,7 +14,7 @@ type CatfileObjectIterator interface { } // NewCatfileObjectIterator returns a new CatfileObjectIterator for the given items. -func NewCatfileObjectIterator(items []CatfileObjectResult) CatfileObjectIterator { +func NewCatfileObjectIterator(ctx context.Context, items []CatfileObjectResult) CatfileObjectIterator { itemChan := make(chan CatfileObjectResult, len(items)) for _, item := range items { itemChan <- item @@ -18,11 +22,13 @@ func NewCatfileObjectIterator(items []CatfileObjectResult) CatfileObjectIterator close(itemChan) return &catfileObjectIterator{ - ch: itemChan, + ctx: ctx, + ch: itemChan, } } type catfileObjectIterator struct { + ctx context.Context ch <-chan CatfileObjectResult result CatfileObjectResult } @@ -32,13 +38,31 @@ func (it *catfileObjectIterator) Next() bool { return false } - var ok bool - it.result, ok = <-it.ch - if !ok || it.result.err != nil { + // Prioritize context cancellation errors so that we don't try to fetch results anymore when + // the context is done. + select { + case <-it.ctx.Done(): + it.result = CatfileObjectResult{err: it.ctx.Err()} return false + default: } - return true + select { + case <-it.ctx.Done(): + it.result = CatfileObjectResult{err: it.ctx.Err()} + return false + case result, ok := <-it.ch: + if !ok { + return false + } + + it.result = result + if result.err != nil { + return false + } + + return true + } } func (it *catfileObjectIterator) Err() error { diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index 95a9b3ad6..bf96a19e0 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -1,11 +1,13 @@ package gitpipe import ( + "context" "errors" "io" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" @@ -78,7 +80,7 @@ func TestCatfileObject(t *testing.T) { require.NoError(t, err) defer cancel() - it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs)) + it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(ctx, tc.catfileInfoInputs)) require.NoError(t, err) var results []CatfileObjectResult @@ -115,4 +117,36 @@ func TestCatfileObject(t *testing.T) { require.Equal(t, tc.expectedResults, results) }) } + + t.Run("context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectReader, objectReaderCancel, err := catfileCache.ObjectReader(ctx, repo) + require.NoError(t, err) + defer objectReaderCancel() + + it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(ctx, []CatfileInfoResult{ + {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}}, + {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}}, + })) + require.NoError(t, err) + + require.True(t, it.Next()) + require.NoError(t, it.Err()) + require.Equal(t, git.ObjectID(lfsPointer1), it.Result().ObjectID()) + + _, err = io.Copy(io.Discard, it.Result()) + require.NoError(t, err) + + cancel() + + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + require.Equal(t, CatfileObjectResult{ + err: context.Canceled, + }, it.Result()) + }) } diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index 544ad552d..3d34db488 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -310,7 +310,7 @@ func TestPipeline_revlist(t *testing.T) { } } - require.NoError(t, catfileObjectIter.Err()) + require.Equal(t, context.Canceled, catfileObjectIter.Err()) // Context cancellation is timing sensitive: at the point of cancelling the context, // the last pipeline step may already have queued up an additional result. We thus diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go index 68fcecbba..b79f7a664 100644 --- a/internal/git/gitpipe/revision.go +++ b/internal/git/gitpipe/revision.go @@ -314,7 +314,8 @@ func Revlist( }() return &revisionIterator{ - ch: resultChan, + ctx: ctx, + ch: resultChan, } } @@ -443,7 +444,8 @@ func ForEachRef( }() return &revisionIterator{ - ch: resultChan, + ctx: ctx, + ch: resultChan, } } diff --git a/internal/git/gitpipe/revision_iterator.go b/internal/git/gitpipe/revision_iterator.go index 8d949e648..9f686d463 100644 --- a/internal/git/gitpipe/revision_iterator.go +++ b/internal/git/gitpipe/revision_iterator.go @@ -1,6 +1,10 @@ package gitpipe -import "gitlab.com/gitlab-org/gitaly/v14/internal/git" +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git" +) // RevisionIterator is an iterator returned by the Revlist function. type RevisionIterator interface { @@ -10,7 +14,7 @@ type RevisionIterator interface { } // NewRevisionIterator returns a new RevisionIterator for the given items. -func NewRevisionIterator(items []RevisionResult) RevisionIterator { +func NewRevisionIterator(ctx context.Context, items []RevisionResult) RevisionIterator { itemChan := make(chan RevisionResult, len(items)) for _, item := range items { itemChan <- item @@ -18,11 +22,13 @@ func NewRevisionIterator(items []RevisionResult) RevisionIterator { close(itemChan) return &revisionIterator{ - ch: itemChan, + ctx: ctx, + ch: itemChan, } } type revisionIterator struct { + ctx context.Context ch <-chan RevisionResult result RevisionResult } @@ -32,13 +38,31 @@ func (it *revisionIterator) Next() bool { return false } - var ok bool - it.result, ok = <-it.ch - if !ok || it.result.err != nil { + // Prioritize context cancellation errors so that we don't try to fetch results anymore when + // the context is done. + select { + case <-it.ctx.Done(): + it.result = RevisionResult{err: it.ctx.Err()} return false + default: } - return true + select { + case <-it.ctx.Done(): + it.result = RevisionResult{err: it.ctx.Err()} + return false + case result, ok := <-it.ch: + if !ok { + return false + } + + it.result = result + if result.err != nil { + return false + } + + return true + } } func (it *revisionIterator) Err() error { diff --git a/internal/git/gitpipe/revision_test.go b/internal/git/gitpipe/revision_test.go index 091df9e3f..031325515 100644 --- a/internal/git/gitpipe/revision_test.go +++ b/internal/git/gitpipe/revision_test.go @@ -1,6 +1,7 @@ package gitpipe import ( + "context" "errors" "testing" "time" @@ -504,6 +505,26 @@ func TestRevlist(t *testing.T) { require.Equal(t, tc.expectedResults, results) }) } + + t.Run("context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + + it := Revlist(ctx, repo, []string{"refs/heads/master"}) + + require.True(t, it.Next()) + require.NoError(t, it.Err()) + require.Equal(t, RevisionResult{ + OID: "1e292f8fedd741b75372e19097c76d327140c312", + }, it.Result()) + + cancel() + + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + require.Equal(t, RevisionResult{ + err: context.Canceled, + }, it.Result()) + }) } func TestForEachRef(t *testing.T) { @@ -603,6 +624,27 @@ func TestForEachRef(t *testing.T) { t.Run("nonexisting pattern", func(t *testing.T) { require.Nil(t, readRefs(t, repo, []string{"refs/idontexist/*"})) }) + + t.Run("context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + + it := ForEachRef(ctx, repo, []string{"refs/heads/*"}) + + require.True(t, it.Next()) + require.NoError(t, it.Err()) + require.Equal(t, RevisionResult{ + OID: "e56497bb5f03a90a51293fc6d516788730953899", + ObjectName: []byte("refs/heads/'test'"), + }, it.Result()) + + cancel() + + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + require.Equal(t, RevisionResult{ + err: context.Canceled, + }, it.Result()) + }) } func TestForEachRef_options(t *testing.T) { diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index 57027d4c8..18b850156 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -156,7 +156,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita blobs[i] = gitpipe.RevisionResult{OID: git.ObjectID(blobID)} } - catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs), + catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(ctx, blobs), gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize }), |