Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-05-23 17:04:46 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-05-24 08:30:45 +0300
commit3672650574465fbd42677b5f68d35f1beb555dc0 (patch)
treeba3256606b7c9b2bb3192d22d4d06dc5bed29821
parentbee7ceb4ed70fc4b4001e2981725c95943d539d8 (diff)
gitpipe: Fix closing queue too early
We are using queues to batch writes to git-cat-file(1) in the gitpipe package. Given that the queue can only be used by a single user at the same time, this queue is getting locked when acquired. But by accident, we're unlocking the queue immediately when we have constructed both the info and object pipelines even though it is still in use. Luckily, this programming error is mostly harmless: we finish the tracing span too early and mark the queue as unused. But as long as no concurrent caller tries to use the same queue this is not much of an issue. Fix the bug by using a refcount so that we only close the queues when they become unused. Changelog: fixed
-rw-r--r--internal/git/gitpipe/catfile_info.go19
-rw-r--r--internal/git/gitpipe/catfile_info_test.go31
-rw-r--r--internal/git/gitpipe/catfile_object.go19
-rw-r--r--internal/git/gitpipe/catfile_object_test.go33
4 files changed, 94 insertions, 8 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 7f36aeafd..cefd81422 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
+ "sync/atomic"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
@@ -61,15 +62,20 @@ func CatfileInfo(
opt(&cfg)
}
- queue, cleanup, err := objectInfoReader.InfoQueue(ctx)
+ queue, queueCleanup, err := objectInfoReader.InfoQueue(ctx)
if err != nil {
return nil, err
}
- defer cleanup()
+ var queueRefcount int32 = 2
requestChan := make(chan catfileInfoRequest, 32)
go func() {
- defer close(requestChan)
+ defer func() {
+ close(requestChan)
+ if atomic.AddInt32(&queueRefcount, -1) == 0 {
+ queueCleanup()
+ }
+ }()
var i int64
for it.Next() {
@@ -116,7 +122,12 @@ func CatfileInfo(
resultChan := make(chan CatfileInfoResult)
go func() {
- defer close(resultChan)
+ defer func() {
+ close(resultChan)
+ if atomic.AddInt32(&queueRefcount, -1) == 0 {
+ queueCleanup()
+ }
+ }()
// It's fine to iterate over the request channel without paying attention to
// context cancellation because the request channel itself would be closed if the
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 0f1e33ff2..93274718e 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -3,6 +3,7 @@ package gitpipe
import (
"context"
"errors"
+ "fmt"
"testing"
"github.com/stretchr/testify/require"
@@ -241,6 +242,36 @@ func TestCatfileInfo(t *testing.T) {
require.False(t, it.Next())
require.Equal(t, context.Canceled, it.Err())
})
+
+ t.Run("spawning two pipes fails", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo)
+ require.NoError(t, err)
+ defer cancel()
+
+ input := []RevisionResult{
+ {OID: lfsPointer1},
+ }
+
+ it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input))
+ require.NoError(t, err)
+
+ // Reusing the queue is not allowed, so we should get an error here.
+ _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input))
+ require.Equal(t, fmt.Errorf("object info queue already in use"), err)
+
+ // We now consume all the input of the iterator.
+ require.True(t, it.Next())
+ require.False(t, it.Next())
+
+ // Which means that the queue should now be unused, so we can again use it.
+ _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input))
+ require.NoError(t, err)
+ })
}
func TestCatfileInfoAllObjects(t *testing.T) {
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 78e59ead0..5fe031579 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"sync"
+ "sync/atomic"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
@@ -38,15 +39,20 @@ func CatfileObject(
objectReader catfile.ObjectReader,
it ObjectIterator,
) (CatfileObjectIterator, error) {
- queue, cleanup, err := objectReader.ObjectQueue(ctx)
+ queue, queueCleanup, err := objectReader.ObjectQueue(ctx)
if err != nil {
return nil, err
}
- defer cleanup()
+ var queueRefcount int32 = 2
requestChan := make(chan catfileObjectRequest, 32)
go func() {
- defer close(requestChan)
+ defer func() {
+ close(requestChan)
+ if atomic.AddInt32(&queueRefcount, -1) == 0 {
+ queueCleanup()
+ }
+ }()
sendRequest := func(request catfileObjectRequest) bool {
// Please refer to `sendResult()` for why we treat the context specially.
@@ -108,7 +114,12 @@ func CatfileObject(
resultChan := make(chan CatfileObjectResult)
go func() {
- defer close(resultChan)
+ defer func() {
+ close(resultChan)
+ if atomic.AddInt32(&queueRefcount, -1) == 0 {
+ queueCleanup()
+ }
+ }()
sendResult := func(result CatfileObjectResult) bool {
// In case the context has been cancelled, we have a race between observing
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index 99a0a9615..5b6121eac 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -3,6 +3,7 @@ package gitpipe
import (
"context"
"errors"
+ "fmt"
"io"
"testing"
@@ -202,4 +203,36 @@ func TestCatfileObject(t *testing.T) {
require.False(t, it.Next())
require.Equal(t, context.Canceled, it.Err())
})
+
+ t.Run("spawning two pipes fails", func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo)
+ require.NoError(t, err)
+ defer cancel()
+
+ input := []RevisionResult{
+ {OID: lfsPointer1},
+ }
+
+ it, err := CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input))
+ require.NoError(t, err)
+
+ // Reusing the queue is not allowed, so we should get an error here.
+ _, err = CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input))
+ require.Equal(t, fmt.Errorf("object queue already in use"), err)
+
+ // We now consume all the input of the iterator.
+ require.True(t, it.Next())
+ _, err = io.Copy(io.Discard, it.Result())
+ require.NoError(t, err)
+ require.False(t, it.Next())
+
+ // Which means that the queue should now be unused, so we can again use it.
+ _, err = CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input))
+ require.NoError(t, err)
+ })
}