Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author John Cai <jcai@gitlab.com> 2022-05-24 16:52:50 +0300
committer John Cai <jcai@gitlab.com> 2022-05-24 16:52:50 +0300
commit 5aa9d4d29c49ebe427a4a895158e195725cda2da (patch)
tree 26a482dabd1fc0a5d35d4cc89c975164f7e21b10
parent b2c8eaa672c9f2dc4b55477a3876f957e2c9a768 (diff)
parent 3672650574465fbd42677b5f68d35f1beb555dc0 (diff)
Merge branch 'pks-catfile-context-cancellation-unflushed-requests-deadlock' into 'master'
gitpipe: Fix deadlock on context cancellation with unflushed requests. Closes #4253. See merge request gitlab-org/gitaly!4581.
-rw-r--r-- internal/git/gitpipe/catfile_info.go 28
-rw-r--r-- internal/git/gitpipe/catfile_info_test.go 85
-rw-r--r-- internal/git/gitpipe/catfile_object.go 28
-rw-r--r-- internal/git/gitpipe/catfile_object_test.go 86
-rw-r--r-- internal/git/gitpipe/testhelper_test.go 40
5 files changed, 259 insertions, 8 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 9f821f5ae..cefd81422 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
+ "sync/atomic"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
@@ -61,15 +62,20 @@ func CatfileInfo(
opt(&cfg)
}
- queue, cleanup, err := objectInfoReader.InfoQueue(ctx)
+ queue, queueCleanup, err := objectInfoReader.InfoQueue(ctx)
if err != nil {
return nil, err
}
- defer cleanup()
+ var queueRefcount int32 = 2
requestChan := make(chan catfileInfoRequest, 32)
go func() {
- defer close(requestChan)
+ defer func() {
+ close(requestChan)
+ if atomic.AddInt32(&queueRefcount, -1) == 0 {
+ queueCleanup()
+ }
+ }()
var i int64
for it.Next() {
@@ -82,6 +88,15 @@ func CatfileInfo(
objectID: it.ObjectID(),
objectName: it.ObjectName(),
}); isDone {
+ // If the context got cancelled, then we need to flush out all
+ // outstanding requests so that the downstream consumer is
+ // unblocked.
+ if err := queue.Flush(); err != nil {
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
+ return
+ }
+
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: ctx.Err()})
return
}
@@ -107,7 +122,12 @@ func CatfileInfo(
resultChan := make(chan CatfileInfoResult)
go func() {
- defer close(resultChan)
+ defer func() {
+ close(resultChan)
+ if atomic.AddInt32(&queueRefcount, -1) == 0 {
+ queueCleanup()
+ }
+ }()
// It's fine to iterate over the request channel without paying attention to
// context cancellation because the request channel itself would be closed if the
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 6651dbac3..93274718e 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -3,14 +3,17 @@ package gitpipe
import (
"context"
"errors"
+ "fmt"
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "google.golang.org/grpc/metadata"
)
const (
@@ -187,6 +190,88 @@ func TestCatfileInfo(t *testing.T) {
err: context.Canceled,
}, it.Result())
})
+
+ t.Run("context cancellation with cached process", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(
+ catfile.SessionIDField, "1",
+ ))
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectInfoReader, objectInfoReaderCancel, err := catfileCache.ObjectInfoReader(ctx, repo)
+ require.NoError(t, err)
+ defer objectInfoReaderCancel()
+
+ inputIter, inputCh, nextCh := newChanObjectIterator()
+
+ it, err := CatfileInfo(ctx, objectInfoReader, inputIter)
+ require.NoError(t, err)
+
+ // We request a single object from the catfile process. Because the request queue is
+ // not flushed after every object this means that the request is currently
+ // outstanding.
+ <-nextCh
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Wait for the pipeline to request the next object.
+ <-nextCh
+
+ // We now cancel the context with the outstanding request. In the past, this used to
+ // block the downstream consumer of the object data, for two reasons:
+ //
+ // - When the process is being cached, cancellation of the context doesn't cause
+ // the process to get killed. Consequently, the process would sit around
+ // waiting for input.
+ // - We didn't flush the queue when the context was cancelled, so the buffered input
+ // never arrived at the process.
+ cancel()
+
+ // Now we queue another request that should cause the pipeline to fail.
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Verify whether we can receive any more objects via the iterator. This should
+ // fail because the context got cancelled, but in any case it shouldn't block. Note
+ // that we're forced to reach into the channel directly: `Next()` would return
+ // `false` immediately because the context is cancelled.
+ _, ok := <-it.(*catfileInfoIterator).ch
+ require.False(t, ok)
+
+ // Sanity-check whether the iterator is in the expected state.
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ })
+
+ t.Run("spawning two pipes fails", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
+ require.NoError(t, err)
+ defer cancel()
+
+ input := []RevisionResult{
+ {OID: lfsPointer1},
+ }
+
+ it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input))
+ require.NoError(t, err)
+
+ // Reusing the queue is not allowed, so we should get an error here.
+ _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input))
+ require.Equal(t, fmt.Errorf("object info queue already in use"), err)
+
+ // We now consume all the input of the iterator.
+ require.True(t, it.Next())
+ require.False(t, it.Next())
+
+ // Which means that the queue should now be unused, so we can again use it.
+ _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input))
+ require.NoError(t, err)
+ })
}
func TestCatfileInfoAllObjects(t *testing.T) {
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index b00d4983e..5fe031579 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"sync"
+ "sync/atomic"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
@@ -38,15 +39,20 @@ func CatfileObject(
objectReader catfile.ObjectReader,
it ObjectIterator,
) (CatfileObjectIterator, error) {
- queue, cleanup, err := objectReader.ObjectQueue(ctx)
+ queue, queueCleanup, err := objectReader.ObjectQueue(ctx)
if err != nil {
return nil, err
}
- defer cleanup()
+ var queueRefcount int32 = 2
requestChan := make(chan catfileObjectRequest, 32)
go func() {
- defer close(requestChan)
+ defer func() {
+ close(requestChan)
+ if atomic.AddInt32(&queueRefcount, -1) == 0 {
+ queueCleanup()
+ }
+ }()
sendRequest := func(request catfileObjectRequest) bool {
// Please refer to `sendResult()` for why we treat the context specially.
@@ -74,6 +80,15 @@ func CatfileObject(
if isDone := sendRequest(catfileObjectRequest{
objectName: it.ObjectName(),
}); isDone {
+ // If the context got cancelled, then we need to flush out all
+ // outstanding requests so that the downstream consumer is
+ // unblocked.
+ if err := queue.Flush(); err != nil {
+ sendRequest(catfileObjectRequest{err: err})
+ return
+ }
+
+ sendRequest(catfileObjectRequest{err: ctx.Err()})
return
}
@@ -99,7 +114,12 @@ func CatfileObject(
resultChan := make(chan CatfileObjectResult)
go func() {
- defer close(resultChan)
+ defer func() {
+ close(resultChan)
+ if atomic.AddInt32(&queueRefcount, -1) == 0 {
+ queueCleanup()
+ }
+ }()
sendResult := func(result CatfileObjectResult) bool {
// In case the context has been cancelled, we have a race between observing
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index 5e40cf444..5b6121eac 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -3,6 +3,7 @@ package gitpipe
import (
"context"
"errors"
+ "fmt"
"io"
"testing"
@@ -13,6 +14,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "google.golang.org/grpc/metadata"
)
func TestCatfileObject(t *testing.T) {
@@ -149,4 +151,88 @@ func TestCatfileObject(t *testing.T) {
err: context.Canceled,
}, it.Result())
})
+
+ t.Run("context cancellation with cached process", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(
+ catfile.SessionIDField, "1",
+ ))
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectReader, objectReaderCancel, err := catfileCache.ObjectReader(ctx, repo)
+ require.NoError(t, err)
+ defer objectReaderCancel()
+
+ inputIter, inputCh, nextCh := newChanObjectIterator()
+
+ it, err := CatfileObject(ctx, objectReader, inputIter)
+ require.NoError(t, err)
+
+ // We request a single object from the catfile process. Because the request queue is
+ // not flushed after every object this means that the request is currently
+ // outstanding.
+ <-nextCh
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Wait for the pipeline to request the next object.
+ <-nextCh
+
+ // We now cancel the context with the outstanding request. In the past, this used to
+ // block the downstream consumer of the object data, for two reasons:
+ //
+ // - When the process is being cached, cancellation of the context doesn't cause
+ // the process to get killed. Consequently, the process would sit around
+ // waiting for input.
+ // - We didn't flush the queue when the context was cancelled, so the buffered input
+ // never arrived at the process.
+ cancel()
+
+ // Now we queue another request that should cause the pipeline to fail.
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Verify whether we can receive any more objects via the iterator. This should
+ // fail because the context got cancelled, but in any case it shouldn't block. Note
+ // that we're forced to reach into the channel directly: `Next()` would return
+ // `false` immediately because the context is cancelled.
+ _, ok := <-it.(*catfileObjectIterator).ch
+ require.False(t, ok)
+
+ // Sanity-check whether the iterator is in the expected state.
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ })
+
+ t.Run("spawning two pipes fails", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
+ require.NoError(t, err)
+ defer cancel()
+
+ input := []RevisionResult{
+ {OID: lfsPointer1},
+ }
+
+ it, err := CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input))
+ require.NoError(t, err)
+
+ // Reusing the queue is not allowed, so we should get an error here.
+ _, err = CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input))
+ require.Equal(t, fmt.Errorf("object queue already in use"), err)
+
+ // We now consume all the input of the iterator.
+ require.True(t, it.Next())
+ _, err = io.Copy(io.Discard, it.Result())
+ require.NoError(t, err)
+ require.False(t, it.Next())
+
+ // Which means that the queue should now be unused, so we can again use it.
+ _, err = CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input))
+ require.NoError(t, err)
+ })
}
diff --git a/internal/git/gitpipe/testhelper_test.go b/internal/git/gitpipe/testhelper_test.go
index 4d715bb7d..b82eb4d82 100644
--- a/internal/git/gitpipe/testhelper_test.go
+++ b/internal/git/gitpipe/testhelper_test.go
@@ -3,9 +3,49 @@ package gitpipe
import (
"testing"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
)
func TestMain(m *testing.M) {
testhelper.Run(m)
}
+
+// chanObjectIterator is an object iterator that can be driven via a set of channels for
+// deterministically exercising specific conditions in tests.
+type chanObjectIterator struct {
+ ObjectIterator
+
+ oid git.ObjectID
+ oidChan <-chan git.ObjectID
+ nextChan chan<- interface{}
+}
+
+// newChanObjectIterator returns a new object iterator as well as two channels: an object ID
+// channel that can be used to inject the object ID the iterator yields after the next call to
+// `Next()`, and a second channel that is written to whenever `Next()` is called.
+func newChanObjectIterator() (ObjectIterator, chan<- git.ObjectID, <-chan interface{}) {
+ oidChan := make(chan git.ObjectID)
+ nextChan := make(chan interface{})
+ return &chanObjectIterator{
+ oidChan: oidChan,
+ nextChan: nextChan,
+ }, oidChan, nextChan
+}
+
+func (ch *chanObjectIterator) Next() bool {
+ // Notify the caller that the next object was requested.
+ ch.nextChan <- struct{}{}
+
+ var ok bool
+ ch.oid, ok = <-ch.oidChan
+ return ok
+}
+
+func (ch *chanObjectIterator) ObjectID() git.ObjectID {
+ return ch.oid
+}
+
+func (ch *chanObjectIterator) ObjectName() []byte {
+ return []byte("idontcare")
+}