Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/git/catfile/object_content_reader.go12
-rw-r--r--internal/git/catfile/object_content_reader_test.go78
-rw-r--r--internal/git/catfile/object_info_reader.go12
-rw-r--r--internal/git/catfile/object_info_reader_test.go72
-rw-r--r--internal/git/catfile/request_queue.go41
-rw-r--r--internal/git/catfile/request_queue_test.go121
-rw-r--r--internal/git/gitpipe/catfile_info.go10
-rw-r--r--internal/git/gitpipe/catfile_object.go10
8 files changed, 206 insertions, 150 deletions
diff --git a/internal/git/catfile/object_content_reader.go b/internal/git/catfile/object_content_reader.go
index 3a2d59ad5..620e857fc 100644
--- a/internal/git/catfile/object_content_reader.go
+++ b/internal/git/catfile/object_content_reader.go
@@ -34,12 +34,12 @@ type ObjectContentReader interface {
// have been queued up such that all requested objects will be readable.
type ObjectContentQueue interface {
// RequestObject requests the given revision from git-cat-file(1).
- RequestObject(git.Revision) error
+ RequestObject(context.Context, git.Revision) error
// ReadObject reads an object which has previously been requested.
- ReadObject() (*Object, error)
+ ReadObject(context.Context) (*Object, error)
// Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
// have been requested up to this point.
- Flush() error
+ Flush(context.Context) error
}
// objectContentReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
@@ -126,15 +126,15 @@ func (o *objectContentReader) Object(ctx context.Context, revision git.Revision)
}
defer finish()
- if err := queue.RequestObject(revision); err != nil {
+ if err := queue.RequestObject(ctx, revision); err != nil {
return nil, err
}
- if err := queue.Flush(); err != nil {
+ if err := queue.Flush(ctx); err != nil {
return nil, err
}
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
if err != nil {
return nil, err
}
diff --git a/internal/git/catfile/object_content_reader_test.go b/internal/git/catfile/object_content_reader_test.go
index ba8ac1af0..f7be0dfb4 100644
--- a/internal/git/catfile/object_content_reader_test.go
+++ b/internal/git/catfile/object_content_reader_test.go
@@ -148,10 +148,10 @@ func TestObjectContentReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision()))
+ require.NoError(t, queue.Flush(ctx))
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
contents, err := io.ReadAll(object)
@@ -171,10 +171,10 @@ func TestObjectContentReader_queue(t *testing.T) {
foobarBlob: "foobar",
barfooBlob: "barfoo",
} {
- require.NoError(t, queue.RequestObject(blobID.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, blobID.Revision()))
+ require.NoError(t, queue.Flush(ctx))
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
contents, err := io.ReadAll(object)
@@ -191,12 +191,12 @@ func TestObjectContentReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.RequestObject(barfooBlob.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision()))
+ require.NoError(t, queue.RequestObject(ctx, barfooBlob.Revision()))
+ require.NoError(t, queue.Flush(ctx))
for _, expectedContents := range []string{"foobar", "barfoo"} {
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
contents, err := io.ReadAll(object)
@@ -213,7 +213,7 @@ func TestObjectContentReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- _, err = queue.ReadObject()
+ _, err = queue.ReadObject(ctx)
require.Equal(t, errors.New("no outstanding request"), err)
})
@@ -227,11 +227,11 @@ func TestObjectContentReader_queue(t *testing.T) {
// We flush once before and once after requesting the object such that we can be
// sure that it doesn't impact which objects we can read.
- require.NoError(t, queue.Flush())
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.Flush(ctx))
+ require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision()))
+ require.NoError(t, queue.Flush(ctx))
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
contents, err := io.ReadAll(object)
@@ -248,12 +248,12 @@ func TestObjectContentReader_queue(t *testing.T) {
defer cleanup()
for i := 0; i < 10; i++ {
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision()))
}
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.Flush(ctx))
for i := 0; i < 10; i++ {
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
contents, err := io.ReadAll(object)
@@ -270,9 +270,9 @@ func TestObjectContentReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.Flush(ctx))
- _, err = queue.ReadObject()
+ _, err = queue.ReadObject(ctx)
require.Equal(t, errors.New("no outstanding request"), err)
})
@@ -284,10 +284,10 @@ func TestObjectContentReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject("does-not-exist"))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, "does-not-exist"))
+ require.NoError(t, queue.Flush(ctx))
- _, err = queue.ReadObject()
+ _, err = queue.ReadObject(ctx)
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
})
@@ -299,17 +299,17 @@ func TestObjectContentReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject("does-not-exist"))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, "does-not-exist"))
+ require.NoError(t, queue.Flush(ctx))
- _, err = queue.ReadObject()
+ _, err = queue.ReadObject(ctx)
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
// Requesting another object after the previous one has failed should continue to
// work alright.
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
- object, err := queue.ReadObject()
+ require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision()))
+ require.NoError(t, queue.Flush(ctx))
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
contents, err := io.ReadAll(object)
@@ -347,13 +347,13 @@ func TestObjectContentReader_queue(t *testing.T) {
require.False(t, reader.isDirty())
require.False(t, queue.isDirty())
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision()))
+ require.NoError(t, queue.Flush(ctx))
require.True(t, reader.isDirty())
require.True(t, queue.isDirty())
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
// The object has not been consumed yet, so the reader must still be dirty.
@@ -380,7 +380,7 @@ func TestObjectContentReader_queue(t *testing.T) {
require.True(t, reader.isClosed())
require.True(t, queue.isClosed())
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(foobarBlob.Revision()))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(ctx, foobarBlob.Revision()))
})
t.Run("closing queue blocks read", func(t *testing.T) {
@@ -392,15 +392,15 @@ func TestObjectContentReader_queue(t *testing.T) {
defer cleanup()
// Request the object before we close the queue.
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision()))
+ require.NoError(t, queue.Flush(ctx))
queue.close()
require.True(t, reader.isClosed())
require.True(t, queue.isClosed())
- _, err = queue.ReadObject()
+ _, err = queue.ReadObject(ctx)
require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err)
})
@@ -412,11 +412,11 @@ func TestObjectContentReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, foobarBlob.Revision()))
+ require.NoError(t, queue.Flush(ctx))
// Read the object header before closing.
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
queue.close()
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go
index c13fa6d1f..8099a9345 100644
--- a/internal/git/catfile/object_info_reader.go
+++ b/internal/git/catfile/object_info_reader.go
@@ -114,12 +114,12 @@ type ObjectInfoReader interface {
// all requests have been queued up such that all requested objects will be readable.
type ObjectInfoQueue interface {
// RequestInfo requests the given revision from git-cat-file(1).
- RequestInfo(git.Revision) error
+ RequestInfo(context.Context, git.Revision) error
// ReadInfo reads object info which has previously been requested.
- ReadInfo() (*ObjectInfo, error)
+ ReadInfo(context.Context) (*ObjectInfo, error)
// Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
// have been requested up to this point.
- Flush() error
+ Flush(context.Context) error
}
// objectInfoReader is a reader for Git object information. This reader is implemented via a
@@ -209,15 +209,15 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob
}
defer cleanup()
- if err := queue.RequestInfo(revision); err != nil {
+ if err := queue.RequestInfo(ctx, revision); err != nil {
return nil, err
}
- if err := queue.Flush(); err != nil {
+ if err := queue.Flush(ctx); err != nil {
return nil, err
}
- objectInfo, err := queue.ReadInfo()
+ objectInfo, err := queue.ReadInfo(ctx)
if err != nil {
return nil, err
}
diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go
index 99d088fa3..d84dafcb2 100644
--- a/internal/git/catfile/object_info_reader_test.go
+++ b/internal/git/catfile/object_info_reader_test.go
@@ -243,10 +243,10 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision()))
+ require.NoError(t, queue.Flush(ctx))
- info, err := queue.ReadInfo()
+ info, err := queue.ReadInfo(ctx)
require.NoError(t, err)
require.Equal(t, &blobInfo, info)
})
@@ -263,10 +263,10 @@ func TestObjectInfoReader_queue(t *testing.T) {
blobOID: blobInfo,
commitOID: commitInfo,
} {
- require.NoError(t, queue.RequestInfo(oid.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, oid.Revision()))
+ require.NoError(t, queue.Flush(ctx))
- info, err := queue.ReadInfo()
+ info, err := queue.ReadInfo(ctx)
require.NoError(t, err)
require.Equal(t, &objectInfo, info)
}
@@ -280,12 +280,12 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
- require.NoError(t, queue.RequestInfo(commitOID.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision()))
+ require.NoError(t, queue.RequestInfo(ctx, commitOID.Revision()))
+ require.NoError(t, queue.Flush(ctx))
for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} {
- info, err := queue.ReadInfo()
+ info, err := queue.ReadInfo(ctx)
require.NoError(t, err)
require.Equal(t, &expectedInfo, info)
}
@@ -299,7 +299,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- _, err = queue.ReadInfo()
+ _, err = queue.ReadInfo(ctx)
require.Equal(t, errors.New("no outstanding request"), err)
})
@@ -313,11 +313,11 @@ func TestObjectInfoReader_queue(t *testing.T) {
// We flush once before and once after requesting the object such that we can be
// sure that it doesn't impact which objects we can read.
- require.NoError(t, queue.Flush())
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.Flush(ctx))
+ require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision()))
+ require.NoError(t, queue.Flush(ctx))
- info, err := queue.ReadInfo()
+ info, err := queue.ReadInfo(ctx)
require.NoError(t, err)
require.Equal(t, &blobInfo, info)
})
@@ -331,12 +331,12 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
for i := 0; i < 10; i++ {
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
+ require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision()))
}
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.Flush(ctx))
for i := 0; i < 10; i++ {
- info, err := queue.ReadInfo()
+ info, err := queue.ReadInfo(ctx)
require.NoError(t, err)
require.Equal(t, &blobInfo, info)
}
@@ -350,9 +350,9 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.Flush(ctx))
- _, err = queue.ReadInfo()
+ _, err = queue.ReadInfo(ctx)
require.Equal(t, errors.New("no outstanding request"), err)
})
@@ -364,10 +364,10 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestInfo("does-not-exist"))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, "does-not-exist"))
+ require.NoError(t, queue.Flush(ctx))
- _, err = queue.ReadInfo()
+ _, err = queue.ReadInfo(ctx)
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
})
@@ -379,17 +379,17 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestInfo("does-not-exist"))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, "does-not-exist"))
+ require.NoError(t, queue.Flush(ctx))
- _, err = queue.ReadInfo()
+ _, err = queue.ReadInfo(ctx)
require.Equal(t, NotFoundError{errors.New("object not found")}, err)
// Requesting another object info after the previous one has failed should continue
// to work alright.
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
- require.NoError(t, queue.Flush())
- info, err := queue.ReadInfo()
+ require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision()))
+ require.NoError(t, queue.Flush(ctx))
+ info, err := queue.ReadInfo(ctx)
require.NoError(t, err)
require.Equal(t, &blobInfo, info)
})
@@ -424,13 +424,13 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.False(t, reader.isDirty())
require.False(t, queue.isDirty())
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision()))
+ require.NoError(t, queue.Flush(ctx))
require.True(t, reader.isDirty())
require.True(t, queue.isDirty())
- _, err = queue.ReadInfo()
+ _, err = queue.ReadInfo(ctx)
require.NoError(t, err)
require.False(t, reader.isDirty())
@@ -450,7 +450,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.True(t, reader.isClosed())
require.True(t, queue.isClosed())
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(blobOID.Revision()))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(ctx, blobOID.Revision()))
})
t.Run("closing queue blocks read", func(t *testing.T) {
@@ -462,15 +462,15 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
// Request the object before we close the queue.
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, blobOID.Revision()))
+ require.NoError(t, queue.Flush(ctx))
queue.close()
require.True(t, reader.isClosed())
require.True(t, queue.isClosed())
- _, err = queue.ReadInfo()
+ _, err = queue.ReadInfo(ctx)
require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err)
})
}
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index 77d7df647..b0ae8c1ba 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -2,11 +2,14 @@ package catfile
import (
"bufio"
+ "context"
"fmt"
"io"
"os"
"sync/atomic"
+ "time"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/command"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
)
@@ -87,17 +90,21 @@ func (q *requestQueue) close() {
// RequestObject requests the contents for the given revision. A subsequent call has
// to be made to ReadObject to read the contents.
-func (q *requestQueue) RequestObject(revision git.Revision) error {
- return q.requestRevision(contentsCommand, revision)
+func (q *requestQueue) RequestObject(ctx context.Context, revision git.Revision) error {
+ defer logDuration(ctx, "request_object")()
+
+ return q.requestRevision(ctx, contentsCommand, revision)
}
// RequestObject requests the info for the given revision. A subsequent call has to
// be made to ReadInfo read the info.
-func (q *requestQueue) RequestInfo(revision git.Revision) error {
- return q.requestRevision(infoCommand, revision)
+func (q *requestQueue) RequestInfo(ctx context.Context, revision git.Revision) error {
+ defer logDuration(ctx, "request_info")()
+
+ return q.requestRevision(ctx, infoCommand, revision)
}
-func (q *requestQueue) requestRevision(cmd string, revision git.Revision) error {
+func (q *requestQueue) requestRevision(ctx context.Context, cmd string, revision git.Revision) error {
if q.isClosed() {
return fmt.Errorf("cannot request revision: %w", os.ErrClosed)
}
@@ -117,7 +124,9 @@ func (q *requestQueue) requestRevision(cmd string, revision git.Revision) error
return nil
}
-func (q *requestQueue) Flush() error {
+func (q *requestQueue) Flush(ctx context.Context) error {
+ defer logDuration(ctx, "flush")()
+
if q.isClosed() {
return fmt.Errorf("cannot flush: %w", os.ErrClosed)
}
@@ -141,7 +150,9 @@ type readerFunc func([]byte) (int, error)
func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) }
-func (q *requestQueue) ReadObject() (*Object, error) {
+func (q *requestQueue) ReadObject(ctx context.Context) (*Object, error) {
+ defer logDuration(ctx, "read_object")()
+
if !q.isObjectQueue {
panic("object queue used to read object info")
}
@@ -207,7 +218,9 @@ func (q *requestQueue) ReadObject() (*Object, error) {
}, nil
}
-func (q *requestQueue) ReadInfo() (*ObjectInfo, error) {
+func (q *requestQueue) ReadInfo(ctx context.Context) (*ObjectInfo, error) {
+ defer logDuration(ctx, "read_info")()
+
if q.isObjectQueue {
panic("object queue used to read object info")
}
@@ -243,3 +256,15 @@ func (q *requestQueue) readInfo() (*ObjectInfo, error) {
return ParseObjectInfo(q.objectHash, q.stdout)
}
+
+func logDuration(ctx context.Context, logFieldName string) func() {
+ start := time.Now()
+ return func() {
+ delta := time.Since(start)
+ if stats := command.StatsFromContext(ctx); stats != nil {
+ stats.RecordSum(fmt.Sprintf("catfile.%s_count", logFieldName), 1)
+ stats.RecordSum(fmt.Sprintf("catfile.%s_ms", logFieldName), int(delta.Milliseconds()))
+ stats.RecordSum("catfile.duration_ms", int(delta.Milliseconds()))
+ }
+ }
+}
diff --git a/internal/git/catfile/request_queue_test.go b/internal/git/catfile/request_queue_test.go
index bd8cd5b59..8e2249493 100644
--- a/internal/git/catfile/request_queue_test.go
+++ b/internal/git/catfile/request_queue_test.go
@@ -10,6 +10,7 @@ import (
"unsafe"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/command"
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
@@ -27,26 +28,26 @@ func TestRequestQueue_ReadObject(t *testing.T) {
_, queue := newInterceptedObjectQueue(t, ctx, "#!/bin/sh\nread\n")
require.PanicsWithValue(t, "object queue used to read object info", func() {
- _, _ = queue.ReadInfo()
+ _, _ = queue.ReadInfo(ctx)
})
})
t.Run("read without request", func(t *testing.T) {
_, queue := newInterceptedObjectQueue(t, ctx, "#!/bin/sh\nread\n")
- _, err := queue.ReadObject()
+ _, err := queue.ReadObject(ctx)
require.Equal(t, fmt.Errorf("no outstanding request"), err)
})
t.Run("read on closed reader", func(t *testing.T) {
reader, queue := newInterceptedObjectQueue(t, ctx, "#!/bin/sh\nread\n")
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
require.True(t, queue.isDirty())
reader.close()
require.True(t, queue.isDirty())
- _, err := queue.ReadObject()
+ _, err := queue.ReadObject(ctx)
require.Equal(t, fmt.Errorf("cannot read object info: %w", fmt.Errorf("file already closed")), err)
})
@@ -56,15 +57,15 @@ func TestRequestQueue_ReadObject(t *testing.T) {
`, oid))
// We queue two revisions...
- require.NoError(t, queue.RequestObject("foo"))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
// .. and only unqueue one object. This object isn't read though, ...
- _, err := queue.ReadObject()
+ _, err := queue.ReadObject(ctx)
require.NoError(t, err)
// ... which means that trying to read the second object should fail now.
- _, err = queue.ReadObject()
+ _, err = queue.ReadObject(ctx)
require.Equal(t, fmt.Errorf("current object has not been fully read"), err)
require.True(t, queue.isDirty())
@@ -75,9 +76,9 @@ func TestRequestQueue_ReadObject(t *testing.T) {
echo "something something"
`)
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
- _, err := queue.ReadObject()
+ _, err := queue.ReadObject(ctx)
require.Equal(t, fmt.Errorf("invalid info line: %q", "something something"), err)
// The queue must be dirty when we failed due to an unexpected error.
@@ -89,9 +90,9 @@ func TestRequestQueue_ReadObject(t *testing.T) {
exit 1
`)
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
- _, err := queue.ReadObject()
+ _, err := queue.ReadObject(ctx)
require.Equal(t, fmt.Errorf("read info line: %w", io.EOF), err)
// The queue must be dirty when we failed due to an unexpected error.
@@ -103,9 +104,9 @@ func TestRequestQueue_ReadObject(t *testing.T) {
echo "%s missing"
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
- _, err := queue.ReadObject()
+ _, err := queue.ReadObject(ctx)
require.Equal(t, NotFoundError{error: fmt.Errorf("object not found")}, err)
// The queue must be empty even if the object wasn't found: this is a graceful
@@ -119,10 +120,10 @@ func TestRequestQueue_ReadObject(t *testing.T) {
echo "1234567890"
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
require.True(t, queue.isDirty())
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
require.Equal(t, ObjectInfo{
Oid: oid,
@@ -147,8 +148,8 @@ func TestRequestQueue_ReadObject(t *testing.T) {
echo "0987654321"
`, oid, secondOID))
- require.NoError(t, queue.RequestObject("foo"))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
require.True(t, queue.isDirty())
for _, expectedObject := range []struct {
@@ -174,7 +175,7 @@ func TestRequestQueue_ReadObject(t *testing.T) {
} {
require.True(t, queue.isDirty())
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
require.Equal(t, expectedObject.info, object.ObjectInfo)
@@ -192,10 +193,10 @@ func TestRequestQueue_ReadObject(t *testing.T) {
printf "123"
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
require.True(t, queue.isDirty())
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
require.Equal(t, ObjectInfo{
Oid: oid,
@@ -228,7 +229,7 @@ func TestRequestQueue_RequestObject(t *testing.T) {
oid := git.ObjectID(strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen()))
requireRevision := func(t *testing.T, queue *requestQueue, rev git.Revision) {
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
require.NoError(t, err)
data, err := io.ReadAll(object)
@@ -240,7 +241,7 @@ func TestRequestQueue_RequestObject(t *testing.T) {
_, queue := newInterceptedObjectQueue(t, ctx, "#!/bin/sh")
queue.close()
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject("foo"))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(ctx, "foo"))
})
t.Run("requesting revision on closed process", func(t *testing.T) {
@@ -248,7 +249,7 @@ func TestRequestQueue_RequestObject(t *testing.T) {
process.close()
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject("foo"))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(ctx, "foo"))
})
t.Run("single request", func(t *testing.T) {
@@ -258,8 +259,8 @@ func TestRequestQueue_RequestObject(t *testing.T) {
echo "${revision}"
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
+ require.NoError(t, queue.Flush(ctx))
requireRevision(t, queue, "foo")
})
@@ -273,11 +274,11 @@ func TestRequestQueue_RequestObject(t *testing.T) {
done
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
- require.NoError(t, queue.RequestObject("bar"))
- require.NoError(t, queue.RequestObject("baz"))
- require.NoError(t, queue.RequestObject("qux"))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
+ require.NoError(t, queue.RequestObject(ctx, "bar"))
+ require.NoError(t, queue.RequestObject(ctx, "baz"))
+ require.NoError(t, queue.RequestObject(ctx, "qux"))
+ require.NoError(t, queue.Flush(ctx))
requireRevision(t, queue, "foo")
requireRevision(t, queue, "bar")
@@ -307,8 +308,8 @@ func TestRequestQueue_RequestObject(t *testing.T) {
"foo",
"qux",
} {
- require.NoError(t, queue.RequestObject(revision))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestObject(ctx, revision))
+ require.NoError(t, queue.Flush(ctx))
requireRevision(t, queue, revision)
}
})
@@ -323,7 +324,7 @@ func TestRequestQueue_RequestInfo(t *testing.T) {
expectedInfo := &ObjectInfo{oid, "blob", 955}
requireRevision := func(t *testing.T, queue *requestQueue) {
- info, err := queue.ReadInfo()
+ info, err := queue.ReadInfo(ctx)
require.NoError(t, err)
require.NoError(t, err)
@@ -334,7 +335,7 @@ func TestRequestQueue_RequestInfo(t *testing.T) {
_, queue := newInterceptedInfoQueue(t, ctx, "#!/bin/sh")
queue.close()
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo("foo"))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(ctx, "foo"))
})
t.Run("requesting revision on closed process", func(t *testing.T) {
@@ -342,7 +343,7 @@ func TestRequestQueue_RequestInfo(t *testing.T) {
process.close()
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo("foo"))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(ctx, "foo"))
})
t.Run("single request", func(t *testing.T) {
@@ -351,8 +352,8 @@ func TestRequestQueue_RequestInfo(t *testing.T) {
echo "%s blob 955"
`, oid))
- require.NoError(t, queue.RequestInfo("foo"))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, "foo"))
+ require.NoError(t, queue.Flush(ctx))
requireRevision(t, queue)
})
@@ -365,11 +366,11 @@ func TestRequestQueue_RequestInfo(t *testing.T) {
done
`, oid))
- require.NoError(t, queue.RequestInfo("foo"))
- require.NoError(t, queue.RequestInfo("bar"))
- require.NoError(t, queue.RequestInfo("baz"))
- require.NoError(t, queue.RequestInfo("qux"))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, "foo"))
+ require.NoError(t, queue.RequestInfo(ctx, "bar"))
+ require.NoError(t, queue.RequestInfo(ctx, "baz"))
+ require.NoError(t, queue.RequestInfo(ctx, "qux"))
+ require.NoError(t, queue.Flush(ctx))
requireRevision(t, queue)
requireRevision(t, queue)
@@ -398,13 +399,43 @@ func TestRequestQueue_RequestInfo(t *testing.T) {
"foo",
"qux",
} {
- require.NoError(t, queue.RequestInfo(revision))
- require.NoError(t, queue.Flush())
+ require.NoError(t, queue.RequestInfo(ctx, revision))
+ require.NoError(t, queue.Flush(ctx))
requireRevision(t, queue)
}
})
}
+func TestRequestQueue_CommandStats(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ ctx = command.InitContextStats(ctx)
+
+ oid := git.ObjectID(strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen()))
+
+ _, queue := newInterceptedObjectQueue(t, ctx, fmt.Sprintf(`#!/bin/sh
+ read revision
+ echo "%s blob ${#revision}"
+ echo "${revision}"
+ `, oid))
+
+ require.NoError(t, queue.RequestObject(ctx, "foo"))
+ require.NoError(t, queue.Flush(ctx))
+ _, err := queue.ReadObject(ctx)
+ require.NoError(t, err)
+
+ stats := command.StatsFromContext(ctx)
+ fields := stats.Fields()
+ require.Contains(t, fields, "catfile.request_object_count")
+ require.Contains(t, fields, "catfile.request_object_ms")
+ require.Contains(t, fields, "catfile.flush_count")
+ require.Contains(t, fields, "catfile.flush_ms")
+ require.Contains(t, fields, "catfile.read_object_count")
+ require.Contains(t, fields, "catfile.read_object_ms")
+ require.Contains(t, fields, "catfile.duration_ms")
+}
+
func TestRequestQueueCounters64BitAlignment(t *testing.T) {
require.Equal(t, 0, int(unsafe.Sizeof(requestQueue{}.counters))%8)
}
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 24465f4c8..af78985d0 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -88,7 +88,7 @@ func CatfileInfo(
var i int64
for it.Next() {
- if err := queue.RequestInfo(it.ObjectID().Revision()); err != nil {
+ if err := queue.RequestInfo(ctx, it.ObjectID().Revision()); err != nil {
sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
return
}
@@ -100,7 +100,7 @@ func CatfileInfo(
// If the context got cancelled, then we need to flush out all
// outstanding requests so that the downstream consumer is
// unblocked.
- if err := queue.Flush(); err != nil {
+ if err := queue.Flush(ctx); err != nil {
sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
return
}
@@ -111,7 +111,7 @@ func CatfileInfo(
i++
if i%int64(cap(requestChan)) == 0 {
- if err := queue.Flush(); err != nil {
+ if err := queue.Flush(ctx); err != nil {
sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
return
}
@@ -123,7 +123,7 @@ func CatfileInfo(
return
}
- if err := queue.Flush(); err != nil {
+ if err := queue.Flush(ctx); err != nil {
sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
return
}
@@ -147,7 +147,7 @@ func CatfileInfo(
break
}
- objectInfo, err := queue.ReadInfo()
+ objectInfo, err := queue.ReadInfo(ctx)
if err != nil {
sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
err: fmt.Errorf("retrieving object info for %q: %w", request.objectID, err),
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 1590661cd..0f70423b2 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -72,7 +72,7 @@ func CatfileObject(
var i int64
for it.Next() {
- if err := queue.RequestObject(it.ObjectID().Revision()); err != nil {
+ if err := queue.RequestObject(ctx, it.ObjectID().Revision()); err != nil {
sendRequest(catfileObjectRequest{err: err})
return
}
@@ -83,7 +83,7 @@ func CatfileObject(
// If the context got cancelled, then we need to flush out all
// outstanding requests so that the downstream consumer is
// unblocked.
- if err := queue.Flush(); err != nil {
+ if err := queue.Flush(ctx); err != nil {
sendRequest(catfileObjectRequest{err: err})
return
}
@@ -94,7 +94,7 @@ func CatfileObject(
i++
if i%int64(cap(requestChan)) == 0 {
- if err := queue.Flush(); err != nil {
+ if err := queue.Flush(ctx); err != nil {
sendRequest(catfileObjectRequest{err: err})
return
}
@@ -106,7 +106,7 @@ func CatfileObject(
return
}
- if err := queue.Flush(); err != nil {
+ if err := queue.Flush(ctx); err != nil {
sendRequest(catfileObjectRequest{err: err})
return
}
@@ -165,7 +165,7 @@ func CatfileObject(
}
}
- object, err := queue.ReadObject()
+ object, err := queue.ReadObject(ctx)
if err != nil {
sendResult(CatfileObjectResult{
err: fmt.Errorf("requesting object: %w", err),