Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-05-09 11:31:02 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-05-09 11:31:02 +0300
commitf93c64a5a41446b82850cc47d44c6960596f9e25 (patch)
treee57a82dfb683f59a9fabc0752453bf2e247dfaa0
parent2e30abfa61112d353f2474ab41837882b78e5d1a (diff)
parent8773480fd0eb9740c96f32e992697600477a5b63 (diff)
Merge branch 'pks-gitpipe-context-cancellation-errors' into 'master'
gitpipe: Fix propagation of context cancellation errors Closes #4072 See merge request gitlab-org/gitaly!4524
-rw-r--r--internal/git/gitpipe/catfile_info.go56
-rw-r--r--internal/git/gitpipe/catfile_info_iterator.go38
-rw-r--r--internal/git/gitpipe/catfile_info_test.go76
-rw-r--r--internal/git/gitpipe/catfile_object.go53
-rw-r--r--internal/git/gitpipe/catfile_object_iterator.go38
-rw-r--r--internal/git/gitpipe/catfile_object_test.go36
-rw-r--r--internal/git/gitpipe/pipeline_test.go2
-rw-r--r--internal/git/gitpipe/revision.go6
-rw-r--r--internal/git/gitpipe/revision_iterator.go38
-rw-r--r--internal/git/gitpipe/revision_test.go42
-rw-r--r--internal/gitaly/service/blob/lfs_pointers.go2
11 files changed, 300 insertions, 87 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index b756e08f3..eaa15f517 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -74,48 +74,34 @@ func CatfileInfo(
var i int64
for it.Next() {
if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
- select {
- case requestChan <- catfileInfoRequest{err: err}:
- case <-ctx.Done():
- return
- }
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
+ return
}
- select {
- case requestChan <- catfileInfoRequest{
+ if isDone := sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{
objectID: it.ObjectID(),
objectName: it.ObjectName(),
- }:
- case <-ctx.Done():
+ }); isDone {
return
}
i++
if i%int64(cap(requestChan)) == 0 {
if err := queue.Flush(); err != nil {
- select {
- case requestChan <- catfileInfoRequest{err: err}:
- case <-ctx.Done():
- return
- }
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
+ return
}
}
}
if err := it.Err(); err != nil {
- select {
- case requestChan <- catfileInfoRequest{err: err}:
- case <-ctx.Done():
- return
- }
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
+ return
}
if err := queue.Flush(); err != nil {
- select {
- case requestChan <- catfileInfoRequest{err: err}:
- case <-ctx.Done():
- return
- }
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
+ return
}
}()
@@ -154,7 +140,8 @@ func CatfileInfo(
}()
return &catfileInfoIterator{
- ch: resultChan,
+ ctx: ctx,
+ ch: resultChan,
}, nil
}
@@ -229,7 +216,8 @@ func CatfileInfoAllObjects(
}()
return &catfileInfoIterator{
- ch: resultChan,
+ ctx: ctx,
+ ch: resultChan,
}
}
@@ -252,3 +240,19 @@ func sendCatfileInfoResult(ctx context.Context, ch chan<- CatfileInfoResult, res
return true
}
}
+
+func sendCatfileInfoRequest(ctx context.Context, ch chan<- catfileInfoRequest, request catfileInfoRequest) bool {
+ // Please refer to `sendCatfileInfoResult()` for why we treat the context specially.
+ select {
+ case <-ctx.Done():
+ return true
+ default:
+ }
+
+ select {
+ case ch <- request:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+}
diff --git a/internal/git/gitpipe/catfile_info_iterator.go b/internal/git/gitpipe/catfile_info_iterator.go
index 35c782699..c4403af5c 100644
--- a/internal/git/gitpipe/catfile_info_iterator.go
+++ b/internal/git/gitpipe/catfile_info_iterator.go
@@ -1,6 +1,10 @@
package gitpipe
-import "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+import (
+ "context"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+)
// CatfileInfoIterator is an iterator returned by the Revlist function.
type CatfileInfoIterator interface {
@@ -10,7 +14,7 @@ type CatfileInfoIterator interface {
}
// NewCatfileInfoIterator returns a new CatfileInfoIterator for the given items.
-func NewCatfileInfoIterator(items []CatfileInfoResult) CatfileInfoIterator {
+func NewCatfileInfoIterator(ctx context.Context, items []CatfileInfoResult) CatfileInfoIterator {
itemChan := make(chan CatfileInfoResult, len(items))
for _, item := range items {
itemChan <- item
@@ -18,11 +22,13 @@ func NewCatfileInfoIterator(items []CatfileInfoResult) CatfileInfoIterator {
close(itemChan)
return &catfileInfoIterator{
- ch: itemChan,
+ ctx: ctx,
+ ch: itemChan,
}
}
type catfileInfoIterator struct {
+ ctx context.Context
ch <-chan CatfileInfoResult
result CatfileInfoResult
}
@@ -32,13 +38,31 @@ func (it *catfileInfoIterator) Next() bool {
return false
}
- var ok bool
- it.result, ok = <-it.ch
- if !ok || it.result.err != nil {
+ // Prioritize context cancellation errors so that we don't try to fetch results anymore when
+ // the context is done.
+ select {
+ case <-it.ctx.Done():
+ it.result = CatfileInfoResult{err: it.ctx.Err()}
return false
+ default:
}
- return true
+ select {
+ case <-it.ctx.Done():
+ it.result = CatfileInfoResult{err: it.ctx.Err()}
+ return false
+ case result, ok := <-it.ch:
+ if !ok {
+ return false
+ }
+
+ it.result = result
+ if result.err != nil {
+ return false
+ }
+
+ return true
+ }
}
func (it *catfileInfoIterator) Err() error {
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 80a1810d9..d2f7c2b37 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -1,6 +1,7 @@
package gitpipe
import (
+ "context"
"errors"
"testing"
@@ -136,7 +137,7 @@ func TestCatfileInfo(t *testing.T) {
require.NoError(t, err)
defer cancel()
- it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...)
+ it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, tc.revlistInputs), tc.opts...)
require.NoError(t, err)
var results []CatfileInfoResult
@@ -155,6 +156,37 @@ func TestCatfileInfo(t *testing.T) {
require.Equal(t, tc.expectedResults, results)
})
}
+
+ t.Run("context cancellation", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectInfoReader, objectInfoReaderCancel, err := catfileCache.ObjectInfoReader(ctx, repo)
+ require.NoError(t, err)
+ defer objectInfoReaderCancel()
+
+ it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, []RevisionResult{
+ {OID: lfsPointer1},
+ {OID: lfsPointer1},
+ }))
+ require.NoError(t, err)
+
+ require.True(t, it.Next())
+ require.NoError(t, it.Err())
+ require.Equal(t, CatfileInfoResult{
+ ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133},
+ }, it.Result())
+
+ cancel()
+
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ require.Equal(t, CatfileInfoResult{
+ err: context.Canceled,
+ }, it.Result())
+ })
}
func TestCatfileInfoAllObjects(t *testing.T) {
@@ -171,18 +203,40 @@ func TestCatfileInfoAllObjects(t *testing.T) {
})
commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents())
- it := CatfileInfoAllObjects(ctx, repo)
-
- var results []CatfileInfoResult
- for it.Next() {
- results = append(results, it.Result())
- }
- require.NoError(t, it.Err())
-
- require.ElementsMatch(t, []CatfileInfoResult{
+ actualObjects := []CatfileInfoResult{
{ObjectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}},
{ObjectInfo: &catfile.ObjectInfo{Oid: blob2, Type: "blob", Size: 6}},
{ObjectInfo: &catfile.ObjectInfo{Oid: tree, Type: "tree", Size: 34}},
{ObjectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}},
- }, results)
+ }
+
+ t.Run("successful", func(t *testing.T) {
+ it := CatfileInfoAllObjects(ctx, repo)
+
+ var results []CatfileInfoResult
+ for it.Next() {
+ results = append(results, it.Result())
+ }
+ require.NoError(t, it.Err())
+
+ require.ElementsMatch(t, actualObjects, results)
+ })
+
+ t.Run("context cancellation", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+
+ it := CatfileInfoAllObjects(ctx, repo)
+
+ require.True(t, it.Next())
+ require.NoError(t, it.Err())
+ require.Contains(t, actualObjects, it.Result())
+
+ cancel()
+
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ require.Equal(t, CatfileInfoResult{
+ err: context.Canceled,
+ }, it.Result())
+ })
}
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 49e33b859..f23496158 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -48,48 +48,52 @@ func CatfileObject(
go func() {
defer close(requestChan)
+ sendRequest := func(request catfileObjectRequest) bool {
+ // Please refer to `sendResult()` for why we treat the context specially.
+ select {
+ case <-ctx.Done():
+ return true
+ default:
+ }
+
+ select {
+ case requestChan <- request:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+ }
+
var i int64
for it.Next() {
if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
- select {
- case requestChan <- catfileObjectRequest{err: err}:
- case <-ctx.Done():
- return
- }
+ sendRequest(catfileObjectRequest{err: err})
+ return
}
- select {
- case requestChan <- catfileObjectRequest{objectName: it.ObjectName()}:
- case <-ctx.Done():
+ if isDone := sendRequest(catfileObjectRequest{
+ objectName: it.ObjectName(),
+ }); isDone {
return
}
i++
if i%int64(cap(requestChan)) == 0 {
if err := queue.Flush(); err != nil {
- select {
- case requestChan <- catfileObjectRequest{err: err}:
- case <-ctx.Done():
- return
- }
+ sendRequest(catfileObjectRequest{err: err})
+ return
}
}
}
if err := it.Err(); err != nil {
- select {
- case requestChan <- catfileObjectRequest{err: err}:
- case <-ctx.Done():
- return
- }
+ sendRequest(catfileObjectRequest{err: err})
+ return
}
if err := queue.Flush(); err != nil {
- select {
- case requestChan <- catfileObjectRequest{err: err}:
- case <-ctx.Done():
- return
- }
+ sendRequest(catfileObjectRequest{err: err})
+ return
}
}()
@@ -164,7 +168,8 @@ func CatfileObject(
}()
return &catfileObjectIterator{
- ch: resultChan,
+ ctx: ctx,
+ ch: resultChan,
}, nil
}
diff --git a/internal/git/gitpipe/catfile_object_iterator.go b/internal/git/gitpipe/catfile_object_iterator.go
index c457088e0..05b00e3e8 100644
--- a/internal/git/gitpipe/catfile_object_iterator.go
+++ b/internal/git/gitpipe/catfile_object_iterator.go
@@ -1,6 +1,10 @@
package gitpipe
-import "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+import (
+ "context"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+)
// CatfileObjectIterator is an iterator returned by the Revlist function.
type CatfileObjectIterator interface {
@@ -10,7 +14,7 @@ type CatfileObjectIterator interface {
}
// NewCatfileObjectIterator returns a new CatfileObjectIterator for the given items.
-func NewCatfileObjectIterator(items []CatfileObjectResult) CatfileObjectIterator {
+func NewCatfileObjectIterator(ctx context.Context, items []CatfileObjectResult) CatfileObjectIterator {
itemChan := make(chan CatfileObjectResult, len(items))
for _, item := range items {
itemChan <- item
@@ -18,11 +22,13 @@ func NewCatfileObjectIterator(items []CatfileObjectResult) CatfileObjectIterator
close(itemChan)
return &catfileObjectIterator{
- ch: itemChan,
+ ctx: ctx,
+ ch: itemChan,
}
}
type catfileObjectIterator struct {
+ ctx context.Context
ch <-chan CatfileObjectResult
result CatfileObjectResult
}
@@ -32,13 +38,31 @@ func (it *catfileObjectIterator) Next() bool {
return false
}
- var ok bool
- it.result, ok = <-it.ch
- if !ok || it.result.err != nil {
+ // Prioritize context cancellation errors so that we don't try to fetch results anymore when
+ // the context is done.
+ select {
+ case <-it.ctx.Done():
+ it.result = CatfileObjectResult{err: it.ctx.Err()}
return false
+ default:
}
- return true
+ select {
+ case <-it.ctx.Done():
+ it.result = CatfileObjectResult{err: it.ctx.Err()}
+ return false
+ case result, ok := <-it.ch:
+ if !ok {
+ return false
+ }
+
+ it.result = result
+ if result.err != nil {
+ return false
+ }
+
+ return true
+ }
}
func (it *catfileObjectIterator) Err() error {
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index 95a9b3ad6..bf96a19e0 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -1,11 +1,13 @@
package gitpipe
import (
+ "context"
"errors"
"io"
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
@@ -78,7 +80,7 @@ func TestCatfileObject(t *testing.T) {
require.NoError(t, err)
defer cancel()
- it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs))
+ it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(ctx, tc.catfileInfoInputs))
require.NoError(t, err)
var results []CatfileObjectResult
@@ -115,4 +117,36 @@ func TestCatfileObject(t *testing.T) {
require.Equal(t, tc.expectedResults, results)
})
}
+
+ t.Run("context cancellation", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectReader, objectReaderCancel, err := catfileCache.ObjectReader(ctx, repo)
+ require.NoError(t, err)
+ defer objectReaderCancel()
+
+ it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(ctx, []CatfileInfoResult{
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer1, Type: "blob", Size: 133}},
+ {ObjectInfo: &catfile.ObjectInfo{Oid: lfsPointer2, Type: "blob", Size: 127}},
+ }))
+ require.NoError(t, err)
+
+ require.True(t, it.Next())
+ require.NoError(t, it.Err())
+ require.Equal(t, git.ObjectID(lfsPointer1), it.Result().ObjectID())
+
+ _, err = io.Copy(io.Discard, it.Result())
+ require.NoError(t, err)
+
+ cancel()
+
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ require.Equal(t, CatfileObjectResult{
+ err: context.Canceled,
+ }, it.Result())
+ })
}
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index 544ad552d..3d34db488 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -310,7 +310,7 @@ func TestPipeline_revlist(t *testing.T) {
}
}
- require.NoError(t, catfileObjectIter.Err())
+ require.Equal(t, context.Canceled, catfileObjectIter.Err())
// Context cancellation is timing sensitive: at the point of cancelling the context,
// the last pipeline step may already have queued up an additional result. We thus
diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go
index 68fcecbba..b79f7a664 100644
--- a/internal/git/gitpipe/revision.go
+++ b/internal/git/gitpipe/revision.go
@@ -314,7 +314,8 @@ func Revlist(
}()
return &revisionIterator{
- ch: resultChan,
+ ctx: ctx,
+ ch: resultChan,
}
}
@@ -443,7 +444,8 @@ func ForEachRef(
}()
return &revisionIterator{
- ch: resultChan,
+ ctx: ctx,
+ ch: resultChan,
}
}
diff --git a/internal/git/gitpipe/revision_iterator.go b/internal/git/gitpipe/revision_iterator.go
index 8d949e648..9f686d463 100644
--- a/internal/git/gitpipe/revision_iterator.go
+++ b/internal/git/gitpipe/revision_iterator.go
@@ -1,6 +1,10 @@
package gitpipe
-import "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+import (
+ "context"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+)
// RevisionIterator is an iterator returned by the Revlist function.
type RevisionIterator interface {
@@ -10,7 +14,7 @@ type RevisionIterator interface {
}
// NewRevisionIterator returns a new RevisionIterator for the given items.
-func NewRevisionIterator(items []RevisionResult) RevisionIterator {
+func NewRevisionIterator(ctx context.Context, items []RevisionResult) RevisionIterator {
itemChan := make(chan RevisionResult, len(items))
for _, item := range items {
itemChan <- item
@@ -18,11 +22,13 @@ func NewRevisionIterator(items []RevisionResult) RevisionIterator {
close(itemChan)
return &revisionIterator{
- ch: itemChan,
+ ctx: ctx,
+ ch: itemChan,
}
}
type revisionIterator struct {
+ ctx context.Context
ch <-chan RevisionResult
result RevisionResult
}
@@ -32,13 +38,31 @@ func (it *revisionIterator) Next() bool {
return false
}
- var ok bool
- it.result, ok = <-it.ch
- if !ok || it.result.err != nil {
+ // Prioritize context cancellation errors so that we don't try to fetch results anymore when
+ // the context is done.
+ select {
+ case <-it.ctx.Done():
+ it.result = RevisionResult{err: it.ctx.Err()}
return false
+ default:
}
- return true
+ select {
+ case <-it.ctx.Done():
+ it.result = RevisionResult{err: it.ctx.Err()}
+ return false
+ case result, ok := <-it.ch:
+ if !ok {
+ return false
+ }
+
+ it.result = result
+ if result.err != nil {
+ return false
+ }
+
+ return true
+ }
}
func (it *revisionIterator) Err() error {
diff --git a/internal/git/gitpipe/revision_test.go b/internal/git/gitpipe/revision_test.go
index 091df9e3f..031325515 100644
--- a/internal/git/gitpipe/revision_test.go
+++ b/internal/git/gitpipe/revision_test.go
@@ -1,6 +1,7 @@
package gitpipe
import (
+ "context"
"errors"
"testing"
"time"
@@ -504,6 +505,26 @@ func TestRevlist(t *testing.T) {
require.Equal(t, tc.expectedResults, results)
})
}
+
+ t.Run("context cancellation", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+
+ it := Revlist(ctx, repo, []string{"refs/heads/master"})
+
+ require.True(t, it.Next())
+ require.NoError(t, it.Err())
+ require.Equal(t, RevisionResult{
+ OID: "1e292f8fedd741b75372e19097c76d327140c312",
+ }, it.Result())
+
+ cancel()
+
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ require.Equal(t, RevisionResult{
+ err: context.Canceled,
+ }, it.Result())
+ })
}
func TestForEachRef(t *testing.T) {
@@ -603,6 +624,27 @@ func TestForEachRef(t *testing.T) {
t.Run("nonexisting pattern", func(t *testing.T) {
require.Nil(t, readRefs(t, repo, []string{"refs/idontexist/*"}))
})
+
+ t.Run("context cancellation", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+
+ it := ForEachRef(ctx, repo, []string{"refs/heads/*"})
+
+ require.True(t, it.Next())
+ require.NoError(t, it.Err())
+ require.Equal(t, RevisionResult{
+ OID: "e56497bb5f03a90a51293fc6d516788730953899",
+ ObjectName: []byte("refs/heads/'test'"),
+ }, it.Result())
+
+ cancel()
+
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ require.Equal(t, RevisionResult{
+ err: context.Canceled,
+ }, it.Result())
+ })
}
func TestForEachRef_options(t *testing.T) {
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index 57027d4c8..18b850156 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -156,7 +156,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
blobs[i] = gitpipe.RevisionResult{OID: git.ObjectID(blobID)}
}
- catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs),
+ catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(ctx, blobs),
gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize
}),