Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-05-23 15:27:25 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-05-24 08:18:37 +0300
commitbee7ceb4ed70fc4b4001e2981725c95943d539d8 (patch)
treee70e3d0abb35ea58621bdd2413c1903802653872
parent94055b253d05bc04f533c977be892b0cd6f225ea (diff)
gitpipe: Fix deadlock on context cancellation with unflushed requests
In commit 8773480fd (gitpipe: Propagate context cancellation in object data pipeline, 2022-05-04), we have fixed an error that cancellation of the context wasn't properly propagated to callers. While fixing this we have introduced a new deadlock though: when the context is cancelled, we may abort the pipeline early without flushing outstanding requests. This means that the downstream reader which tries to read object data from the git-cat-file(1) process is blocked indefinitely in case the process is cached given that it wouldn't be killed by the context cancellation. Fix this deadlock by flushing any outstanding requests when the context is cancelled. Changelog: fixed
-rw-r--r--internal/git/gitpipe/catfile_info.go9
-rw-r--r--internal/git/gitpipe/catfile_info_test.go54
-rw-r--r--internal/git/gitpipe/catfile_object.go9
-rw-r--r--internal/git/gitpipe/catfile_object_test.go53
-rw-r--r--internal/git/gitpipe/testhelper_test.go40
5 files changed, 165 insertions, 0 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 9f821f5ae..7f36aeafd 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -82,6 +82,15 @@ func CatfileInfo(
objectID: it.ObjectID(),
objectName: it.ObjectName(),
}); isDone {
+ // If the context got cancelled, then we need to flush out all
+ // outstanding requests so that the downstream consumer is
+ // unblocked.
+ if err := queue.Flush(); err != nil {
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
+ return
+ }
+
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: ctx.Err()})
return
}
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 6651dbac3..0f1e33ff2 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -6,11 +6,13 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "google.golang.org/grpc/metadata"
)
const (
@@ -187,6 +189,58 @@ func TestCatfileInfo(t *testing.T) {
err: context.Canceled,
}, it.Result())
})
+
+ t.Run("context cancellation with cached process", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(
+ catfile.SessionIDField, "1",
+ ))
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectInfoReader, objectInfoReaderCancel, err := catfileCache.ObjectInfoReader(ctx, repo)
+ require.NoError(t, err)
+ defer objectInfoReaderCancel()
+
+ inputIter, inputCh, nextCh := newChanObjectIterator()
+
+ it, err := CatfileInfo(ctx, objectInfoReader, inputIter)
+ require.NoError(t, err)
+
+ // We request a single object from the catfile process. Because the request queue is
+ // not flushed after every object this means that the request is currently
+ // outstanding.
+ <-nextCh
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Wait for the pipeline to request the next object.
+ <-nextCh
+
+ // We now cancel the context with the outstanding request. In the past, this used to
+ // block the downstream consumer of the object data. This is because of two reasons:
+ //
+ // - When the process is being cached then cancellation of the context doesn't cause
+ // the process to get killed. So consequentially, the process would sit around
+ // waiting for input.
+ // - We didn't flush the queue when the context was cancelled, so the buffered input
+ // never arrived at the process.
+ cancel()
+
+ // Now we queue another request that should cause the pipeline to fail.
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Verify whether we can receive any more objects via the iterator. This should
+ // fail because the context got cancelled, but in any case it shouldn't block. Note
+ // that we're forced to reach into the channel directly: `Next()` would return
+ // `false` immediately because the context is cancelled.
+ _, ok := <-it.(*catfileInfoIterator).ch
+ require.False(t, ok)
+
+ // Sanity-check whether the iterator is in the expected state.
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ })
}
func TestCatfileInfoAllObjects(t *testing.T) {
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index b00d4983e..78e59ead0 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -74,6 +74,15 @@ func CatfileObject(
if isDone := sendRequest(catfileObjectRequest{
objectName: it.ObjectName(),
}); isDone {
+ // If the context got cancelled, then we need to flush out all
+ // outstanding requests so that the downstream consumer is
+ // unblocked.
+ if err := queue.Flush(); err != nil {
+ sendRequest(catfileObjectRequest{err: err})
+ return
+ }
+
+ sendRequest(catfileObjectRequest{err: ctx.Err()})
return
}
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index 5e40cf444..99a0a9615 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "google.golang.org/grpc/metadata"
)
func TestCatfileObject(t *testing.T) {
@@ -149,4 +150,56 @@ func TestCatfileObject(t *testing.T) {
err: context.Canceled,
}, it.Result())
})
+
+ t.Run("context cancellation with cached process", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(
+ catfile.SessionIDField, "1",
+ ))
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectReader, objectReaderCancel, err := catfileCache.ObjectReader(ctx, repo)
+ require.NoError(t, err)
+ defer objectReaderCancel()
+
+ inputIter, inputCh, nextCh := newChanObjectIterator()
+
+ it, err := CatfileObject(ctx, objectReader, inputIter)
+ require.NoError(t, err)
+
+ // We request a single object from the catfile process. Because the request queue is
+ // not flushed after every object this means that the request is currently
+ // outstanding.
+ <-nextCh
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Wait for the pipeline to request the next object.
+ <-nextCh
+
+ // We now cancel the context with the outstanding request. In the past, this used to
+ // block the downstream consumer of the object data. This is because of two reasons:
+ //
+ // - When the process is being cached then cancellation of the context doesn't cause
+ // the process to get killed. So consequentially, the process would sit around
+ // waiting for input.
+ // - We didn't flush the queue when the context was cancelled, so the buffered input
+ // never arrived at the process.
+ cancel()
+
+ // Now we queue another request that should cause the pipeline to fail.
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Verify whether we can receive any more objects via the iterator. This should
+ // fail because the context got cancelled, but in any case it shouldn't block. Note
+ // that we're forced to reach into the channel directly: `Next()` would return
+ // `false` immediately because the context is cancelled.
+ _, ok := <-it.(*catfileObjectIterator).ch
+ require.False(t, ok)
+
+ // Sanity-check whether the iterator is in the expected state.
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ })
}
diff --git a/internal/git/gitpipe/testhelper_test.go b/internal/git/gitpipe/testhelper_test.go
index 4d715bb7d..b82eb4d82 100644
--- a/internal/git/gitpipe/testhelper_test.go
+++ b/internal/git/gitpipe/testhelper_test.go
@@ -3,9 +3,49 @@ package gitpipe
import (
"testing"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
)
func TestMain(m *testing.M) {
testhelper.Run(m)
}
+
+// chanObjectIterator is an object iterator that can be driven via a set of channels for
+// deterministically exercising specific conditions in tests.
+type chanObjectIterator struct {
+ ObjectIterator
+
+ oid git.ObjectID
+ oidChan <-chan git.ObjectID
+ nextChan chan<- interface{}
+}
+
+// newChanObjectIterator returns a new object iterator as well as two channels: one object ID
+// channel that can be used to inject the next value returned by `Next()`. And then a second value
+// that is written to when `Next()` is called.
+func newChanObjectIterator() (ObjectIterator, chan<- git.ObjectID, <-chan interface{}) {
+ oidChan := make(chan git.ObjectID)
+ nextChan := make(chan interface{})
+ return &chanObjectIterator{
+ oidChan: oidChan,
+ nextChan: nextChan,
+ }, oidChan, nextChan
+}
+
+func (ch *chanObjectIterator) Next() bool {
+ // Notify the caller that the next object was requested.
+ ch.nextChan <- struct{}{}
+
+ var ok bool
+ ch.oid, ok = <-ch.oidChan
+ return ok
+}
+
+func (ch *chanObjectIterator) ObjectID() git.ObjectID {
+ return ch.oid
+}
+
+func (ch *chanObjectIterator) ObjectName() []byte {
+ return []byte("idontcare")
+}