Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com> 2022-06-03 11:26:44 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com> 2022-06-03 11:26:44 +0300
commit: a31bd1be25d0ff03efaa7f756321ea9440122b24 (patch)
tree: 8ba14330e7152f5fe68c9c1650aad2a54126c5f7
parent: 7c2fcde23bd4a962409897adbbb71da11c6db99a (diff)
parent: 49ccf030bbfc9b9d023e4edfbe2c10e7df2d2671 (diff)
Merge branch 'pks-catfile-request-queue-isdirty-deadlock' into 'master'
catfile: Fix deadlock between reading a new object and accessing it. See merge request gitlab-org/gitaly!4590
-rw-r--r-- internal/git/catfile/object_reader.go | 56
-rw-r--r-- internal/git/catfile/object_reader_test.go | 1
-rw-r--r-- internal/git/catfile/request_queue.go | 100
-rw-r--r-- internal/gitaly/service/blob/lfs_pointers_test.go | 6
4 files changed, 67 insertions, 96 deletions
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 75a460bad..4810a8207 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -5,7 +5,6 @@ import (
"context"
"fmt"
"io"
- "os"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
@@ -147,68 +146,21 @@ func (o *objectReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), er
// Object represents data returned by `git cat-file --batch`
type Object struct {
- // bytesRemaining tracks the number of bytes which are left to be read. While this duplicates the
- // information tracked in dataReader.N, this cannot be helped given that we need to make
- // access to this information atomic so there's no race between updating it and checking the
- // process for dirtiness. While we could use locking instead of atomics, we'd have to lock
- // during the whole read duration -- and thus it'd become impossible to check for dirtiness
- // at the same time.
- //
- // We list the atomic fields first to ensure they are 64-bit and 32-bit aligned:
- // https://pkg.go.dev/sync/atomic#pkg-note-BUG
- bytesRemaining int64
-
- // closed determines whether the object is closed for reading.
- closed int32
-
// ObjectInfo represents main information about object
ObjectInfo
// dataReader is reader which has all the object data.
- dataReader io.LimitedReader
-}
-
-// isDirty determines whether the object is still dirty, that is whether there are still unconsumed
-// bytes.
-func (o *Object) isDirty() bool {
- return atomic.LoadInt64(&o.bytesRemaining) != 0
-}
-
-func (o *Object) isClosed() bool {
- return atomic.LoadInt32(&o.closed) == 1
-}
-
-func (o *Object) close() {
- atomic.StoreInt32(&o.closed, 1)
+ dataReader io.Reader
}
func (o *Object) Read(p []byte) (int, error) {
- if o.isClosed() {
- return 0, os.ErrClosed
- }
-
- n, err := o.dataReader.Read(p)
- if atomic.AddInt64(&o.bytesRemaining, int64(-n)) < 0 {
- return n, fmt.Errorf("bytes remaining became negative while reading object")
- }
-
- return n, err
+ return o.dataReader.Read(p)
}
// WriteTo implements the io.WriterTo interface. It defers the write to the embedded object reader
// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are
// implemented by the respective reader or writer.
func (o *Object) WriteTo(w io.Writer) (int64, error) {
- if o.isClosed() {
- return 0, os.ErrClosed
- }
-
- // While the `io.LimitedReader` does not support WriteTo, `io.Copy()` will make use of
- // `ReadFrom()` in case the writer implements it.
- n, err := io.Copy(w, &o.dataReader)
- if atomic.AddInt64(&o.bytesRemaining, -n) < 0 {
- return n, fmt.Errorf("bytes remaining became negative while reading object")
- }
-
- return n, err
+ // `io.Copy()` will make use of `ReadFrom()` in case the writer implements it.
+ return io.Copy(w, o.dataReader)
}
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index fabe64178..2f88a1aed 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -411,7 +411,6 @@ func TestObjectReader_queue(t *testing.T) {
require.True(t, reader.isClosed())
require.True(t, queue.isClosed())
- require.True(t, object.isClosed())
_, err = io.ReadAll(object)
require.Equal(t, os.ErrClosed, err)
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index 62c6c868a..5b1558efa 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"os"
- "sync"
"sync/atomic"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
@@ -33,6 +32,9 @@ type requestQueue struct {
// closed indicates whether the queue is closed for additional requests.
closed int32
+ // isReadingObject indicates whether there is a read in progress.
+ isReadingObject int32
+
// isObjectQueue is set to `true` when this is a request queue which can be used for reading
// objects. If set to `false`, then this can only be used to read object info.
isObjectQueue bool
@@ -40,10 +42,6 @@ type requestQueue struct {
stdout *bufio.Reader
stdin *bufio.Writer
- // currentObject is the currently read object.
- currentObject *Object
- currentObjectLock sync.Mutex
-
// trace is the current tracing span.
trace *trace
}
@@ -51,13 +49,8 @@ type requestQueue struct {
// isDirty returns true either if there are outstanding requests for objects or if the current
// object hasn't yet been fully consumed.
func (q *requestQueue) isDirty() bool {
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- // We must check for the current object first: we cannot queue another object due to the
- // object lock, but we may queue another request while checking for dirtiness.
- if q.currentObject != nil {
- return q.currentObject.isDirty()
+ if atomic.LoadInt32(&q.isReadingObject) != 0 {
+ return true
}
if atomic.LoadInt64(&q.outstandingRequests) != 0 {
@@ -72,14 +65,7 @@ func (q *requestQueue) isClosed() bool {
}
func (q *requestQueue) close() {
- if atomic.CompareAndSwapInt32(&q.closed, 0, 1) {
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- if q.currentObject != nil {
- q.currentObject.close()
- }
- }
+ atomic.StoreInt32(&q.closed, 1)
}
func (q *requestQueue) RequestRevision(revision git.Revision) error {
@@ -122,46 +108,74 @@ func (q *requestQueue) Flush() error {
return nil
}
+type readerFunc func([]byte) (int, error)
+
+func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) }
+
func (q *requestQueue) ReadObject() (*Object, error) {
if !q.isObjectQueue {
panic("object queue used to read object info")
}
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- if q.currentObject != nil {
- // If the current object is still dirty, then we must not try to read a new object.
- if q.currentObject.isDirty() {
- return nil, fmt.Errorf("current object has not been fully read")
- }
-
- q.currentObject.close()
- q.currentObject = nil
-
- // If we have already read an object before, then we must consume the trailing
- // newline after the object's data.
- if _, err := q.stdout.ReadByte(); err != nil {
- return nil, err
- }
+ // We need to ensure that only a single call to `ReadObject()` can happen at the
+ // same point in time.
+ //
+ // Note that this must happen before we read the current object: otherwise, a
+ // concurrent caller might've already swapped it out under our feet.
+ if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) {
+ return nil, fmt.Errorf("current object has not been fully read")
}
objectInfo, err := q.readInfo()
if err != nil {
+ // In the general case we cannot know why reading the object's info has failed. And
+ // given that git-cat-file(1) is stateful, we cannot say whether it's safe to
+ // continue reading from it now or whether we need to keep the queue dirty instead.
+ // So we keep `isReadingObject == 1` in the general case so that the queue continues to
+ // stay dirty.
+ //
+ // One known exception is when we've got a NotFoundError: this is a graceful failure
+ // and we can continue reading from the process.
+ if IsNotFound(err) {
+ atomic.StoreInt32(&q.isReadingObject, 0)
+ }
+
return nil, err
}
q.trace.recordRequest(objectInfo.Type)
- q.currentObject = &Object{
- ObjectInfo: *objectInfo,
- dataReader: io.LimitedReader{
+ // objectReader first reads the object data from stdout. After that, it discards
+ // the trailing newline byte that separates the different objects. Finally, it
+ // undirties the reader so the next object can be read.
+ objectReader := io.MultiReader(
+ &io.LimitedReader{
R: q.stdout,
N: objectInfo.Size,
},
- bytesRemaining: objectInfo.Size,
- }
+ readerFunc(func([]byte) (int, error) {
+ if _, err := io.CopyN(io.Discard, q.stdout, 1); err != nil {
+ return 0, fmt.Errorf("discard newline: %q", err)
+ }
+
+ atomic.StoreInt32(&q.isReadingObject, 0)
- return q.currentObject, nil
+ return 0, io.EOF
+ }),
+ )
+
+ return &Object{
+ ObjectInfo: *objectInfo,
+ dataReader: readerFunc(func(buf []byte) (int, error) {
+ // The tests assert that no data can be read after the queue is closed.
+ // Some data could actually be read even after the queue closes due to buffering.
+ // Check here if the queue is closed and refuse to read more if so.
+ if q.isClosed() {
+ return 0, os.ErrClosed
+ }
+
+ return objectReader.Read(buf)
+ }),
+ }, nil
}
func (q *requestQueue) ReadInfo() (*ObjectInfo, error) {
diff --git a/internal/gitaly/service/blob/lfs_pointers_test.go b/internal/gitaly/service/blob/lfs_pointers_test.go
index 9e8558209..8bb76da32 100644
--- a/internal/gitaly/service/blob/lfs_pointers_test.go
+++ b/internal/gitaly/service/blob/lfs_pointers_test.go
@@ -11,12 +11,14 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@@ -66,6 +68,10 @@ func TestListLFSPointers(t *testing.T) {
ctx := testhelper.Context(t)
_, repo, _, client := setup(ctx, t)
+ ctx = testhelper.MergeOutgoingMetadata(ctx,
+ metadata.Pairs(catfile.SessionIDField, "1"),
+ )
+
for _, tc := range []struct {
desc string
revs []string