diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2022-12-28 11:53:41 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2022-12-28 11:53:41 +0300 |
commit | 4666f8afdd6a005034af3882eb24ecc23e86a670 (patch) | |
tree | 368b03f0907cd2884bfd4ca2016cd78df6dd0560 | |
parent | 1a82d06e41054952e957b97e62325ea2853382ab (diff) | |
parent | d17749651e6dd35d5f3f8de0f8e0a27b704217ff (diff) |
Merge branch 'sh-fix-64bit-atomic-alignment' into 'master'
Fix 64-bit alignment errors with request queue counters
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5223
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Co-authored-by: Stan Hu <stanhu@gmail.com>
-rw-r--r-- | internal/git/catfile/request_queue.go | 40 | ||||
-rw-r--r-- | internal/git/catfile/request_queue_test.go | 5 |
2 files changed, 28 insertions, 17 deletions
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go index e44d9d23f..c020b17c8 100644 --- a/internal/git/catfile/request_queue.go +++ b/internal/git/catfile/request_queue.go @@ -26,11 +26,7 @@ const ( flushCommand = "\tFLUSH\t" ) -type requestQueue struct { - // objectHash is the object hash used by the repository the request queue has been - // spawned for. - objectHash git.ObjectHash - +type queueCounters struct { // outstandingRequests is the number of requests which have been queued up. Gets incremented // on request, and decremented when starting to read an object (not when that object has // been fully consumed). @@ -44,6 +40,16 @@ type requestQueue struct { // isReadingObject indicates whether there is a read in progress. isReadingObject int32 +} + +type requestQueue struct { + // objectHash is the object hash used by the repository the request queue has been + // spawned for. + objectHash git.ObjectHash + + // queueCounters is a separate structure to hold variables accessed with sync/atomic + // to ensure 64-bit alignment. + counters queueCounters // isObjectQueue is set to `true` when this is a request queue which can be used for reading // objects. If set to `false`, then this can only be used to read object info. @@ -59,11 +65,11 @@ type requestQueue struct { // isDirty returns true either if there are outstanding requests for objects or if the current // object hasn't yet been fully consumed. func (q *requestQueue) isDirty() bool { - if atomic.LoadInt32(&q.isReadingObject) != 0 { + if atomic.LoadInt32(&q.counters.isReadingObject) != 0 { return true } - if atomic.LoadInt64(&q.outstandingRequests) != 0 { + if atomic.LoadInt64(&q.counters.outstandingRequests) != 0 { return true } @@ -71,11 +77,11 @@ func (q *requestQueue) isDirty() bool { } func (q *requestQueue) isClosed() bool { - return atomic.LoadInt32(&q.closed) == 1 + return atomic.LoadInt32(&q.counters.closed) == 1 } func (q *requestQueue) close() { - atomic.StoreInt32(&q.closed, 1) + atomic.StoreInt32(&q.counters.closed, 1) } // RequestObject requests the contents for the given revision. A subsequent call has @@ -95,15 +101,15 @@ func (q *requestQueue) requestRevision(cmd string, revision git.Revision) error return fmt.Errorf("cannot request revision: %w", os.ErrClosed) } - atomic.AddInt64(&q.outstandingRequests, 1) + atomic.AddInt64(&q.counters.outstandingRequests, 1) if _, err := q.stdin.WriteString(revision.String()); err != nil { - atomic.AddInt64(&q.outstandingRequests, -1) + atomic.AddInt64(&q.counters.outstandingRequests, -1) return fmt.Errorf("writing object request: %w", err) } if err := q.stdin.WriteByte('\n'); err != nil { - atomic.AddInt64(&q.outstandingRequests, -1) + atomic.AddInt64(&q.counters.outstandingRequests, -1) return fmt.Errorf("terminating object request: %w", err) } @@ -144,7 +150,7 @@ func (q *requestQueue) ReadObject() (*Object, error) { // // Note that this must happen before we read the current object: otherwise, a // concurrent caller might've already swapped it out under our feet. - if !atomic.CompareAndSwapInt32(&q.isReadingObject, 0, 1) { + if !atomic.CompareAndSwapInt32(&q.counters.isReadingObject, 0, 1) { return nil, fmt.Errorf("current object has not been fully read") } @@ -159,7 +165,7 @@ func (q *requestQueue) ReadObject() (*Object, error) { // One known exception is when we've got a NotFoundError: this is a graceful failure // and we can continue reading from the process. if IsNotFound(err) { - atomic.StoreInt32(&q.isReadingObject, 0) + atomic.StoreInt32(&q.counters.isReadingObject, 0) } return nil, err @@ -179,7 +185,7 @@ func (q *requestQueue) ReadObject() (*Object, error) { return 0, fmt.Errorf("discard newline: %q", err) } - atomic.StoreInt32(&q.isReadingObject, 0) + atomic.StoreInt32(&q.counters.isReadingObject, 0) return 0, io.EOF }), @@ -221,7 +227,7 @@ func (q *requestQueue) readInfo() (*ObjectInfo, error) { // We first need to determine whether there are any queued requests at all. If not, then we // cannot read anything. - queuedRequests := atomic.LoadInt64(&q.outstandingRequests) + queuedRequests := atomic.LoadInt64(&q.counters.outstandingRequests) if queuedRequests == 0 { return nil, fmt.Errorf("no outstanding request") } @@ -230,7 +236,7 @@ func (q *requestQueue) readInfo() (*ObjectInfo, error) { // that there may be a concurrent caller who requests additional objects, which would in // turn increment the counter. But we can at least verify that it's not smaller than what we // expect to give us the chance to detect concurrent reads. - if atomic.AddInt64(&q.outstandingRequests, -1) < queuedRequests-1 { + if atomic.AddInt64(&q.counters.outstandingRequests, -1) < queuedRequests-1 { return nil, fmt.Errorf("concurrent read on request queue") } diff --git a/internal/git/catfile/request_queue_test.go b/internal/git/catfile/request_queue_test.go index 84dbfad41..bd8cd5b59 100644 --- a/internal/git/catfile/request_queue_test.go +++ b/internal/git/catfile/request_queue_test.go @@ -7,6 +7,7 @@ import ( "os" "strings" "testing" + "unsafe" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/internal/git" @@ -404,6 +405,10 @@ func TestRequestQueue_RequestInfo(t *testing.T) { }) } +func TestRequestQueueCounters64BitAlignment(t *testing.T) { + require.Equal(t, 0, int(unsafe.Sizeof(requestQueue{}.counters))%8) +} + func newInterceptedObjectQueue(t *testing.T, ctx context.Context, script string) (ObjectContentReader, *requestQueue) { cfg := testcfg.Build(t) repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ |