diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/git/catfile/object_content_reader.go | 12 | ||||
-rw-r--r-- | internal/git/catfile/object_content_reader_test.go | 78 | ||||
-rw-r--r-- | internal/git/catfile/object_info_reader.go | 12 | ||||
-rw-r--r-- | internal/git/catfile/object_info_reader_test.go | 72 | ||||
-rw-r--r-- | internal/git/catfile/request_queue.go | 41 | ||||
-rw-r--r-- | internal/git/catfile/request_queue_test.go | 121 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_info.go | 10 | ||||
-rw-r--r-- | internal/git/gitpipe/catfile_object.go | 10 |
8 files changed, 206 insertions, 150 deletions
diff --git a/internal/git/catfile/object_content_reader.go b/internal/git/catfile/object_content_reader.go index 3a2d59ad5..620e857fc 100644 --- a/internal/git/catfile/object_content_reader.go +++ b/internal/git/catfile/object_content_reader.go @@ -34,12 +34,12 @@ type ObjectContentReader interface { // have been queued up such that all requested objects will be readable. type ObjectContentQueue interface { // RequestObject requests the given revision from git-cat-file(1). - RequestObject(git.Revision) error + RequestObject(context.Context, git.Revision) error // ReadObject reads an object which has previously been requested. - ReadObject() (*Object, error) + ReadObject(context.Context) (*Object, error) // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which // have been requested up to this point. - Flush() error + Flush(context.Context) error } // objectContentReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file @@ -126,15 +126,15 @@ func (o *objectContentReader) Object(ctx context.Context, revision git.Revision) } defer finish() - if err := queue.RequestObject(revision); err != nil { + if err := queue.RequestObject(ctx, revision); err != nil { return nil, err } - if err := queue.Flush(); err != nil { + if err := queue.Flush(ctx); err != nil { return nil, err } - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) if err != nil { return nil, err } diff --git a/internal/git/catfile/object_content_reader_test.go b/internal/git/catfile/object_content_reader_test.go index ba8ac1af0..f7be0dfb4 100644 --- a/internal/git/catfile/object_content_reader_test.go +++ b/internal/git/catfile/object_content_reader_test.go @@ -148,10 +148,10 @@ func TestObjectContentReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision())) + require.NoError(t, queue.Flush(ctx)) - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) contents, err := io.ReadAll(object) @@ -171,10 +171,10 @@ func TestObjectContentReader_queue(t *testing.T) { foobarBlob: "foobar", barfooBlob: "barfoo", } { - require.NoError(t, queue.RequestObject(blobID.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, blobID.Revision())) + require.NoError(t, queue.Flush(ctx)) - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) contents, err := io.ReadAll(object) @@ -191,12 +191,12 @@ func TestObjectContentReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.RequestObject(barfooBlob.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision())) + require.NoError(t, queue.RequestObject(ctx, barfooBlob.Revision())) + require.NoError(t, queue.Flush(ctx)) for _, expectedContents := range []string{"foobar", "barfoo"} { - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) contents, err := io.ReadAll(object) @@ -213,7 +213,7 @@ func TestObjectContentReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - _, err = queue.ReadObject() + _, err = queue.ReadObject(ctx) require.Equal(t, errors.New("no outstanding request"), err) }) @@ -227,11 +227,11 @@ func TestObjectContentReader_queue(t *testing.T) { // We flush once before and once after requesting the object such that we can be // sure that it doesn't impact which objects we can read. - require.NoError(t, queue.Flush()) - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.Flush(ctx)) + require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision())) + require.NoError(t, queue.Flush(ctx)) - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) contents, err := io.ReadAll(object) @@ -248,12 +248,12 @@ func TestObjectContentReader_queue(t *testing.T) { defer cleanup() for i := 0; i < 10; i++ { - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) + require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision())) } - require.NoError(t, queue.Flush()) + require.NoError(t, queue.Flush(ctx)) for i := 0; i < 10; i++ { - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) contents, err := io.ReadAll(object) @@ -270,9 +270,9 @@ func TestObjectContentReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.Flush()) + require.NoError(t, queue.Flush(ctx)) - _, err = queue.ReadObject() + _, err = queue.ReadObject(ctx) require.Equal(t, errors.New("no outstanding request"), err) }) @@ -284,10 +284,10 @@ func TestObjectContentReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject("does-not-exist")) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, "does-not-exist")) + require.NoError(t, queue.Flush(ctx)) - _, err = queue.ReadObject() + _, err = queue.ReadObject(ctx) require.Equal(t, NotFoundError{errors.New("object not found")}, err) }) @@ -299,17 +299,17 @@ func TestObjectContentReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject("does-not-exist")) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, "does-not-exist")) + require.NoError(t, queue.Flush(ctx)) - _, err = queue.ReadObject() + _, err = queue.ReadObject(ctx) require.Equal(t, NotFoundError{errors.New("object not found")}, err) // Requesting another object after the previous one has failed should continue to // work alright. - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) - object, err := queue.ReadObject() + require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision())) + require.NoError(t, queue.Flush(ctx)) + object, err := queue.ReadObject(ctx) require.NoError(t, err) contents, err := io.ReadAll(object) @@ -347,13 +347,13 @@ func TestObjectContentReader_queue(t *testing.T) { require.False(t, reader.isDirty()) require.False(t, queue.isDirty()) - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision())) + require.NoError(t, queue.Flush(ctx)) require.True(t, reader.isDirty()) require.True(t, queue.isDirty()) - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) // The object has not been consumed yet, so the reader must still be dirty. @@ -380,7 +380,7 @@ func TestObjectContentReader_queue(t *testing.T) { require.True(t, reader.isClosed()) require.True(t, queue.isClosed()) - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(foobarBlob.Revision())) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(ctx, foobarBlob.Revision())) }) t.Run("closing queue blocks read", func(t *testing.T) { @@ -392,15 +392,15 @@ func TestObjectContentReader_queue(t *testing.T) { defer cleanup() // Request the object before we close the queue. - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision())) + require.NoError(t, queue.Flush(ctx)) queue.close() require.True(t, reader.isClosed()) require.True(t, queue.isClosed()) - _, err = queue.ReadObject() + _, err = queue.ReadObject(ctx) require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err) }) @@ -412,11 +412,11 @@ func TestObjectContentReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision())) + require.NoError(t, queue.Flush(ctx)) // Read the object header before closing. - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) queue.close() diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go index c13fa6d1f..8099a9345 100644 --- a/internal/git/catfile/object_info_reader.go +++ b/internal/git/catfile/object_info_reader.go @@ -114,12 +114,12 @@ type ObjectInfoReader interface { // all requests have been queued up such that all requested objects will be readable. type ObjectInfoQueue interface { // RequestInfo requests the given revision from git-cat-file(1). - RequestInfo(git.Revision) error + RequestInfo(context.Context, git.Revision) error // ReadInfo reads object info which has previously been requested. - ReadInfo() (*ObjectInfo, error) + ReadInfo(context.Context) (*ObjectInfo, error) // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which // have been requested up to this point. - Flush() error + Flush(context.Context) error } // objectInfoReader is a reader for Git object information. This reader is implemented via a @@ -209,15 +209,15 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob } defer cleanup() - if err := queue.RequestInfo(revision); err != nil { + if err := queue.RequestInfo(ctx, revision); err != nil { return nil, err } - if err := queue.Flush(); err != nil { + if err := queue.Flush(ctx); err != nil { return nil, err } - objectInfo, err := queue.ReadInfo() + objectInfo, err := queue.ReadInfo(ctx) if err != nil { return nil, err } diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go index 99d088fa3..d84dafcb2 100644 --- a/internal/git/catfile/object_info_reader_test.go +++ b/internal/git/catfile/object_info_reader_test.go @@ -243,10 +243,10 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestInfo(blobOID.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision())) + require.NoError(t, queue.Flush(ctx)) - info, err := queue.ReadInfo() + info, err := queue.ReadInfo(ctx) require.NoError(t, err) require.Equal(t, &blobInfo, info) }) @@ -263,10 +263,10 @@ func TestObjectInfoReader_queue(t *testing.T) { blobOID: blobInfo, commitOID: commitInfo, } { - require.NoError(t, queue.RequestInfo(oid.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, oid.Revision())) + require.NoError(t, queue.Flush(ctx)) - info, err := queue.ReadInfo() + info, err := queue.ReadInfo(ctx) require.NoError(t, err) require.Equal(t, &objectInfo, info) } @@ -280,12 +280,12 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestInfo(blobOID.Revision())) - require.NoError(t, queue.RequestInfo(commitOID.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision())) + require.NoError(t, queue.RequestInfo(ctx, commitOID.Revision())) + require.NoError(t, queue.Flush(ctx)) for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} { - info, err := queue.ReadInfo() + info, err := queue.ReadInfo(ctx) require.NoError(t, err) require.Equal(t, &expectedInfo, info) } @@ -299,7 +299,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - _, err = queue.ReadInfo() + _, err = queue.ReadInfo(ctx) require.Equal(t, errors.New("no outstanding request"), err) }) @@ -313,11 +313,11 @@ func TestObjectInfoReader_queue(t *testing.T) { // We flush once before and once after requesting the object such that we can be // sure that it doesn't impact which objects we can read. - require.NoError(t, queue.Flush()) - require.NoError(t, queue.RequestInfo(blobOID.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.Flush(ctx)) + require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision())) + require.NoError(t, queue.Flush(ctx)) - info, err := queue.ReadInfo() + info, err := queue.ReadInfo(ctx) require.NoError(t, err) require.Equal(t, &blobInfo, info) }) @@ -331,12 +331,12 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() for i := 0; i < 10; i++ { - require.NoError(t, queue.RequestInfo(blobOID.Revision())) + require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision())) } - require.NoError(t, queue.Flush()) + require.NoError(t, queue.Flush(ctx)) for i := 0; i < 10; i++ { - info, err := queue.ReadInfo() + info, err := queue.ReadInfo(ctx) require.NoError(t, err) require.Equal(t, &blobInfo, info) } @@ -350,9 +350,9 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.Flush()) + require.NoError(t, queue.Flush(ctx)) - _, err = queue.ReadInfo() + _, err = queue.ReadInfo(ctx) require.Equal(t, errors.New("no outstanding request"), err) }) @@ -364,10 +364,10 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestInfo("does-not-exist")) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, "does-not-exist")) + require.NoError(t, queue.Flush(ctx)) - _, err = queue.ReadInfo() + _, err = queue.ReadInfo(ctx) require.Equal(t, NotFoundError{errors.New("object not found")}, err) }) @@ -379,17 +379,17 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestInfo("does-not-exist")) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, "does-not-exist")) + require.NoError(t, queue.Flush(ctx)) - _, err = queue.ReadInfo() + _, err = queue.ReadInfo(ctx) require.Equal(t, NotFoundError{errors.New("object not found")}, err) // Requesting another object info after the previous one has failed should continue // to work alright. - require.NoError(t, queue.RequestInfo(blobOID.Revision())) - require.NoError(t, queue.Flush()) - info, err := queue.ReadInfo() + require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision())) + require.NoError(t, queue.Flush(ctx)) + info, err := queue.ReadInfo(ctx) require.NoError(t, err) require.Equal(t, &blobInfo, info) }) @@ -424,13 +424,13 @@ func TestObjectInfoReader_queue(t *testing.T) { require.False(t, reader.isDirty()) require.False(t, queue.isDirty()) - require.NoError(t, queue.RequestInfo(blobOID.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision())) + require.NoError(t, queue.Flush(ctx)) require.True(t, reader.isDirty()) require.True(t, queue.isDirty()) - _, err = queue.ReadInfo() + _, err = queue.ReadInfo(ctx) require.NoError(t, err) require.False(t, reader.isDirty()) @@ -450,7 +450,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.True(t, reader.isClosed()) require.True(t, queue.isClosed()) - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(blobOID.Revision())) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(ctx, blobOID.Revision())) }) t.Run("closing queue blocks read", func(t *testing.T) { @@ -462,15 +462,15 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() // Request the object before we close the queue. - require.NoError(t, queue.RequestInfo(blobOID.Revision())) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision())) + require.NoError(t, queue.Flush(ctx)) queue.close() require.True(t, reader.isClosed()) require.True(t, queue.isClosed()) - _, err = queue.ReadInfo() + _, err = queue.ReadInfo(ctx) require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err) }) } diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go index 77d7df647..b0ae8c1ba 100644 --- a/internal/git/catfile/request_queue.go +++ b/internal/git/catfile/request_queue.go @@ -2,11 +2,14 @@ package catfile import ( "bufio" + "context" "fmt" "io" "os" "sync/atomic" + "time" + "gitlab.com/gitlab-org/gitaly/v15/internal/command" "gitlab.com/gitlab-org/gitaly/v15/internal/git" ) @@ -87,17 +90,21 @@ func (q *requestQueue) close() { // RequestObject requests the contents for the given revision. A subsequent call has // to be made to ReadObject to read the contents. -func (q *requestQueue) RequestObject(revision git.Revision) error { - return q.requestRevision(contentsCommand, revision) +func (q *requestQueue) RequestObject(ctx context.Context, revision git.Revision) error { + defer logDuration(ctx, "request_object")() + + return q.requestRevision(ctx, contentsCommand, revision) } // RequestObject requests the info for the given revision. A subsequent call has to // be made to ReadInfo read the info. -func (q *requestQueue) RequestInfo(revision git.Revision) error { - return q.requestRevision(infoCommand, revision) +func (q *requestQueue) RequestInfo(ctx context.Context, revision git.Revision) error { + defer logDuration(ctx, "request_info")() + + return q.requestRevision(ctx, infoCommand, revision) } -func (q *requestQueue) requestRevision(cmd string, revision git.Revision) error { +func (q *requestQueue) requestRevision(ctx context.Context, cmd string, revision git.Revision) error { if q.isClosed() { return fmt.Errorf("cannot request revision: %w", os.ErrClosed) } @@ -117,7 +124,9 @@ func (q *requestQueue) requestRevision(cmd string, revision git.Revision) error return nil } -func (q *requestQueue) Flush() error { +func (q *requestQueue) Flush(ctx context.Context) error { + defer logDuration(ctx, "flush")() + if q.isClosed() { return fmt.Errorf("cannot flush: %w", os.ErrClosed) } @@ -141,7 +150,9 @@ type readerFunc func([]byte) (int, error) func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) } -func (q *requestQueue) ReadObject() (*Object, error) { +func (q *requestQueue) ReadObject(ctx context.Context) (*Object, error) { + defer logDuration(ctx, "read_object")() + if !q.isObjectQueue { panic("object queue used to read object info") } @@ -207,7 +218,9 @@ func (q *requestQueue) ReadObject() (*Object, error) { }, nil } -func (q *requestQueue) ReadInfo() (*ObjectInfo, error) { +func (q *requestQueue) ReadInfo(ctx context.Context) (*ObjectInfo, error) { + defer logDuration(ctx, "read_info")() + if q.isObjectQueue { panic("object queue used to read object info") } @@ -243,3 +256,15 @@ func (q *requestQueue) readInfo() (*ObjectInfo, error) { return ParseObjectInfo(q.objectHash, q.stdout) } + +func logDuration(ctx context.Context, logFieldName string) func() { + start := time.Now() + return func() { + delta := time.Since(start) + if stats := command.StatsFromContext(ctx); stats != nil { + stats.RecordSum(fmt.Sprintf("catfile.%s_count", logFieldName), 1) + stats.RecordSum(fmt.Sprintf("catfile.%s_ms", logFieldName), int(delta.Milliseconds())) + stats.RecordSum("catfile.duration_ms", int(delta.Milliseconds())) + } + } +} diff --git a/internal/git/catfile/request_queue_test.go b/internal/git/catfile/request_queue_test.go index bd8cd5b59..8e2249493 100644 --- a/internal/git/catfile/request_queue_test.go +++ b/internal/git/catfile/request_queue_test.go @@ -10,6 +10,7 @@ import ( "unsafe" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/command" "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" @@ -27,26 +28,26 @@ func TestRequestQueue_ReadObject(t *testing.T) { _, queue := newInterceptedObjectQueue(t, ctx, "#!/bin/sh\nread\n") require.PanicsWithValue(t, "object queue used to read object info", func() { - _, _ = queue.ReadInfo() + _, _ = queue.ReadInfo(ctx) }) }) t.Run("read without request", func(t *testing.T) { _, queue := newInterceptedObjectQueue(t, ctx, "#!/bin/sh\nread\n") - _, err := queue.ReadObject() + _, err := queue.ReadObject(ctx) require.Equal(t, fmt.Errorf("no outstanding request"), err) }) t.Run("read on closed reader", func(t *testing.T) { reader, queue := newInterceptedObjectQueue(t, ctx, "#!/bin/sh\nread\n") - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) require.True(t, queue.isDirty()) reader.close() require.True(t, queue.isDirty()) - _, err := queue.ReadObject() + _, err := queue.ReadObject(ctx) require.Equal(t, fmt.Errorf("cannot read object info: %w", fmt.Errorf("file already closed")), err) }) @@ -56,15 +57,15 @@ func TestRequestQueue_ReadObject(t *testing.T) { `, oid)) // We queue two revisions... - require.NoError(t, queue.RequestObject("foo")) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) // .. and only unqueue one object. This object isn't read though, ... - _, err := queue.ReadObject() + _, err := queue.ReadObject(ctx) require.NoError(t, err) // ... which means that trying to read the second object should fail now. - _, err = queue.ReadObject() + _, err = queue.ReadObject(ctx) require.Equal(t, fmt.Errorf("current object has not been fully read"), err) require.True(t, queue.isDirty()) @@ -75,9 +76,9 @@ func TestRequestQueue_ReadObject(t *testing.T) { echo "something something" `) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) - _, err := queue.ReadObject() + _, err := queue.ReadObject(ctx) require.Equal(t, fmt.Errorf("invalid info line: %q", "something something"), err) // The queue must be dirty when we failed due to an unexpected error. @@ -89,9 +90,9 @@ func TestRequestQueue_ReadObject(t *testing.T) { exit 1 `) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) - _, err := queue.ReadObject() + _, err := queue.ReadObject(ctx) require.Equal(t, fmt.Errorf("read info line: %w", io.EOF), err) // The queue must be dirty when we failed due to an unexpected error. @@ -103,9 +104,9 @@ func TestRequestQueue_ReadObject(t *testing.T) { echo "%s missing" `, oid)) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) - _, err := queue.ReadObject() + _, err := queue.ReadObject(ctx) require.Equal(t, NotFoundError{error: fmt.Errorf("object not found")}, err) // The queue must be empty even if the object wasn't found: this is a graceful @@ -119,10 +120,10 @@ func TestRequestQueue_ReadObject(t *testing.T) { echo "1234567890" `, oid)) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) require.True(t, queue.isDirty()) - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) require.Equal(t, ObjectInfo{ Oid: oid, @@ -147,8 +148,8 @@ func TestRequestQueue_ReadObject(t *testing.T) { echo "0987654321" `, oid, secondOID)) - require.NoError(t, queue.RequestObject("foo")) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) require.True(t, queue.isDirty()) for _, expectedObject := range []struct { @@ -174,7 +175,7 @@ func TestRequestQueue_ReadObject(t *testing.T) { } { require.True(t, queue.isDirty()) - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) require.Equal(t, expectedObject.info, object.ObjectInfo) @@ -192,10 +193,10 @@ func TestRequestQueue_ReadObject(t *testing.T) { printf "123" `, oid)) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestObject(ctx, "foo")) require.True(t, queue.isDirty()) - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) require.Equal(t, ObjectInfo{ Oid: oid, @@ -228,7 +229,7 @@ func TestRequestQueue_RequestObject(t *testing.T) { oid := git.ObjectID(strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen())) requireRevision := func(t *testing.T, queue *requestQueue, rev git.Revision) { - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) require.NoError(t, err) data, err := io.ReadAll(object) @@ -240,7 +241,7 @@ func TestRequestQueue_RequestObject(t *testing.T) { _, queue := newInterceptedObjectQueue(t, ctx, "#!/bin/sh") queue.close() - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject("foo")) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(ctx, "foo")) }) t.Run("requesting revision on closed process", func(t *testing.T) { @@ -248,7 +249,7 @@ func TestRequestQueue_RequestObject(t *testing.T) { process.close() - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject("foo")) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(ctx, "foo")) }) t.Run("single request", func(t *testing.T) { @@ -258,8 +259,8 @@ func TestRequestQueue_RequestObject(t *testing.T) { echo "${revision}" `, oid)) - require.NoError(t, queue.RequestObject("foo")) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, "foo")) + require.NoError(t, queue.Flush(ctx)) requireRevision(t, queue, "foo") }) @@ -273,11 +274,11 @@ func TestRequestQueue_RequestObject(t *testing.T) { done `, oid)) - require.NoError(t, queue.RequestObject("foo")) - require.NoError(t, queue.RequestObject("bar")) - require.NoError(t, queue.RequestObject("baz")) - require.NoError(t, queue.RequestObject("qux")) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, "foo")) + require.NoError(t, queue.RequestObject(ctx, "bar")) + require.NoError(t, queue.RequestObject(ctx, "baz")) + require.NoError(t, queue.RequestObject(ctx, "qux")) + require.NoError(t, queue.Flush(ctx)) requireRevision(t, queue, "foo") requireRevision(t, queue, "bar") @@ -307,8 +308,8 @@ func TestRequestQueue_RequestObject(t *testing.T) { "foo", "qux", } { - require.NoError(t, queue.RequestObject(revision)) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestObject(ctx, revision)) + require.NoError(t, queue.Flush(ctx)) requireRevision(t, queue, revision) } }) @@ -323,7 +324,7 @@ func TestRequestQueue_RequestInfo(t *testing.T) { expectedInfo := &ObjectInfo{oid, "blob", 955} requireRevision := func(t *testing.T, queue *requestQueue) { - info, err := queue.ReadInfo() + info, err := queue.ReadInfo(ctx) require.NoError(t, err) require.NoError(t, err) @@ -334,7 +335,7 @@ func TestRequestQueue_RequestInfo(t *testing.T) { _, queue := newInterceptedInfoQueue(t, ctx, "#!/bin/sh") queue.close() - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo("foo")) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(ctx, "foo")) }) t.Run("requesting revision on closed process", func(t *testing.T) { @@ -342,7 +343,7 @@ func TestRequestQueue_RequestInfo(t *testing.T) { process.close() - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo("foo")) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(ctx, "foo")) }) t.Run("single request", func(t *testing.T) { @@ -351,8 +352,8 @@ func TestRequestQueue_RequestInfo(t *testing.T) { echo "%s blob 955" `, oid)) - require.NoError(t, queue.RequestInfo("foo")) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, "foo")) + require.NoError(t, queue.Flush(ctx)) requireRevision(t, queue) }) @@ -365,11 +366,11 @@ func TestRequestQueue_RequestInfo(t *testing.T) { done `, oid)) - require.NoError(t, queue.RequestInfo("foo")) - require.NoError(t, queue.RequestInfo("bar")) - require.NoError(t, queue.RequestInfo("baz")) - require.NoError(t, queue.RequestInfo("qux")) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, "foo")) + require.NoError(t, queue.RequestInfo(ctx, "bar")) + require.NoError(t, queue.RequestInfo(ctx, "baz")) + require.NoError(t, queue.RequestInfo(ctx, "qux")) + require.NoError(t, queue.Flush(ctx)) requireRevision(t, queue) requireRevision(t, queue) @@ -398,13 +399,43 @@ func TestRequestQueue_RequestInfo(t *testing.T) { "foo", "qux", } { - require.NoError(t, queue.RequestInfo(revision)) - require.NoError(t, queue.Flush()) + require.NoError(t, queue.RequestInfo(ctx, revision)) + require.NoError(t, queue.Flush(ctx)) requireRevision(t, queue) } }) } +func TestRequestQueue_CommandStats(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + ctx = command.InitContextStats(ctx) + + oid := git.ObjectID(strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen())) + + _, queue := newInterceptedObjectQueue(t, ctx, fmt.Sprintf(`#!/bin/sh + read revision + echo "%s blob ${#revision}" + echo "${revision}" + `, oid)) + + require.NoError(t, queue.RequestObject(ctx, "foo")) + require.NoError(t, queue.Flush(ctx)) + _, err := queue.ReadObject(ctx) + require.NoError(t, err) + + stats := command.StatsFromContext(ctx) + fields := stats.Fields() + require.Contains(t, fields, "catfile.request_object_count") + require.Contains(t, fields, "catfile.request_object_ms") + require.Contains(t, fields, "catfile.flush_count") + require.Contains(t, fields, "catfile.flush_ms") + require.Contains(t, fields, "catfile.read_object_count") + require.Contains(t, fields, "catfile.read_object_ms") + require.Contains(t, fields, "catfile.duration_ms") +} + func TestRequestQueueCounters64BitAlignment(t *testing.T) { require.Equal(t, 0, int(unsafe.Sizeof(requestQueue{}.counters))%8) } diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 24465f4c8..af78985d0 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -88,7 +88,7 @@ func CatfileInfo( var i int64 for it.Next() { - if err := queue.RequestInfo(it.ObjectID().Revision()); err != nil { + if err := queue.RequestInfo(ctx, it.ObjectID().Revision()); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } @@ -100,7 +100,7 @@ func CatfileInfo( // If the context got cancelled, then we need to flush out all // outstanding requests so that the downstream consumer is // unblocked. - if err := queue.Flush(); err != nil { + if err := queue.Flush(ctx); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } @@ -111,7 +111,7 @@ func CatfileInfo( i++ if i%int64(cap(requestChan)) == 0 { - if err := queue.Flush(); err != nil { + if err := queue.Flush(ctx); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } @@ -123,7 +123,7 @@ func CatfileInfo( return } - if err := queue.Flush(); err != nil { + if err := queue.Flush(ctx); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } @@ -147,7 +147,7 @@ func CatfileInfo( break } - objectInfo, err := queue.ReadInfo() + objectInfo, err := queue.ReadInfo(ctx) if err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ err: fmt.Errorf("retrieving object info for %q: %w", request.objectID, err), diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 1590661cd..0f70423b2 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -72,7 +72,7 @@ func CatfileObject( var i int64 for it.Next() { - if err := queue.RequestObject(it.ObjectID().Revision()); err != nil { + if err := queue.RequestObject(ctx, it.ObjectID().Revision()); err != nil { sendRequest(catfileObjectRequest{err: err}) return } @@ -83,7 +83,7 @@ func CatfileObject( // If the context got cancelled, then we need to flush out all // outstanding requests so that the downstream consumer is // unblocked. - if err := queue.Flush(); err != nil { + if err := queue.Flush(ctx); err != nil { sendRequest(catfileObjectRequest{err: err}) return } @@ -94,7 +94,7 @@ func CatfileObject( i++ if i%int64(cap(requestChan)) == 0 { - if err := queue.Flush(); err != nil { + if err := queue.Flush(ctx); err != nil { sendRequest(catfileObjectRequest{err: err}) return } @@ -106,7 +106,7 @@ func CatfileObject( return } - if err := queue.Flush(); err != nil { + if err := queue.Flush(ctx); err != nil { sendRequest(catfileObjectRequest{err: err}) return } @@ -165,7 +165,7 @@ func CatfileObject( } } - object, err := queue.ReadObject() + object, err := queue.ReadObject(ctx) if err != nil { sendResult(CatfileObjectResult{ err: fmt.Errorf("requesting object: %w", err), |