diff options
-rw-r--r-- | internal/git/catfile/request_queue.go | 44 |
1 files changed, 40 insertions, 4 deletions
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index d41ed38dc..a31a8b029 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -33,6 +33,10 @@ type requestQueue struct {
 	// closed indicates whether the queue is closed for additional requests.
 	closed int32
 
+	// isReadingObject indicates whether the queue is currently in `ReadObject()` to
+	// read in a new object's header.
+	isReadingObject int32
+
 	// isObjectQueue is set to `true` when this is a request queue which can be used for reading
 	// objects. If set to `false`, then this can only be used to read object info.
 	isObjectQueue bool
@@ -51,6 +55,8 @@ type requestQueue struct {
 // isDirty returns true either if there are outstanding requests for objects or if the current
 // object hasn't yet been fully consumed.
 func (q *requestQueue) isDirty() bool {
+	// We need to take this lock early. Like this we can ensure that `ReadObject()`
+	// cannot have it at the same time while `isReadingObject == 0`.
 	q.currentObjectLock.Lock()
 	defer q.currentObjectLock.Unlock()
 
@@ -60,6 +66,10 @@ func (q *requestQueue) isDirty() bool {
 		return true
 	}
 
+	if atomic.LoadInt32(&q.isReadingObject) != 0 {
+		return true
+	}
+
 	if atomic.LoadInt64(&q.outstandingRequests) != 0 {
 		return true
 	}
@@ -127,17 +137,39 @@ func (q *requestQueue) ReadObject() (*Object, error) {
 		panic("object queue used to read object info")
 	}
 
+	// We need to ensure that only a single call to `ReadObject()` can happen at the
+	// same point in time. While the easy solution would be to just lock
+	// `currentObjectLock`, that would mean we hold the lock while performing I/O
+	// operations. That's a no-go though because the command may stall, and would
+	// deadlock with `isDirty()` and `close()`.
+	//
+	// Note that this must happen before we read the current object: otherwise, a
+	// concurrent caller might've already swapped it out under our feet.
+	if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) {
+		return nil, fmt.Errorf("concurrent read of the object")
+	}
+	defer func() {
+		atomic.StoreInt32(&q.isReadingObject, 0)
+	}()
+
+	// We know no other user can change `currentObject` right now anymore, but we still
+	// need to take `currentObjectLock` given that concurrent callers may try to access the
+	// object.
 	q.currentObjectLock.Lock()
-	defer q.currentObjectLock.Unlock()
+	currentObject := q.currentObject
+	q.currentObjectLock.Unlock()
 
-	if q.currentObject != nil {
+	if currentObject != nil {
 		// If the current object is still dirty, then we must not try to read a new object.
-		if q.currentObject.isDirty() {
+		if currentObject.isDirty() {
 			return nil, fmt.Errorf("current object has not been fully read")
 		}
-		q.currentObject.close()
+		currentObject.close()
+
+		q.currentObjectLock.Lock()
 		q.currentObject = nil
+		q.currentObjectLock.Unlock()
 
 		// If we have already read an object before, then we must consume the trailing
 		// newline after the object's data.
@@ -152,6 +184,10 @@ func (q *requestQueue) ReadObject() (*Object, error) {
 	}
 	q.trace.recordRequest(objectInfo.Type)
 
+	// Synchronize with readers of the object.
+	q.currentObjectLock.Lock()
+	defer q.currentObjectLock.Unlock()
+
 	q.currentObject = &Object{
 		ObjectInfo: *objectInfo,
 		dataReader: io.LimitedReader{