Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/git/catfile/request_queue.go')
-rw-r--r--internal/git/catfile/request_queue.go107
1 files changed, 37 insertions, 70 deletions
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index a31a8b029..e6090990c 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"os"
- "sync"
"sync/atomic"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
@@ -33,8 +32,7 @@ type requestQueue struct {
// closed indicates whether the queue is closed for additional requests.
closed int32
- // isReadingObject indicates whether the queue is currently in `ReadObject()` to
- // read in a new object's header.
+ // isReadingObject indicates whether there is a read in progress.
isReadingObject int32
// isObjectQueue is set to `true` when this is a request queue which can be used for reading
@@ -44,10 +42,6 @@ type requestQueue struct {
stdout *bufio.Reader
stdin *bufio.Writer
- // currentObject is the currently read object.
- currentObject *Object
- currentObjectLock sync.Mutex
-
// trace is the current tracing span.
trace *trace
}
@@ -55,17 +49,6 @@ type requestQueue struct {
// isDirty returns true either if there are outstanding requests for objects or if the current
// object hasn't yet been fully consumed.
func (q *requestQueue) isDirty() bool {
- // We need to take this lock early. Like this we can ensure that `ReadObject()`
- // cannot have it at the same time while `isReadingObject == 0`.
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- // We must check for the current object first: we cannot queue another object due to the
- // object lock, but we may queue another request while checking for dirtiness.
- if q.currentObject != nil && q.currentObject.isDirty() {
- return true
- }
-
if atomic.LoadInt32(&q.isReadingObject) != 0 {
return true
}
@@ -82,14 +65,7 @@ func (q *requestQueue) isClosed() bool {
}
func (q *requestQueue) close() {
- if atomic.CompareAndSwapInt32(&q.closed, 0, 1) {
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- if q.currentObject != nil {
- q.currentObject.close()
- }
- }
+ atomic.StoreInt32(&q.closed, 1)
}
func (q *requestQueue) RequestRevision(revision git.Revision) error {
@@ -132,72 +108,63 @@ func (q *requestQueue) Flush() error {
return nil
}
+type readerFunc func([]byte) (int, error)
+
+func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) }
+
func (q *requestQueue) ReadObject() (*Object, error) {
if !q.isObjectQueue {
panic("object queue used to read object info")
}
// We need to ensure that only a single call to `ReadObject()` can happen at the
- // same point in time. While the easy solution would be to just lock
- // `currentObjectLock()`, that would mean we hold the lock while performing I/O
- // operations. That's a no-go though because the command may stall, and would
- // deadlock with `isDirty()` and `close()`.
+ // same point in time.
//
// Note that this must happen before we read the current object: otherwise, a
// concurrent caller might've already swapped it out under our feet.
if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) {
- return nil, fmt.Errorf("concurrent read of the object")
- }
- defer func() {
- atomic.StoreInt32(&q.isReadingObject, 0)
- }()
-
- // We know no other user can change `currentObject` right now anymore, but we still
- // need to take `currentObjectLock()` given that concurrent callers may try to access the
- // object.
- q.currentObjectLock.Lock()
- currentObject := q.currentObject
- q.currentObjectLock.Unlock()
-
- if currentObject != nil {
- // If the current object is still dirty, then we must not try to read a new object.
- if currentObject.isDirty() {
- return nil, fmt.Errorf("current object has not been fully read")
- }
-
- currentObject.close()
-
- q.currentObjectLock.Lock()
- q.currentObject = nil
- q.currentObjectLock.Unlock()
-
- // If we have already read an object before, then we must consume the trailing
- // newline after the object's data.
- if _, err := q.stdout.ReadByte(); err != nil {
- return nil, err
- }
+ return nil, fmt.Errorf("current object has not been fully read")
}
objectInfo, err := q.readInfo()
if err != nil {
+ atomic.StoreInt32(&q.isReadingObject, 0)
return nil, err
}
q.trace.recordRequest(objectInfo.Type)
- // Synchronize with readers of the object.
- q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
-
- q.currentObject = &Object{
- ObjectInfo: *objectInfo,
- dataReader: io.LimitedReader{
+ // objectReader first reads the object data from stdout. After that, it discards
+ // the trailing newline byte that separate the different objects. Finally, it
+ // undirties the reader so the next object can be read.
+ objectReader := io.MultiReader(
+ &io.LimitedReader{
R: q.stdout,
N: objectInfo.Size,
},
- bytesRemaining: objectInfo.Size,
- }
+ readerFunc(func([]byte) (int, error) {
+ if _, err := io.CopyN(io.Discard, q.stdout, 1); err != nil {
+ return 0, fmt.Errorf("discard newline: %q", err)
+ }
+
+ atomic.StoreInt32(&q.isReadingObject, 0)
- return q.currentObject, nil
+ return 0, io.EOF
+ }),
+ )
+
+ return &Object{
+ ObjectInfo: *objectInfo,
+ dataReader: readerFunc(func(buf []byte) (int, error) {
+ // The tests assert that no data can be read after the queue is closed.
+ // Some data could be actually read even after the queue closes due to the buffering.
+ // Check here if the queue is closed and refuse to read more if so.
+ if q.isClosed() {
+ return 0, os.ErrClosed
+ }
+
+ return objectReader.Read(buf)
+ }),
+ }, nil
}
func (q *requestQueue) ReadInfo() (*ObjectInfo, error) {