Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/git/gitpipe/catfile_info.go9
-rw-r--r--internal/git/gitpipe/catfile_info_test.go48
-rw-r--r--internal/git/gitpipe/catfile_object.go9
-rw-r--r--internal/git/gitpipe/catfile_object_test.go47
-rw-r--r--internal/git/gitpipe/testhelper_test.go40
5 files changed, 153 insertions, 0 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 9f821f5ae..7f36aeafd 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -82,6 +82,15 @@ func CatfileInfo(
objectID: it.ObjectID(),
objectName: it.ObjectName(),
}); isDone {
+ // If the context got cancelled, then we need to flush out all
+ // outstanding requests so that the downstream consumer is
+ // unblocked.
+ if err := queue.Flush(); err != nil {
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
+ return
+ }
+
+ sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: ctx.Err()})
return
}
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 6651dbac3..450fbe13e 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -6,11 +6,13 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "google.golang.org/grpc/metadata"
)
const (
@@ -187,6 +189,52 @@ func TestCatfileInfo(t *testing.T) {
err: context.Canceled,
}, it.Result())
})
+
+ t.Run("context cancellation with cached process", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(
+ catfile.SessionIDField, "1",
+ ))
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectInfoReader, objectInfoReaderCancel, err := catfileCache.ObjectInfoReader(ctx, repo)
+ require.NoError(t, err)
+ defer objectInfoReaderCancel()
+
+ inputIter, inputCh, nextCh := newChanObjectIterator()
+
+ it, err := CatfileInfo(ctx, objectInfoReader, inputIter)
+ require.NoError(t, err)
+
+ // We request a single object from the catfile process. Because the request queue is
+ // not flushed after every object this means that the request is currently
+ // outstanding.
+ <-nextCh
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Wait for the pipeline to request the next object.
+ <-nextCh
+
+ // We now cancel the context with the outstanding request. In the past, this used to
+ // block the downstream consumer of the object data. This is because of two reasons:
+ //
+ // - When the process is being cached then cancellation of the context doesn't cause
+ // the process to get killed. So consequentially, the process would sit around
+ // waiting for input.
+ // - We didn't flush the queue when the context was cancelled, so the buffered input
+ // never arrived at the process.
+ cancel()
+
+ // Now we queue another request that should cause the pipeline to fail.
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Reading the object should now fail because the context got cancelled, but it
+ // definitely shouldn't block like it did earlier.
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ })
}
func TestCatfileInfoAllObjects(t *testing.T) {
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index b00d4983e..78e59ead0 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -74,6 +74,15 @@ func CatfileObject(
if isDone := sendRequest(catfileObjectRequest{
objectName: it.ObjectName(),
}); isDone {
+ // If the context got cancelled, then we need to flush out all
+ // outstanding requests so that the downstream consumer is
+ // unblocked.
+ if err := queue.Flush(); err != nil {
+ sendRequest(catfileObjectRequest{err: err})
+ return
+ }
+
+ sendRequest(catfileObjectRequest{err: ctx.Err()})
return
}
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index 5e40cf444..17ccfe017 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "google.golang.org/grpc/metadata"
)
func TestCatfileObject(t *testing.T) {
@@ -149,4 +150,50 @@ func TestCatfileObject(t *testing.T) {
err: context.Canceled,
}, it.Result())
})
+
+ t.Run("context cancellation with cached process", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(testhelper.Context(t))
+ ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(
+ catfile.SessionIDField, "1",
+ ))
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectReader, objectReaderCancel, err := catfileCache.ObjectReader(ctx, repo)
+ require.NoError(t, err)
+ defer objectReaderCancel()
+
+ inputIter, inputCh, nextCh := newChanObjectIterator()
+
+ it, err := CatfileObject(ctx, objectReader, inputIter)
+ require.NoError(t, err)
+
+ // We request a single object from the catfile process. Because the request queue is
+ // not flushed after every object this means that the request is currently
+ // outstanding.
+ <-nextCh
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Wait for the pipeline to request the next object.
+ <-nextCh
+
+ // We now cancel the context with the outstanding request. In the past, this used to
+ // block the downstream consumer of the object data. This is because of two reasons:
+ //
+ // - When the process is being cached then cancellation of the context doesn't cause
+ // the process to get killed. So consequentially, the process would sit around
+ // waiting for input.
+ // - We didn't flush the queue when the context was cancelled, so the buffered input
+ // never arrived at the process.
+ cancel()
+
+ // Now we queue another request that should cause the pipeline to fail.
+ inputCh <- git.ObjectID(lfsPointer1)
+
+ // Reading the object should now fail because the context got cancelled, but it
+ // definitely shouldn't block like it did earlier.
+ require.False(t, it.Next())
+ require.Equal(t, context.Canceled, it.Err())
+ })
}
diff --git a/internal/git/gitpipe/testhelper_test.go b/internal/git/gitpipe/testhelper_test.go
index 4d715bb7d..b82eb4d82 100644
--- a/internal/git/gitpipe/testhelper_test.go
+++ b/internal/git/gitpipe/testhelper_test.go
@@ -3,9 +3,49 @@ package gitpipe
import (
"testing"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
)
func TestMain(m *testing.M) {
testhelper.Run(m)
}
+
+// chanObjectIterator is an object iterator that can be driven via a set of channels for
+// deterministically exercising specific conditions in tests.
+type chanObjectIterator struct {
+ ObjectIterator
+
+ oid git.ObjectID
+ oidChan <-chan git.ObjectID
+ nextChan chan<- interface{}
+}
+
+// newChanObjectIterator returns a new object iterator as well as two channels: one object ID
+// channel that can be used to inject the next value returned by `Next()`. And then a second value
+// that is written to when `Next()` is called.
+func newChanObjectIterator() (ObjectIterator, chan<- git.ObjectID, <-chan interface{}) {
+ oidChan := make(chan git.ObjectID)
+ nextChan := make(chan interface{})
+ return &chanObjectIterator{
+ oidChan: oidChan,
+ nextChan: nextChan,
+ }, oidChan, nextChan
+}
+
+func (ch *chanObjectIterator) Next() bool {
+ // Notify the caller that the next object was requested.
+ ch.nextChan <- struct{}{}
+
+ var ok bool
+ ch.oid, ok = <-ch.oidChan
+ return ok
+}
+
+func (ch *chanObjectIterator) ObjectID() git.ObjectID {
+ return ch.oid
+}
+
+func (ch *chanObjectIterator) ObjectName() []byte {
+ return []byte("idontcare")
+}