
gitlab.com/gitlab-org/gitaly.git
 internal/git/catfile/request_queue.go | 44 ++++++++++++++++++++++++++++++----
 1 file changed, 40 insertions(+), 4 deletions(-)
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index d41ed38dc..a31a8b029 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -33,6 +33,10 @@ type requestQueue struct {
// closed indicates whether the queue is closed for additional requests.
closed int32
+ // isReadingObject indicates whether the queue is currently in `ReadObject()` to
+ // read in a new object's header.
+ isReadingObject int32
+
// isObjectQueue is set to `true` when this is a request queue which can be used for reading
// objects. If set to `false`, then this can only be used to read object info.
isObjectQueue bool
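
The new field is a plain int32 because it is only ever touched through sync/atomic. For context on how such a flag behaves, here is a minimal, self-contained sketch of the atomic-flag pattern; readGuard, tryEnter and leave are illustrative names and not part of the patch:

package main

import (
	"fmt"
	"sync/atomic"
)

// readGuard is a stand-in for the queue's new isReadingObject field: an int32
// flag flipped atomically so other goroutines can observe it without taking a
// mutex.
type readGuard struct {
	isReadingObject int32
}

// tryEnter marks the guard as "reading a new object's header". It fails if a
// concurrent caller already holds the flag.
func (g *readGuard) tryEnter() bool {
	return atomic.CompareAndSwapInt32(&g.isReadingObject, 0, 1)
}

// leave clears the flag again once the header has been read.
func (g *readGuard) leave() {
	atomic.StoreInt32(&g.isReadingObject, 0)
}

func main() {
	var g readGuard
	fmt.Println(g.tryEnter()) // true: the first reader wins
	fmt.Println(g.tryEnter()) // false: a concurrent reader is rejected
	g.leave()
	fmt.Println(g.tryEnter()) // true: free again after the first reader is done
}
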
@@ -51,6 +55,8 @@ type requestQueue struct {
// isDirty returns true either if there are outstanding requests for objects or if the current
// object hasn't yet been fully consumed.
func (q *requestQueue) isDirty() bool {
+ // We need to take this lock early: this ensures that `ReadObject()` cannot hold
+ // it at the same time while `isReadingObject == 0`.
q.currentObjectLock.Lock()
defer q.currentObjectLock.Unlock()
@@ -60,6 +66,10 @@ func (q *requestQueue) isDirty() bool {
return true
}
+ if atomic.LoadInt32(&q.isReadingObject) != 0 {
+ return true
+ }
+
if atomic.LoadInt64(&q.outstandingRequests) != 0 {
return true
}
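
Taken together, the two hunks above make isDirty() treat an in-flight header read as dirty, and they take currentObjectLock before inspecting anything so the check cannot interleave with ReadObject() swapping the object out. A trimmed-down, hypothetical sketch of the resulting check follows; the reduced queue type, its field order and the final currentObject check are illustrative, not a copy of the real requestQueue:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Object is a minimal stand-in for catfile's Object, just enough to show the
// dirtiness check.
type Object struct{ bytesRemaining int64 }

func (o *Object) isDirty() bool { return o.bytesRemaining > 0 }

// queue is a hypothetical, reduced requestQueue with only the fields that
// isDirty() touches.
type queue struct {
	currentObjectLock   sync.Mutex
	currentObject       *Object
	isReadingObject     int32
	outstandingRequests int64
}

// isDirty grabs currentObjectLock before looking at anything, so whenever it
// observes isReadingObject == 0 it knows ReadObject() is not midway through
// replacing currentObject.
func (q *queue) isDirty() bool {
	q.currentObjectLock.Lock()
	defer q.currentObjectLock.Unlock()

	// A header read in flight counts as dirty even before currentObject is set.
	if atomic.LoadInt32(&q.isReadingObject) != 0 {
		return true
	}
	if atomic.LoadInt64(&q.outstandingRequests) != 0 {
		return true
	}
	return q.currentObject != nil && q.currentObject.isDirty()
}

func main() {
	q := &queue{}
	fmt.Println(q.isDirty()) // false: nothing queued

	atomic.StoreInt32(&q.isReadingObject, 1)
	fmt.Println(q.isDirty()) // true: a header read is in flight
}
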
@@ -127,17 +137,39 @@ func (q *requestQueue) ReadObject() (*Object, error) {
panic("object queue used to read object info")
}
+ // We need to ensure that only a single call to `ReadObject()` can happen at the
+ // same point in time. While the easy solution would be to just lock
+ // `currentObjectLock`, that would mean we hold the lock while performing I/O
+ // operations. That's a no-go though: the command may stall, and we would then
+ // deadlock with `isDirty()` and `close()`.
+ //
+ // Note that this must happen before we read the current object: otherwise, a
+ // concurrent caller might've already swapped it out under our feet.
+ if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) {
+ return nil, fmt.Errorf("concurrent read of the object")
+ }
+ defer func() {
+ atomic.StoreInt32(&q.isReadingObject, 0)
+ }()
+
+ // We know that no other caller can change `currentObject` at this point, but we
+ // still need to take `currentObjectLock` given that concurrent callers may try
+ // to access the object.
q.currentObjectLock.Lock()
- defer q.currentObjectLock.Unlock()
+ currentObject := q.currentObject
+ q.currentObjectLock.Unlock()
- if q.currentObject != nil {
+ if currentObject != nil {
// If the current object is still dirty, then we must not try to read a new object.
- if q.currentObject.isDirty() {
+ if currentObject.isDirty() {
return nil, fmt.Errorf("current object has not been fully read")
}
- q.currentObject.close()
+ currentObject.close()
+
+ q.currentObjectLock.Lock()
q.currentObject = nil
+ q.currentObjectLock.Unlock()
// If we have already read an object before, then we must consume the trailing
// newline after the object's data.
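
The comments in this hunk describe a two-level scheme: a compare-and-swap on isReadingObject serializes callers of ReadObject() without holding a mutex across I/O, and currentObjectLock is only taken for the short windows in which currentObject is read or replaced. A condensed, hypothetical sketch of that control flow follows; queue, readHeader and the stubbed Object stand in for the real types and header parsing and are not part of the change:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Object is a minimal stand-in for catfile's Object.
type Object struct{ bytesRemaining int64 }

func (o *Object) isDirty() bool { return o.bytesRemaining > 0 }
func (o *Object) close()        { o.bytesRemaining = 0 }

// queue is a hypothetical, reduced requestQueue.
type queue struct {
	currentObjectLock sync.Mutex
	currentObject     *Object
	isReadingObject   int32
}

// readHeader is a stub for the real header parsing, which performs I/O on the
// cat-file process and may stall; that is exactly why no mutex is held around it.
func (q *queue) readHeader() (*Object, error) {
	return &Object{bytesRemaining: 42}, nil
}

// ReadObject shows the lock scoping from the hunk above: the CAS serializes
// readers, and currentObjectLock is only held while currentObject is actually
// read or replaced, never across I/O.
func (q *queue) ReadObject() (*Object, error) {
	if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) {
		return nil, fmt.Errorf("concurrent read of the object")
	}
	defer atomic.StoreInt32(&q.isReadingObject, 0)

	// Copy the pointer under the lock, then release it before any I/O.
	q.currentObjectLock.Lock()
	currentObject := q.currentObject
	q.currentObjectLock.Unlock()

	if currentObject != nil {
		if currentObject.isDirty() {
			return nil, fmt.Errorf("current object has not been fully read")
		}
		currentObject.close()

		q.currentObjectLock.Lock()
		q.currentObject = nil
		q.currentObjectLock.Unlock()
	}

	object, err := q.readHeader() // unlocked: may block on the process
	if err != nil {
		return nil, err
	}

	// Publish the new object; concurrent readers synchronize on the same lock.
	q.currentObjectLock.Lock()
	defer q.currentObjectLock.Unlock()
	q.currentObject = object
	return object, nil
}

func main() {
	q := &queue{}
	obj, err := q.ReadObject()
	fmt.Println(obj.bytesRemaining, err) // 42 <nil>

	// A second read while the first object is unconsumed is rejected.
	_, err = q.ReadObject()
	fmt.Println(err) // current object has not been fully read
}

Failing the CAS with an error, rather than blocking on a mutex, matches the reasoning in the hunk above: a stalled cat-file command must not leave callers queued up behind a lock that isDirty() and close() also need.
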
@@ -152,6 +184,10 @@ func (q *requestQueue) ReadObject() (*Object, error) {
}
q.trace.recordRequest(objectInfo.Type)
+ // Synchronize with readers of the object.
+ q.currentObjectLock.Lock()
+ defer q.currentObjectLock.Unlock()
+
q.currentObject = &Object{
ObjectInfo: *objectInfo,
dataReader: io.LimitedReader{