diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-05-23 15:27:25 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-05-23 16:34:06 +0300 |
commit | 6eec25507e2eaa93775ab1bb63883dea55e42757 (patch) | |
tree | c69b27538882f1d8eeab65fce7168f81704cfe67 | |
parent | 94055b253d05bc04f533c977be892b0cd6f225ea (diff) |
gitpipe: Fix deadlock on context cancellation with unflushed requestspks-catfile-context-cancellation-unflushed-requests-deadlock
In commit 8773480fd (gitpipe: Propagate context cancellation in object
data pipeline, 2022-05-04), we have fixed an error that cancellation of
the context wasn't properly propagated to callers. While fixing this we
have introduced a new deadlock though: when the context is cancelled, we
may abort the pipeline early without flushing outstanding requests. This
means that the downstream reader which tries to read object data from
the git-cat-file(1) process is blocked indefinitely in case the process
is cached given that it wouldn't be killed by the context cancellation.
Fix this deadlock by flushing any outstanding requests when the context
is cancelled.
Changelog: fixed
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 9 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 48 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 9 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_test.go | 47 | ||||
-rw-r--r-- | internal/git/gitpipe/testhelper_test.go | 40 |
5 files changed, 153 insertions, 0 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 9f821f5ae..7f36aeafd 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -82,6 +82,15 @@ func CatfileInfo( objectID: it.ObjectID(), objectName: it.ObjectName(), }); isDone { + // If the context got cancelled, then we need to flush out all + // outstanding requests so that the downstream consumer is + // unblocked. + if err := queue.Flush(); err != nil { + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) + return + } + + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: ctx.Err()}) return } diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 6651dbac3..450fbe13e 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -6,11 +6,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "google.golang.org/grpc/metadata" ) const ( @@ -187,6 +189,52 @@ func TestCatfileInfo(t *testing.T) { err: context.Canceled, }, it.Result()) }) + + t.Run("context cancellation with cached process", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs( + catfile.SessionIDField, "1", + )) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectInfoReader, objectInfoReaderCancel, err := catfileCache.ObjectInfoReader(ctx, repo) + require.NoError(t, err) + defer objectInfoReaderCancel() + + inputIter, inputCh, nextCh := newChanObjectIterator() + + it, err := CatfileInfo(ctx, objectInfoReader, inputIter) + require.NoError(t, err) + + // We request a single object from the catfile process. Because the request queue is + // not flushed after every object this means that the request is currently + // outstanding. + <-nextCh + inputCh <- git.ObjectID(lfsPointer1) + + // Wait for the pipeline to request the next object. + <-nextCh + + // We now cancel the context with the outstanding request. In the past, this used to + // block the downstream consumer of the object data. This is because of two reasons: + // + // - When the process is being cached then cancellation of the context doesn't cause + // the process to get killed. So consequentially, the process would sit around + // waiting for input. + // - We didn't flush the queue when the context was cancelled, so the buffered input + // never arrived at the process. + cancel() + + // Now we queue another request that should cause the pipeline to fail. + inputCh <- git.ObjectID(lfsPointer1) + + // Reading the object should now fail because the context got cancelled, but it + // definitely shouldn't block like it did earlier. + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + }) } func TestCatfileInfoAllObjects(t *testing.T) { diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index b00d4983e..78e59ead0 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -74,6 +74,15 @@ func CatfileObject( if isDone := sendRequest(catfileObjectRequest{ objectName: it.ObjectName(), }); isDone { + // If the context got cancelled, then we need to flush out all + // outstanding requests so that the downstream consumer is + // unblocked. + if err := queue.Flush(); err != nil { + sendRequest(catfileObjectRequest{err: err}) + return + } + + sendRequest(catfileObjectRequest{err: ctx.Err()}) return } diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index 5e40cf444..17ccfe017 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "google.golang.org/grpc/metadata" ) func TestCatfileObject(t *testing.T) { @@ -149,4 +150,50 @@ func TestCatfileObject(t *testing.T) { err: context.Canceled, }, it.Result()) }) + + t.Run("context cancellation with cached process", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs( + catfile.SessionIDField, "1", + )) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectReader, objectReaderCancel, err := catfileCache.ObjectReader(ctx, repo) + require.NoError(t, err) + defer objectReaderCancel() + + inputIter, inputCh, nextCh := newChanObjectIterator() + + it, err := CatfileObject(ctx, objectReader, inputIter) + require.NoError(t, err) + + // We request a single object from the catfile process. Because the request queue is + // not flushed after every object this means that the request is currently + // outstanding. + <-nextCh + inputCh <- git.ObjectID(lfsPointer1) + + // Wait for the pipeline to request the next object. + <-nextCh + + // We now cancel the context with the outstanding request. In the past, this used to + // block the downstream consumer of the object data. This is because of two reasons: + // + // - When the process is being cached then cancellation of the context doesn't cause + // the process to get killed. So consequentially, the process would sit around + // waiting for input. + // - We didn't flush the queue when the context was cancelled, so the buffered input + // never arrived at the process. + cancel() + + // Now we queue another request that should cause the pipeline to fail. + inputCh <- git.ObjectID(lfsPointer1) + + // Reading the object should now fail because the context got cancelled, but it + // definitely shouldn't block like it did earlier. + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + }) } diff --git a/internal/git/gitpipe/testhelper_test.go b/internal/git/gitpipe/testhelper_test.go index 4d715bb7d..b82eb4d82 100644 --- a/internal/git/gitpipe/testhelper_test.go +++ b/internal/git/gitpipe/testhelper_test.go @@ -3,9 +3,49 @@ package gitpipe import ( "testing" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" ) func TestMain(m *testing.M) { testhelper.Run(m) } + +// chanObjectIterator is an object iterator that can be driven via a set of channels for +// deterministically exercising specific conditions in tests. +type chanObjectIterator struct { + ObjectIterator + + oid git.ObjectID + oidChan <-chan git.ObjectID + nextChan chan<- interface{} +} + +// newChanObjectIterator returns a new object iterator as well as two channels: one object ID +// channel that can be used to inject the next value returned by `Next()`. And then a second value +// that is written to when `Next()` is called. +func newChanObjectIterator() (ObjectIterator, chan<- git.ObjectID, <-chan interface{}) { + oidChan := make(chan git.ObjectID) + nextChan := make(chan interface{}) + return &chanObjectIterator{ + oidChan: oidChan, + nextChan: nextChan, + }, oidChan, nextChan +} + +func (ch *chanObjectIterator) Next() bool { + // Notify the caller that the next object was requested. + ch.nextChan <- struct{}{} + + var ok bool + ch.oid, ok = <-ch.oidChan + return ok +} + +func (ch *chanObjectIterator) ObjectID() git.ObjectID { + return ch.oid +} + +func (ch *chanObjectIterator) ObjectName() []byte { + return []byte("idontcare") +} |