diff options
Diffstat (limited to 'internal/git/catfile/request_queue.go')
-rw-r--r-- | internal/git/catfile/request_queue.go | 107 |
1 files changed, 37 insertions, 70 deletions
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go index a31a8b029..e6090990c 100644 --- a/internal/git/catfile/request_queue.go +++ b/internal/git/catfile/request_queue.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "os" - "sync" "sync/atomic" "gitlab.com/gitlab-org/gitaly/v15/internal/git" @@ -33,8 +32,7 @@ type requestQueue struct { // closed indicates whether the queue is closed for additional requests. closed int32 - // isReadingObject indicates whether the queue is currently in `ReadObject()` to - // read in a new object's header. + // isReadingObject indicates whether there is a read in progress. isReadingObject int32 // isObjectQueue is set to `true` when this is a request queue which can be used for reading @@ -44,10 +42,6 @@ type requestQueue struct { stdout *bufio.Reader stdin *bufio.Writer - // currentObject is the currently read object. - currentObject *Object - currentObjectLock sync.Mutex - // trace is the current tracing span. trace *trace } @@ -55,17 +49,6 @@ type requestQueue struct { // isDirty returns true either if there are outstanding requests for objects or if the current // object hasn't yet been fully consumed. func (q *requestQueue) isDirty() bool { - // We need to take this lock early. Like this we can ensure that `ReadObject()` - // cannot have it at the same time while `isReadingObject == 0`. - q.currentObjectLock.Lock() - defer q.currentObjectLock.Unlock() - - // We must check for the current object first: we cannot queue another object due to the - // object lock, but we may queue another request while checking for dirtiness. - if q.currentObject != nil && q.currentObject.isDirty() { - return true - } - if atomic.LoadInt32(&q.isReadingObject) != 0 { return true } @@ -82,14 +65,7 @@ func (q *requestQueue) isClosed() bool { } func (q *requestQueue) close() { - if atomic.CompareAndSwapInt32(&q.closed, 0, 1) { - q.currentObjectLock.Lock() - defer q.currentObjectLock.Unlock() - - if q.currentObject != nil { - q.currentObject.close() - } - } + atomic.StoreInt32(&q.closed, 1) } func (q *requestQueue) RequestRevision(revision git.Revision) error { @@ -132,72 +108,63 @@ func (q *requestQueue) Flush() error { return nil } +type readerFunc func([]byte) (int, error) + +func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) } + func (q *requestQueue) ReadObject() (*Object, error) { if !q.isObjectQueue { panic("object queue used to read object info") } // We need to ensure that only a single call to `ReadObject()` can happen at the - // same point in time. While the easy solution would be to just lock - // `currentObjectLock()`, that would mean we hold the lock while performing I/O - // operations. That's a no-go though because the command may stall, and would - // deadlock with `isDirty()` and `close()`. + // same point in time. // // Note that this must happen before we read the current object: otherwise, a // concurrent caller might've already swapped it out under our feet. if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) { - return nil, fmt.Errorf("concurrent read of the object") - } - defer func() { - atomic.StoreInt32(&q.isReadingObject, 0) - }() - - // We know no other user can change `currentObject` right now anymore, but we still - // need to take `currentObjectLock()` given that concurrent callers may try to access the - // object. - q.currentObjectLock.Lock() - currentObject := q.currentObject - q.currentObjectLock.Unlock() - - if currentObject != nil { - // If the current object is still dirty, then we must not try to read a new object. - if currentObject.isDirty() { - return nil, fmt.Errorf("current object has not been fully read") - } - - currentObject.close() - - q.currentObjectLock.Lock() - q.currentObject = nil - q.currentObjectLock.Unlock() - - // If we have already read an object before, then we must consume the trailing - // newline after the object's data. - if _, err := q.stdout.ReadByte(); err != nil { - return nil, err - } + return nil, fmt.Errorf("current object has not been fully read") } objectInfo, err := q.readInfo() if err != nil { + atomic.StoreInt32(&q.isReadingObject, 0) return nil, err } q.trace.recordRequest(objectInfo.Type) - // Synchronize with readers of the object. - q.currentObjectLock.Lock() - defer q.currentObjectLock.Unlock() - - q.currentObject = &Object{ - ObjectInfo: *objectInfo, - dataReader: io.LimitedReader{ + // objectReader first reads the object data from stdout. After that, it discards + // the trailing newline byte that separate the different objects. Finally, it + // undirties the reader so the next object can be read. + objectReader := io.MultiReader( + &io.LimitedReader{ R: q.stdout, N: objectInfo.Size, }, - bytesRemaining: objectInfo.Size, - } + readerFunc(func([]byte) (int, error) { + if _, err := io.CopyN(io.Discard, q.stdout, 1); err != nil { + return 0, fmt.Errorf("discard newline: %q", err) + } + + atomic.StoreInt32(&q.isReadingObject, 0) - return q.currentObject, nil + return 0, io.EOF + }), + ) + + return &Object{ + ObjectInfo: *objectInfo, + dataReader: readerFunc(func(buf []byte) (int, error) { + // The tests assert that no data can be read after the queue is closed. + // Some data could be actually read even after the queue closes due to the buffering. + // Check here if the queue is closed and refuse to read more if so. + if q.isClosed() { + return 0, os.ErrClosed + } + + return objectReader.Read(buf) + }), + }, nil } func (q *requestQueue) ReadInfo() (*ObjectInfo, error) { |