diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-05-23 17:04:46 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-05-24 08:30:45 +0300 |
commit | 3672650574465fbd42677b5f68d35f1beb555dc0 (patch) | |
tree | ba3256606b7c9b2bb3192d22d4d06dc5bed29821 | |
parent | bee7ceb4ed70fc4b4001e2981725c95943d539d8 (diff) |
gitpipe: Fix closing queue too early
We are using queues to batch writes to git-cat-file(1) in the gitpipe
package. Given that the queue can only be used by a single user at the
same time, this queue is getting locked when acquired. But by accident,
we're unlocking the queue immediately when we have constructed both the
info and object pipelines even though it is still in use. Luckily, this
programming error is mostly harmless: we finish the tracing span too
early and mark the queue as unused. But as long as no concurrent caller
tries to use the same queue this is not much of an issue.
Fix the bug by using a refcount so that we only close the queues when
they become unused.
Changelog: fixed
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 19 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info_test.go | 31 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 19 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object_test.go | 33 |
4 files changed, 94 insertions, 8 deletions
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 7f36aeafd..cefd81422 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "sync/atomic" "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" @@ -61,15 +62,20 @@ func CatfileInfo( opt(&cfg) } - queue, cleanup, err := objectInfoReader.InfoQueue(ctx) + queue, queueCleanup, err := objectInfoReader.InfoQueue(ctx) if err != nil { return nil, err } - defer cleanup() + var queueRefcount int32 = 2 requestChan := make(chan catfileInfoRequest, 32) go func() { - defer close(requestChan) + defer func() { + close(requestChan) + if atomic.AddInt32(&queueRefcount, -1) == 0 { + queueCleanup() + } + }() var i int64 for it.Next() { @@ -116,7 +122,12 @@ func CatfileInfo( resultChan := make(chan CatfileInfoResult) go func() { - defer close(resultChan) + defer func() { + close(resultChan) + if atomic.AddInt32(&queueRefcount, -1) == 0 { + queueCleanup() + } + }() // It's fine to iterate over the request channel without paying attention to // context cancellation because the request channel itself would be closed if the diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 0f1e33ff2..93274718e 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -3,6 +3,7 @@ package gitpipe import ( "context" "errors" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -241,6 +242,36 @@ func TestCatfileInfo(t *testing.T) { require.False(t, it.Next()) require.Equal(t, context.Canceled, it.Err()) }) + + t.Run("spawning two pipes fails", func(t *testing.T) { + ctx := testhelper.Context(t) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectInfoReader, cancel, err := catfileCache.ObjectInfoReader(ctx, repo) + require.NoError(t, err) + defer cancel() + + input := []RevisionResult{ + {OID: lfsPointer1}, + } + + it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input)) + require.NoError(t, err) + + // Reusing the queue is not allowed, so we should get an error here. + _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input)) + require.Equal(t, fmt.Errorf("object info queue already in use"), err) + + // We now consume all the input of the iterator. + require.True(t, it.Next()) + require.False(t, it.Next()) + + // Which means that the queue should now be unused, so we can again use it. + _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input)) + require.NoError(t, err) + }) } func TestCatfileInfoAllObjects(t *testing.T) { diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 78e59ead0..5fe031579 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" @@ -38,15 +39,20 @@ func CatfileObject( objectReader catfile.ObjectReader, it ObjectIterator, ) (CatfileObjectIterator, error) { - queue, cleanup, err := objectReader.ObjectQueue(ctx) + queue, queueCleanup, err := objectReader.ObjectQueue(ctx) if err != nil { return nil, err } - defer cleanup() + var queueRefcount int32 = 2 requestChan := make(chan catfileObjectRequest, 32) go func() { - defer close(requestChan) + defer func() { + close(requestChan) + if atomic.AddInt32(&queueRefcount, -1) == 0 { + queueCleanup() + } + }() sendRequest := func(request catfileObjectRequest) bool { // Please refer to `sendResult()` for why we treat the context specially. @@ -108,7 +114,12 @@ func CatfileObject( resultChan := make(chan CatfileObjectResult) go func() { - defer close(resultChan) + defer func() { + close(resultChan) + if atomic.AddInt32(&queueRefcount, -1) == 0 { + queueCleanup() + } + }() sendResult := func(result CatfileObjectResult) bool { // In case the context has been cancelled, we have a race between observing diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index 99a0a9615..5b6121eac 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -3,6 +3,7 @@ package gitpipe import ( "context" "errors" + "fmt" "io" "testing" @@ -202,4 +203,36 @@ func TestCatfileObject(t *testing.T) { require.False(t, it.Next()) require.Equal(t, context.Canceled, it.Err()) }) + + t.Run("spawning two pipes fails", func(t *testing.T) { + ctx := testhelper.Context(t) + + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + objectReader, cancel, err := catfileCache.ObjectReader(ctx, repo) + require.NoError(t, err) + defer cancel() + + input := []RevisionResult{ + {OID: lfsPointer1}, + } + + it, err := CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input)) + require.NoError(t, err) + + // Reusing the queue is not allowed, so we should get an error here. + _, err = CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input)) + require.Equal(t, fmt.Errorf("object queue already in use"), err) + + // We now consume all the input of the iterator. + require.True(t, it.Next()) + _, err = io.Copy(io.Discard, it.Result()) + require.NoError(t, err) + require.False(t, it.Next()) + + // Which means that the queue should now be unused, so we can again use it. + _, err = CatfileObject(ctx, objectReader, NewRevisionIterator(ctx, input)) + require.NoError(t, err) + }) } |