diff options
author | John Cai <jcai@gitlab.com> | 2022-05-24 16:52:50 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-05-24 16:52:50 +0300 |
commit | 5aa9d4d29c49ebe427a4a895158e195725cda2da (patch) | |
tree | 26a482dabd1fc0a5d35d4cc89c975164f7e21b10 | |
parent | b2c8eaa672c9f2dc4b55477a3876f957e2c9a768 (diff) | |
parent | 3672650574465fbd42677b5f68d35f1beb555dc0 (diff) |
Merge branch 'pks-catfile-context-cancellation-unflushed-requests-deadlock' into 'master'
gitpipe: Fix deadlock on context cancellation with unflushed requests
Closes #4253
See merge request gitlab-org/gitaly!4581
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 28 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 85 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 28 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_test.go | 86 | ||||
-rw-r--r-- | internal/git/gitpipe/testhelper_test.go | 40 |
5 files changed, 259 insertions, 8 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 9f821f5ae..cefd81422 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "sync/atomic" "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" @@ -61,15 +62,20 @@ func CatfileInfo( opt(&cfg) } - queue, cleanup, err := objectInfoReader.InfoQueue(ctx) + queue, queueCleanup, err := objectInfoReader.InfoQueue(ctx) if err != nil { return nil, err } - defer cleanup() + var queueRefcount int32 = 2 requestChan := make(chan catfileInfoRequest, 32) go func() { - defer close(requestChan) + defer func() { + close(requestChan) + if atomic.AddInt32(&queueRefcount, -1) == 0 { + queueCleanup() + } + }() var i int64 for it.Next() { @@ -82,6 +88,15 @@ func CatfileInfo( objectID: it.ObjectID(), objectName: it.ObjectName(), }); isDone { + // If the context got cancelled, then we need to flush out all + // outstanding requests so that the downstream consumer is + // unblocked. + if err := queue.Flush(); err != nil { + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) + return + } + + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: ctx.Err()}) return } @@ -107,7 +122,12 @@ func CatfileInfo( resultChan := make(chan CatfileInfoResult) go func() { - defer close(resultChan) + defer func() { + close(resultChan) + if atomic.AddInt32(&queueRefcount, -1) == 0 { + queueCleanup() + } + }() // It's fine to iterate over the request channel without paying attention to // context cancellation because the request channel itself would be closed if the diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 6651dbac3..93274718e 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -3,14 +3,17 @@ package gitpipe import ( "context" "errors" + "fmt" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "google.golang.org/grpc/metadata" ) const ( @@ -187,6 +190,88 @@ func TestCatfileInfo(t *testing.T) { err: context.Canceled, }, it.Result()) }) + + t.Run("context cancellation with cached process", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs( + catfile.SessionIDField, "1", + )) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectInfoReader, objectInfoReaderCancel, err := catfileCache.ObjectInfoReader(ctx, repo) + require.NoError(t, err) + defer objectInfoReaderCancel() + + inputIter, inputCh, nextCh := newChanObjectIterator() + + it, err := CatfileInfo(ctx, objectInfoReader, inputIter) + require.NoError(t, err) + + // We request a single object from the catfile process. Because the request queue is + // not flushed after every object this means that the request is currently + // outstanding. + <-nextCh + inputCh <- git.ObjectID(lfsPointer1) + + // Wait for the pipeline to request the next object. + <-nextCh + + // We now cancel the context with the outstanding request. In the past, this used to + // block the downstream consumer of the object data. This is because of two reasons: + // + // - When the process is being cached then cancellation of the context doesn't cause + // the process to get killed. So consequentially, the process would sit around + // waiting for input. + // - We didn't flush the queue when the context was cancelled, so the buffered input + // never arrived at the process. + cancel() + + // Now we queue another request that should cause the pipeline to fail. + inputCh <- git.ObjectID(lfsPointer1) + + // Verify whether we can receive any more objects via the iterator. This should + // fail because the context got cancelled, but in any case it shouldn't block. Note + // that we're forced to reach into the channel directly: `Next()` would return + // `false` immediately because the context is cancelled. + _, ok := <-it.(*catfileInfoIterator).ch + require.False(t, ok) + + // Sanity-check whether the iterator is in the expected state. + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + }) + + t.Run("spawning two pipes fails", func(t *testing.T) { + ctx := testhelper.Context(t) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) + require.NoError(t, err) + defer cancel() + + input := []RevisionResult{ + {OID: lfsPointer1}, + } + + it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input)) + require.NoError(t, err) + + // Reusing the queue is not allowed, so we should get an error here. + _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input)) + require.Equal(t, fmt.Errorf("object info queue already in use"), err) + + // We now consume all the input of the iterator. + require.True(t, it.Next()) + require.False(t, it.Next()) + + // Which means that the queue should now be unused, so we can again use it. + _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input)) + require.NoError(t, err) + }) } func TestCatfileInfoAllObjects(t *testing.T) { diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index b00d4983e..5fe031579 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" @@ -38,15 +39,20 @@ func CatfileObject( objectReader catfile.ObjectReader, it ObjectIterator, ) (CatfileObjectIterator, error) { - queue, cleanup, err := objectReader.ObjectQueue(ctx) + queue, queueCleanup, err := objectReader.ObjectQueue(ctx) if err != nil { return nil, err } - defer cleanup() + var queueRefcount int32 = 2 requestChan := make(chan catfileObjectRequest, 32) go func() { - defer close(requestChan) + defer func() { + close(requestChan) + if atomic.AddInt32(&queueRefcount, -1) == 0 { + queueCleanup() + } + }() sendRequest := func(request catfileObjectRequest) bool { // Please refer to `sendResult()` for why we treat the context specially. @@ -74,6 +80,15 @@ func CatfileObject( if isDone := sendRequest(catfileObjectRequest{ objectName: it.ObjectName(), }); isDone { + // If the context got cancelled, then we need to flush out all + // outstanding requests so that the downstream consumer is + // unblocked. + if err := queue.Flush(); err != nil { + sendRequest(catfileObjectRequest{err: err}) + return + } + + sendRequest(catfileObjectRequest{err: ctx.Err()}) return } @@ -99,7 +114,12 @@ func CatfileObject( resultChan := make(chan CatfileObjectResult) go func() { - defer close(resultChan) + defer func() { + close(resultChan) + if atomic.AddInt32(&queueRefcount, -1) == 0 { + queueCleanup() + } + }() sendResult := func(result CatfileObjectResult) bool { // In case the context has been cancelled, we have a race between observing diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index 5e40cf444..5b6121eac 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -3,6 +3,7 @@ package gitpipe import ( "context" "errors" + "fmt" "io" "testing" @@ -13,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "google.golang.org/grpc/metadata" ) func TestCatfileObject(t *testing.T) { @@ -149,4 +151,88 @@ func TestCatfileObject(t *testing.T) { err: context.Canceled, }, it.Result()) }) + + t.Run("context cancellation with cached process", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs( + catfile.SessionIDField, "1", + )) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectReader, objectReaderCancel, err := catfileCache.ObjectReader(ctx, repo) + require.NoError(t, err) + defer objectReaderCancel() + + inputIter, inputCh, nextCh := newChanObjectIterator() + + it, err := CatfileObject(ctx, objectReader, inputIter) + require.NoError(t, err) + + // We request a single object from the catfile process. Because the request queue is + // not flushed after every object this means that the request is currently + // outstanding. + <-nextCh + inputCh <- git.ObjectID(lfsPointer1) + + // Wait for the pipeline to request the next object. + <-nextCh + + // We now cancel the context with the outstanding request. In the past, this used to + // block the downstream consumer of the object data. This is because of two reasons: + // + // - When the process is being cached then cancellation of the context doesn't cause + // the process to get killed. So consequentially, the process would sit around + // waiting for input. + // - We didn't flush the queue when the context was cancelled, so the buffered input + // never arrived at the process. + cancel() + + // Now we queue another request that should cause the pipeline to fail. + inputCh <- git.ObjectID(lfsPointer1) + + // Verify whether we can receive any more objects via the iterator. This should + // fail because the context got cancelled, but in any case it shouldn't block. Note + // that we're forced to reach into the channel directly: `Next()` would return + // `false` immediately because the context is cancelled. + _, ok := <-it.(*catfileObjectIterator).ch + require.False(t, ok) + + // Sanity-check whether the iterator is in the expected state. + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + }) + + t.Run("spawning two pipes fails", func(t *testing.T) { + ctx := testhelper.Context(t) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) + require.NoError(t, err) + defer cancel() + + input := []RevisionResult{ + {OID: lfsPointer1}, + } + + it, err := CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input)) + require.NoError(t, err) + + // Reusing the queue is not allowed, so we should get an error here. + _, err = CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input)) + require.Equal(t, fmt.Errorf("object queue already in use"), err) + + // We now consume all the input of the iterator. + require.True(t, it.Next()) + _, err = io.Copy(io.Discard, it.Result()) + require.NoError(t, err) + require.False(t, it.Next()) + + // Which means that the queue should now be unused, so we can again use it. + _, err = CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input)) + require.NoError(t, err) + }) } diff --git a/internal/git/gitpipe/testhelper_test.go b/internal/git/gitpipe/testhelper_test.go index 4d715bb7d..b82eb4d82 100644 --- a/internal/git/gitpipe/testhelper_test.go +++ b/internal/git/gitpipe/testhelper_test.go @@ -3,9 +3,49 @@ package gitpipe import ( "testing" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" ) func TestMain(m *testing.M) { testhelper.Run(m) } + +// chanObjectIterator is an object iterator that can be driven via a set of channels for +// deterministically exercising specific conditions in tests. +type chanObjectIterator struct { + ObjectIterator + + oid git.ObjectID + oidChan <-chan git.ObjectID + nextChan chan<- interface{} +} + +// newChanObjectIterator returns a new object iterator as well as two channels: one object ID +// channel that can be used to inject the next value returned by `Next()`. And then a second value +// that is written to when `Next()` is called. +func newChanObjectIterator() (ObjectIterator, chan<- git.ObjectID, <-chan interface{}) { + oidChan := make(chan git.ObjectID) + nextChan := make(chan interface{}) + return &chanObjectIterator{ + oidChan: oidChan, + nextChan: nextChan, + }, oidChan, nextChan +} + +func (ch *chanObjectIterator) Next() bool { + // Notify the caller that the next object was requested. + ch.nextChan <- struct{}{} + + var ok bool + ch.oid, ok = <-ch.oidChan + return ok +} + +func (ch *chanObjectIterator) ObjectID() git.ObjectID { + return ch.oid +} + +func (ch *chanObjectIterator) ObjectName() []byte { + return []byte("idontcare") +} |