diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-06-03 11:26:44 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-06-03 11:26:44 +0300 |
commit | a31bd1be25d0ff03efaa7f756321ea9440122b24 (patch) | |
tree | 8ba14330e7152f5fe68c9c1650aad2a54126c5f7 | |
parent | 7c2fcde23bd4a962409897adbbb71da11c6db99a (diff) | |
parent | 49ccf030bbfc9b9d023e4edfbe2c10e7df2d2671 (diff) |
Merge branch 'pks-catfile-request-queue-isdirty-deadlock' into 'master'
catfile: Fix deadlock between reading a new object and accessing it
See merge request gitlab-org/gitaly!4590
-rw-r--r-- | internal/git/catfile/object_reader.go | 56 | ||||
-rw-r--r-- | internal/git/catfile/object_reader_test.go | 1 | ||||
-rw-r--r-- | internal/git/catfile/request_queue.go | 100 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers_test.go | 6 |
4 files changed, 67 insertions, 96 deletions
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 75a460bad..4810a8207 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "io" - "os" "sync/atomic" "github.com/prometheus/client_golang/prometheus" @@ -147,68 +146,21 @@ func (o *objectReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), er // Object represents data returned by `git cat-file --batch` type Object struct { - // bytesRemaining tracks the number of bytes which are left to be read. While this duplicates the - // information tracked in dataReader.N, this cannot be helped given that we need to make - // access to this information atomic so there's no race between updating it and checking the - // process for dirtiness. While we could use locking instead of atomics, we'd have to lock - // during the whole read duration -- and thus it'd become impossible to check for dirtiness - // at the same time. - // - // We list the atomic fields first to ensure they are 64-bit and 32-bit aligned: - // https://pkg.go.dev/sync/atomic#pkg-note-BUG - bytesRemaining int64 - - // closed determines whether the object is closed for reading. - closed int32 - // ObjectInfo represents main information about object ObjectInfo // dataReader is reader which has all the object data. - dataReader io.LimitedReader -} - -// isDirty determines whether the object is still dirty, that is whether there are still unconsumed -// bytes. -func (o *Object) isDirty() bool { - return atomic.LoadInt64(&o.bytesRemaining) != 0 -} - -func (o *Object) isClosed() bool { - return atomic.LoadInt32(&o.closed) == 1 -} - -func (o *Object) close() { - atomic.StoreInt32(&o.closed, 1) + dataReader io.Reader } func (o *Object) Read(p []byte) (int, error) { - if o.isClosed() { - return 0, os.ErrClosed - } - - n, err := o.dataReader.Read(p) - if atomic.AddInt64(&o.bytesRemaining, int64(-n)) < 0 { - return n, fmt.Errorf("bytes remaining became negative while reading object") - } - - return n, err + return o.dataReader.Read(p) } // WriteTo implements the io.WriterTo interface. It defers the write to the embedded object reader // via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are // implemented by the respective reader or writer. func (o *Object) WriteTo(w io.Writer) (int64, error) { - if o.isClosed() { - return 0, os.ErrClosed - } - - // While the `io.LimitedReader` does not support WriteTo, `io.Copy()` will make use of - // `ReadFrom()` in case the writer implements it. - n, err := io.Copy(w, &o.dataReader) - if atomic.AddInt64(&o.bytesRemaining, -n) < 0 { - return n, fmt.Errorf("bytes remaining became negative while reading object") - } - - return n, err + // `io.Copy()` will make use of `ReadFrom()` in case the writer implements it. + return io.Copy(w, o.dataReader) } diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go index fabe64178..2f88a1aed 100644 --- a/internal/git/catfile/object_reader_test.go +++ b/internal/git/catfile/object_reader_test.go @@ -411,7 +411,6 @@ func TestObjectReader_queue(t *testing.T) { require.True(t, reader.isClosed()) require.True(t, queue.isClosed()) - require.True(t, object.isClosed()) _, err = io.ReadAll(object) require.Equal(t, os.ErrClosed, err) diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go index 62c6c868a..5b1558efa 100644 --- a/internal/git/catfile/request_queue.go +++ b/internal/git/catfile/request_queue.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "os" - "sync" "sync/atomic" "gitlab.com/gitlab-org/gitaly/v15/internal/git" @@ -33,6 +32,9 @@ type requestQueue struct { // closed indicates whether the queue is closed for additional requests. closed int32 + // isReadingObject indicates whether there is a read in progress. + isReadingObject int32 + // isObjectQueue is set to `true` when this is a request queue which can be used for reading // objects. If set to `false`, then this can only be used to read object info. isObjectQueue bool @@ -40,10 +42,6 @@ type requestQueue struct { stdout *bufio.Reader stdin *bufio.Writer - // currentObject is the currently read object. - currentObject *Object - currentObjectLock sync.Mutex - // trace is the current tracing span. trace *trace } @@ -51,13 +49,8 @@ type requestQueue struct { // isDirty returns true either if there are outstanding requests for objects or if the current // object hasn't yet been fully consumed. func (q *requestQueue) isDirty() bool { - q.currentObjectLock.Lock() - defer q.currentObjectLock.Unlock() - - // We must check for the current object first: we cannot queue another object due to the - // object lock, but we may queue another request while checking for dirtiness. - if q.currentObject != nil { - return q.currentObject.isDirty() + if atomic.LoadInt32(&q.isReadingObject) != 0 { + return true } if atomic.LoadInt64(&q.outstandingRequests) != 0 { @@ -72,14 +65,7 @@ func (q *requestQueue) isClosed() bool { } func (q *requestQueue) close() { - if atomic.CompareAndSwapInt32(&q.closed, 0, 1) { - q.currentObjectLock.Lock() - defer q.currentObjectLock.Unlock() - - if q.currentObject != nil { - q.currentObject.close() - } - } + atomic.StoreInt32(&q.closed, 1) } func (q *requestQueue) RequestRevision(revision git.Revision) error { @@ -122,46 +108,74 @@ func (q *requestQueue) Flush() error { return nil } +type readerFunc func([]byte) (int, error) + +func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) } + func (q *requestQueue) ReadObject() (*Object, error) { if !q.isObjectQueue { panic("object queue used to read object info") } - q.currentObjectLock.Lock() - defer q.currentObjectLock.Unlock() - - if q.currentObject != nil { - // If the current object is still dirty, then we must not try to read a new object. - if q.currentObject.isDirty() { - return nil, fmt.Errorf("current object has not been fully read") - } - - q.currentObject.close() - q.currentObject = nil - - // If we have already read an object before, then we must consume the trailing - // newline after the object's data. - if _, err := q.stdout.ReadByte(); err != nil { - return nil, err - } + // We need to ensure that only a single call to `ReadObject()` can happen at the + // same point in time. + // + // Note that this must happen before we read the current object: otherwise, a + // concurrent caller might've already swapped it out under our feet. + if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) { + return nil, fmt.Errorf("current object has not been fully read") } objectInfo, err := q.readInfo() if err != nil { + // In the general case we cannot know why reading the object's info has failed. And + // given that git-cat-file(1) is stateful, we cannot say whether it's safe to + // continue reading from it now or whether we need to keep the queue dirty instead. + // So we keep `isReadingObject == 0` in the general case so that it continues to + // stay dirty. + // + // One known exception is when we've got a NotFoundError: this is a graceful failure + // and we can continue reading from the process. + if IsNotFound(err) { + atomic.StoreInt32(&q.isReadingObject, 0) + } + return nil, err } q.trace.recordRequest(objectInfo.Type) - q.currentObject = &Object{ - ObjectInfo: *objectInfo, - dataReader: io.LimitedReader{ + // objectReader first reads the object data from stdout. After that, it discards + // the trailing newline byte that separate the different objects. Finally, it + // undirties the reader so the next object can be read. + objectReader := io.MultiReader( + &io.LimitedReader{ R: q.stdout, N: objectInfo.Size, }, - bytesRemaining: objectInfo.Size, - } + readerFunc(func([]byte) (int, error) { + if _, err := io.CopyN(io.Discard, q.stdout, 1); err != nil { + return 0, fmt.Errorf("discard newline: %q", err) + } + + atomic.StoreInt32(&q.isReadingObject, 0) - return q.currentObject, nil + return 0, io.EOF + }), + ) + + return &Object{ + ObjectInfo: *objectInfo, + dataReader: readerFunc(func(buf []byte) (int, error) { + // The tests assert that no data can be read after the queue is closed. + // Some data could be actually read even after the queue closes due to the buffering. + // Check here if the queue is closed and refuse to read more if so. + if q.isClosed() { + return 0, os.ErrClosed + } + + return objectReader.Read(buf) + }), + }, nil } func (q *requestQueue) ReadInfo() (*ObjectInfo, error) { diff --git a/internal/gitaly/service/blob/lfs_pointers_test.go b/internal/gitaly/service/blob/lfs_pointers_test.go index 9e8558209..8bb76da32 100644 --- a/internal/gitaly/service/blob/lfs_pointers_test.go +++ b/internal/gitaly/service/blob/lfs_pointers_test.go @@ -11,12 +11,14 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -66,6 +68,10 @@ func TestListLFSPointers(t *testing.T) { ctx := testhelper.Context(t) _, repo, _, client := setup(ctx, t) + ctx = testhelper.MergeOutgoingMetadata(ctx, + metadata.Pairs(catfile.SessionIDField, "1"), + ) + for _, tc := range []struct { desc string revs []string |