Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary refs log tree commit diff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-05-31 19:00:30 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-06-02 20:06:24 +0300
commit93e24a3ac7eefbfa5768d3a425ca0e891b5d66bf (patch)
tree22cee170f239e4bfb30aef1eeb6b3869448497e4
parente263ba26e72b18015fc09bd97b759b074e660abf (diff)
Simplify request queue dirtiness tracking (branch: smh-streamline-locking)
The dirtiness tracking in the request queue is somewhat hard to understand as it bounces between multiple types. The Object type is tracking the number of bytes read. The queue retains a reference to the latest read object to check whether it has been fully read or not. This back and forth can be simplified by managing the dirtiness state completely in the queue. This commit simplifies the tracking by marking the queue dirty when an object is returned, and composing the readers in a manner that flicks the dirtiness bit off once the reading has been completed. This moves all of the dirtiness logic to the queue, making it easier to understand. The object is reading from the stdout of a cat-file process. Closing the cat-file process also closes the underlying reader the Object is using. There's no need to have a separate closing mechanism for the Object, so close and isClosed are removed as part of this change. They were only called from the queue, and since it is no longer tracking the currentObject, it can't close the object separately. The behavior of returning os.ErrClosed on reading when the queue is closed is still retained as some tests rely on it. A later commit can adjust the tests to remove the assertions and the behavior. The tests incorrectly assert that nothing could be read after the process is closed. The I/O is buffered, and it is possible that something can still be read from the buffer even after the underlying process is already closed.
-rw-r--r--internal/git/catfile/object_reader.go56
-rw-r--r--internal/git/catfile/object_reader_test.go1
-rw-r--r--internal/git/catfile/request_queue.go107
3 files changed, 41 insertions, 123 deletions
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 75a460bad..4810a8207 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -5,7 +5,6 @@ import (
"context"
"fmt"
"io"
- "os"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
@@ -147,68 +146,21 @@ func (o *objectReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), er
// Object represents data returned by `git cat-file --batch`
type Object struct {
- // bytesRemaining tracks the number of bytes which are left to be read. While this duplicates the
- // information tracked in dataReader.N, this cannot be helped given that we need to make
- // access to this information atomic so there's no race between updating it and checking the
- // process for dirtiness. While we could use locking instead of atomics, we'd have to lock
- // during the whole read duration -- and thus it'd become impossible to check for dirtiness
- // at the same time.
- //
- // We list the atomic fields first to ensure they are 64-bit and 32-bit aligned:
- // https://pkg.go.dev/sync/atomic#pkg-note-BUG
- bytesRemaining int64
-
- // closed determines whether the object is closed for reading.
- closed int32
-
// ObjectInfo represents main information about object
ObjectInfo
// dataReader is reader which has all the object data.
- dataReader io.LimitedReader
-}
-
-// isDirty determines whether the object is still dirty, that is whether there are still unconsumed
-// bytes.
-func (o *Object) isDirty() bool {
- return atomic.LoadInt64(&o.bytesRemaining) != 0
-}
-
-func (o *Object) isClosed() bool {
- return atomic.LoadInt32(&o.closed) == 1
-}
-
-func (o *Object) close() {
- atomic.StoreInt32(&o.closed, 1)
+ dataReader io.Reader
}
func (o *Object) Read(p []byte) (int, error) {
- if o.isClosed() {
- return 0, os.ErrClosed
- }
-
- n, err := o.dataReader.Read(p)
- if atomic.AddInt64(&o.bytesRemaining, int64(-n)) < 0 {
- return n, fmt.Errorf("bytes remaining became negative while reading object")
- }
-
- return n, err
+ return o.dataReader.Read(p)
}
// WriteTo implements the io.WriterTo interface. It defers the write to the embedded object reader
// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are
// implemented by the respective reader or writer.
func (o *Object) WriteTo(w io.Writer) (int64, error) {
- if o.isClosed() {
- return 0, os.ErrClosed
- }
-
- // While the `io.LimitedReader` does not support WriteTo, `io.Copy()` will make use of
- // `ReadFrom()` in case the writer implements it.
- n, err := io.Copy(w, &o.dataReader)
- if atomic.AddInt64(&o.bytesRemaining, -n) < 0 {
- return n, fmt.Errorf("bytes remaining became negative while reading object")
- }
-
- return n, err
+ // `io.Copy()` will make use of `ReadFrom()` in case the writer implements it.
+ return io.Copy(w, o.dataReader)
}
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index fabe64178..2f88a1aed 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -411,7 +411,6 @@ func TestObjectReader_queue(t *testing.T) {
require.True(t, reader.isClosed())
require.True(t, queue.isClosed())
- require.True(t, object.isClosed())
_, err = io.ReadAll(object)
require.Equal(t, os.ErrClosed, err)
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index a31a8b029..e6090990c 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"os"
- "sync"
"sync/atomic"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
@@ -33,8 +32,7 @@ type requestQueue struct {
// closed indicates whether the queue is closed for additional requests.
closed int32
- // isReadingObject indicates whether the queue is currently in `ReadObject()` to
- // read in a new object's header.
+ // isReadingObject indicates whether there is a read in progress.
isReadingObject int32
// isObjectQueue is set to `true` when this is a request queue which can be used for reading
@@ -44,10 +42,6 @@ type requestQueue struct {
stdout *bufio.Reader
stdin *bufio.Writer
- // currentObject is the currently read object.
- currentObject *Object
- currentObjectLock sync.Mutex
-
// trace is the current tracing span.
trace *trace
}
@@ -55,17 +49,6 @@ type requestQueue struct {
// isDirty returns true either if there are outstanding requests for objects or if the current
// object hasn't yet been fully consumed.
func (q *requestQueue) isDirty() bool {
- // We need to take this lock early. Like this we can ensure that `ReadObject()`
- // cannot have it at the same time while `isReadingObject == 0`.
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- // We must check for the current object first: we cannot queue another object due to the
- // object lock, but we may queue another request while checking for dirtiness.
- if q.currentObject != nil && q.currentObject.isDirty() {
- return true
- }
-
if atomic.LoadInt32(&q.isReadingObject) != 0 {
return true
}
@@ -82,14 +65,7 @@ func (q *requestQueue) isClosed() bool {
}
func (q *requestQueue) close() {
- if atomic.CompareAndSwapInt32(&q.closed, 0, 1) {
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- if q.currentObject != nil {
- q.currentObject.close()
- }
- }
+ atomic.StoreInt32(&q.closed, 1)
}
func (q *requestQueue) RequestRevision(revision git.Revision) error {
@@ -132,72 +108,63 @@ func (q *requestQueue) Flush() error {
return nil
}
+type readerFunc func([]byte) (int, error)
+
+func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) }
+
func (q *requestQueue) ReadObject() (*Object, error) {
if !q.isObjectQueue {
panic("object queue used to read object info")
}
// We need to ensure that only a single call to `ReadObject()` can happen at the
- // same point in time. While the easy solution would be to just lock
- // `currentObjectLock()`, that would mean we hold the lock while performing I/O
- // operations. That's a no-go though because the command may stall, and would
- // deadlock with `isDirty()` and `close()`.
+ // same point in time.
//
// Note that this must happen before we read the current object: otherwise, a
// concurrent caller might've already swapped it out under our feet.
if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) {
- return nil, fmt.Errorf("concurrent read of the object")
- }
- defer func() {
- atomic.StoreInt32(&q.isReadingObject, 0)
- }()
-
- // We know no other user can change `currentObject` right now anymore, but we still
- // need to take `currentObjectLock()` given that concurrent callers may try to access the
- // object.
- q.currentObjectLock.Lock()
- currentObject := q.currentObject
- q.currentObjectLock.Unlock()
-
- if currentObject != nil {
- // If the current object is still dirty, then we must not try to read a new object.
- if currentObject.isDirty() {
- return nil, fmt.Errorf("current object has not been fully read")
- }
-
- currentObject.close()
-
- q.currentObjectLock.Lock()
- q.currentObject = nil
- q.currentObjectLock.Unlock()
-
- // If we have already read an object before, then we must consume the trailing
- // newline after the object's data.
- if _, err := q.stdout.ReadByte(); err != nil {
- return nil, err
- }
+ return nil, fmt.Errorf("current object has not been fully read")
}
objectInfo, err := q.readInfo()
if err != nil {
+ atomic.StoreInt32(&q.isReadingObject, 0)
return nil, err
}
q.trace.recordRequest(objectInfo.Type)
- // Synchronize with readers of the object.
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- q.currentObject = &Object{
- ObjectInfo: *objectInfo,
- dataReader: io.LimitedReader{
+ // objectReader first reads the object data from stdout. After that, it discards
+ // the trailing newline byte that separate the different objects. Finally, it
+ // undirties the reader so the next object can be read.
+ objectReader := io.MultiReader(
+ &io.LimitedReader{
R: q.stdout,
N: objectInfo.Size,
},
- bytesRemaining: objectInfo.Size,
- }
+ readerFunc(func([]byte) (int, error) {
+ if _, err := io.CopyN(io.Discard, q.stdout, 1); err != nil {
+ return 0, fmt.Errorf("discard newline: %q", err)
+ }
+
+ atomic.StoreInt32(&q.isReadingObject, 0)
- return q.currentObject, nil
+ return 0, io.EOF
+ }),
+ )
+
+ return &Object{
+ ObjectInfo: *objectInfo,
+ dataReader: readerFunc(func(buf []byte) (int, error) {
+ // The tests assert that no data can be read after the queue is closed.
+ // Some data could be actually read even after the queue closes due to the buffering.
+ // Check here if the queue is closed and refuse to read more if so.
+ if q.isClosed() {
+ return 0, os.ErrClosed
+ }
+
+ return objectReader.Read(buf)
+ }),
+ }, nil
}
func (q *requestQueue) ReadInfo() (*ObjectInfo, error) {