diff options
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 9 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 48 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 9 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_test.go | 47 | ||||
-rw-r--r-- | internal/git/gitpipe/testhelper_test.go | 40 |
5 files changed, 153 insertions, 0 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 9f821f5ae..7f36aeafd 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -82,6 +82,15 @@ func CatfileInfo( objectID: it.ObjectID(), objectName: it.ObjectName(), }); isDone { + // If the context got cancelled, then we need to flush out all + // outstanding requests so that the downstream consumer is + // unblocked. + if err := queue.Flush(); err != nil { + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) + return + } + + sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: ctx.Err()}) return } diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 6651dbac3..450fbe13e 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -6,11 +6,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "google.golang.org/grpc/metadata" ) const ( @@ -187,6 +189,52 @@ func TestCatfileInfo(t *testing.T) { err: context.Canceled, }, it.Result()) }) + + t.Run("context cancellation with cached process", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs( + catfile.SessionIDField, "1", + )) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectInfoReader, objectInfoReaderCancel, err := catfileCache.ObjectInfoReader(ctx, repo) + require.NoError(t, err) + defer objectInfoReaderCancel() + + inputIter, inputCh, nextCh := newChanObjectIterator() + + it, err := CatfileInfo(ctx, objectInfoReader, inputIter) + require.NoError(t, err) + + // We request a single object from the catfile process. Because the request queue is + // not flushed after every object this means that the request is currently + // outstanding. + <-nextCh + inputCh <- git.ObjectID(lfsPointer1) + + // Wait for the pipeline to request the next object. + <-nextCh + + // We now cancel the context with the outstanding request. In the past, this used to + // block the downstream consumer of the object data. This is because of two reasons: + // + // - When the process is being cached then cancellation of the context doesn't cause + // the process to get killed. So consequentially, the process would sit around + // waiting for input. + // - We didn't flush the queue when the context was cancelled, so the buffered input + // never arrived at the process. + cancel() + + // Now we queue another request that should cause the pipeline to fail. + inputCh <- git.ObjectID(lfsPointer1) + + // Reading the object should now fail because the context got cancelled, but it + // definitely shouldn't block like it did earlier. + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + }) } func TestCatfileInfoAllObjects(t *testing.T) { diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index b00d4983e..78e59ead0 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -74,6 +74,15 @@ func CatfileObject( if isDone := sendRequest(catfileObjectRequest{ objectName: it.ObjectName(), }); isDone { + // If the context got cancelled, then we need to flush out all + // outstanding requests so that the downstream consumer is + // unblocked. + if err := queue.Flush(); err != nil { + sendRequest(catfileObjectRequest{err: err}) + return + } + + sendRequest(catfileObjectRequest{err: ctx.Err()}) return } diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index 5e40cf444..17ccfe017 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "google.golang.org/grpc/metadata" ) func TestCatfileObject(t *testing.T) { @@ -149,4 +150,50 @@ func TestCatfileObject(t *testing.T) { err: context.Canceled, }, it.Result()) }) + + t.Run("context cancellation with cached process", func(t *testing.T) { + ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs( + catfile.SessionIDField, "1", + )) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectReader, objectReaderCancel, err := catfileCache.ObjectReader(ctx, repo) + require.NoError(t, err) + defer objectReaderCancel() + + inputIter, inputCh, nextCh := newChanObjectIterator() + + it, err := CatfileObject(ctx, objectReader, inputIter) + require.NoError(t, err) + + // We request a single object from the catfile process. Because the request queue is + // not flushed after every object this means that the request is currently + // outstanding. + <-nextCh + inputCh <- git.ObjectID(lfsPointer1) + + // Wait for the pipeline to request the next object. + <-nextCh + + // We now cancel the context with the outstanding request. In the past, this used to + // block the downstream consumer of the object data. This is because of two reasons: + // + // - When the process is being cached then cancellation of the context doesn't cause + // the process to get killed. So consequentially, the process would sit around + // waiting for input. + // - We didn't flush the queue when the context was cancelled, so the buffered input + // never arrived at the process. + cancel() + + // Now we queue another request that should cause the pipeline to fail. + inputCh <- git.ObjectID(lfsPointer1) + + // Reading the object should now fail because the context got cancelled, but it + // definitely shouldn't block like it did earlier. + require.False(t, it.Next()) + require.Equal(t, context.Canceled, it.Err()) + }) } diff --git a/internal/git/gitpipe/testhelper_test.go b/internal/git/gitpipe/testhelper_test.go index 4d715bb7d..b82eb4d82 100644 --- a/internal/git/gitpipe/testhelper_test.go +++ b/internal/git/gitpipe/testhelper_test.go @@ -3,9 +3,49 @@ package gitpipe import ( "testing" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" ) func TestMain(m *testing.M) { testhelper.Run(m) } + +// chanObjectIterator is an object iterator that can be driven via a set of channels for +// deterministically exercising specific conditions in tests. +type chanObjectIterator struct { + ObjectIterator + + oid git.ObjectID + oidChan <-chan git.ObjectID + nextChan chan<- interface{} +} + +// newChanObjectIterator returns a new object iterator as well as two channels: one object ID +// channel that can be used to inject the next value returned by `Next()`. And then a second value +// that is written to when `Next()` is called. +func newChanObjectIterator() (ObjectIterator, chan<- git.ObjectID, <-chan interface{}) { + oidChan := make(chan git.ObjectID) + nextChan := make(chan interface{}) + return &chanObjectIterator{ + oidChan: oidChan, + nextChan: nextChan, + }, oidChan, nextChan +} + +func (ch *chanObjectIterator) Next() bool { + // Notify the caller that the next object was requested. + ch.nextChan <- struct{}{} + + var ok bool + ch.oid, ok = <-ch.oidChan + return ok +} + +func (ch *chanObjectIterator) ObjectID() git.ObjectID { + return ch.oid +} + +func (ch *chanObjectIterator) ObjectName() []byte { + return []byte("idontcare") +} |