diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-11-09 13:20:30 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-11-09 13:20:30 +0300 |
commit | 06ec7a17f320497d13efdc06f7798b919f45fa9d (patch) | |
tree | 474f279c6824a30c878b8994e6f4bcf9b038c3cb | |
parent | 12082c14c38c110bc9c9442017a9f83cd6e80674 (diff) | |
parent | 73b8445fe775fbbb0bc544e069d0555f1e5ddaf3 (diff) |
Merge branch 'pks-catfile-queue' into 'master'
catfile: Introduce request queues to allow batching reads
Closes #3783
See merge request gitlab-org/gitaly!4032
21 files changed, 1046 insertions, 182 deletions
diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go index 31c3be38c..97679538f 100644 --- a/internal/git/catfile/cache.go +++ b/internal/git/catfile/cache.go @@ -181,7 +181,7 @@ func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExec objectReader, ok := cacheable.(ObjectReader) if !ok { - return nil, fmt.Errorf("expected object reader, got %T", objectReader) + return nil, fmt.Errorf("expected object reader, got %T", cacheable) } return objectReader, nil @@ -198,7 +198,7 @@ func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.Repository objectInfoReader, ok := cacheable.(ObjectInfoReader) if !ok { - return nil, fmt.Errorf("expected object info reader, got %T", objectInfoReader) + return nil, fmt.Errorf("expected object info reader, got %T", cacheable) } return objectInfoReader, nil diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go index 3a5592ddd..d789e8d74 100644 --- a/internal/git/catfile/cache_test.go +++ b/internal/git/catfile/cache_test.go @@ -218,7 +218,7 @@ func TestCache_ObjectReader(t *testing.T) { // We're cheating a bit here to avoid creating a racy test by reaching into the // process and trying to read from its stdout. If the cancel did kill the process as // expected, then the stdout should be closed and we'll get an EOF. - output, err := io.ReadAll(objectReaderImpl.stdout) + output, err := io.ReadAll(objectReaderImpl.queue.stdout) if err != nil { require.True(t, errors.Is(err, os.ErrClosed)) } else { @@ -354,7 +354,7 @@ func TestCache_ObjectInfoReader(t *testing.T) { // We're cheating a bit here to avoid creating a racy test by reaching into the // process and trying to read from its stdout. If the cancel did kill the process as // expected, then the stdout should be closed and we'll get an EOF. 
- output, err := io.ReadAll(objectInfoReaderImpl.stdout) + output, err := io.ReadAll(objectInfoReaderImpl.queue.stdout) if err != nil { require.True(t, errors.Is(err, os.ErrClosed)) } else { diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go index f73b9b57c..d26c77b1d 100644 --- a/internal/git/catfile/object_info_reader.go +++ b/internal/git/catfile/object_info_reader.go @@ -6,7 +6,7 @@ import ( "fmt" "strconv" "strings" - "sync" + "sync/atomic" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" @@ -90,19 +90,35 @@ type ObjectInfoReader interface { // Info requests information about the revision pointed to by the given revision. Info(context.Context, git.Revision) (*ObjectInfo, error) + + // InfoQueue returns an ObjectInfoQueue that can be used to batch multiple object info + // requests. Using the queue is more efficient than using `Info()` when requesting a bunch + // of objects. The returned function must be executed after use of the ObjectInfoQueue has + // finished. + InfoQueue(context.Context) (ObjectInfoQueue, func(), error) +} + +// ObjectInfoQueue allows for requesting and reading object info independently of each other. The +// number of RequestInfo and ReadInfo calls must match. ReadObject must be executed after the +// object has been requested already. The order of objects returned by ReadInfo is the same as the +// order in which object info has been requested. +type ObjectInfoQueue interface { + // RequestRevision requests the given revision from git-cat-file(1). + RequestRevision(git.Revision) error + // ReadInfo reads object info which has previously been requested. + ReadInfo() (*ObjectInfo, error) } // objectInfoReader is a reader for Git object information. This reader is implemented via a // long-lived `git cat-file --batch-check` process such that we do not have to spawn a separate // process per object info we're about to read. 
type objectInfoReader struct { - cmd *command.Command - stdout *bufio.Reader - sync.Mutex - - closed bool + cmd *command.Command counter *prometheus.CounterVec + + queue requestQueue + queueInUse int32 } func newObjectInfoReader( @@ -127,8 +143,11 @@ func newObjectInfoReader( objectInfoReader := &objectInfoReader{ cmd: batchCmd, - stdout: bufio.NewReader(batchCmd), counter: counter, + queue: requestQueue{ + stdout: bufio.NewReader(batchCmd), + stdin: batchCmd, + }, } go func() { <-ctx.Done() @@ -141,36 +160,56 @@ func newObjectInfoReader( } func (o *objectInfoReader) close() { - o.Lock() - defer o.Unlock() - + o.queue.close() _ = o.cmd.Wait() - - o.closed = true } func (o *objectInfoReader) isClosed() bool { - o.Lock() - defer o.Unlock() - return o.closed + return o.queue.isClosed() } func (o *objectInfoReader) isDirty() bool { - // We always consume object info directly, so the reader cannot ever be dirty. - return false + return o.queue.isDirty() +} + +func (o *objectInfoReader) infoQueue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) { + if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) { + return nil, nil, fmt.Errorf("object info queue already in use") + } + + trace := startTrace(ctx, o.counter, tracedMethod) + o.queue.trace = trace + + return &o.queue, func() { + atomic.StoreInt32(&o.queueInUse, 0) + trace.finish() + }, nil } func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) { - trace, finish := startTrace(ctx, o.counter, "catfile.Info") - defer finish() + queue, cleanup, err := o.infoQueue(ctx, "catfile.Info") + if err != nil { + return nil, err + } + defer cleanup() - o.Lock() - defer o.Unlock() + if err := queue.RequestRevision(revision); err != nil { + return nil, err + } - if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil { + objectInfo, err := queue.ReadInfo() + if err != nil { return nil, err } - trace.recordRequest("info") - return ParseObjectInfo(o.stdout) + 
return objectInfo, nil +} + +func (o *objectInfoReader) InfoQueue(ctx context.Context) (ObjectInfoQueue, func(), error) { + queue, cleanup, err := o.infoQueue(ctx, "catfile.InfoQueue") + if err != nil { + return nil, nil, err + } + + return queue, cleanup, nil } diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go index 2ac409542..84c149aa7 100644 --- a/internal/git/catfile/object_info_reader_test.go +++ b/internal/git/catfile/object_info_reader_test.go @@ -2,7 +2,9 @@ package catfile import ( "bufio" + "errors" "fmt" + "os" "strings" "testing" @@ -162,3 +164,201 @@ func TestObjectInfoReader(t *testing.T) { }) } } + +func TestObjectInfoReader_queue(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + cfg, repoProto, repoPath := testcfg.BuildWithRepo(t) + + blobOID := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar")) + blobInfo := ObjectInfo{ + Oid: blobOID, + Type: "blob", + Size: int64(len("foobar")), + } + + commitOID := gittest.WriteCommit(t, cfg, repoPath) + commitInfo := ObjectInfo{ + Oid: commitOID, + Type: "commit", + Size: 225, + } + + t.Run("read single info", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + + info, err := queue.ReadInfo() + require.NoError(t, err) + require.Equal(t, &blobInfo, info) + }) + + t.Run("read multiple object infos", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + for oid, objectInfo := range map[git.ObjectID]ObjectInfo{ + blobOID: blobInfo, + commitOID: commitInfo, + } { + require.NoError(t, 
queue.RequestRevision(oid.Revision())) + + info, err := queue.ReadInfo() + require.NoError(t, err) + require.Equal(t, &objectInfo, info) + } + }) + + t.Run("request multiple object infos", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.RequestRevision(commitOID.Revision())) + + for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} { + info, err := queue.ReadInfo() + require.NoError(t, err) + require.Equal(t, &expectedInfo, info) + } + }) + + t.Run("read without request", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + _, err = queue.ReadInfo() + require.Equal(t, errors.New("no outstanding request"), err) + }) + + t.Run("request invalid object info", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision("does-not-exist")) + + _, err = queue.ReadInfo() + require.Equal(t, NotFoundError{errors.New("object not found")}, err) + }) + + t.Run("can continue reading after NotFoundError", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision("does-not-exist")) + _, err = queue.ReadInfo() + require.Equal(t, NotFoundError{errors.New("object not found")}, err) + + // 
Requesting another object info after the previous one has failed should continue + // to work alright. + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + info, err := queue.ReadInfo() + require.NoError(t, err) + require.Equal(t, &blobInfo, info) + }) + + t.Run("requesting multiple queues fails", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + _, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + _, _, err = reader.infoQueue(ctx, "trace") + require.Equal(t, errors.New("object info queue already in use"), err) + + // After calling cleanup we should be able to create an object queue again. + cleanup() + + _, cleanup, err = reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + }) + + t.Run("requesting object dirties reader", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.False(t, reader.isDirty()) + require.False(t, queue.isDirty()) + + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + + require.True(t, reader.isDirty()) + require.True(t, queue.isDirty()) + + _, err = queue.ReadInfo() + require.NoError(t, err) + + require.False(t, reader.isDirty()) + require.False(t, queue.isDirty()) + }) + + t.Run("closing queue blocks request", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + queue.close() + + require.True(t, reader.isClosed()) + require.True(t, queue.isClosed()) + + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision(blobOID.Revision())) + }) + + t.Run("closing queue 
blocks read", func(t *testing.T) { + reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.infoQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + // Request the object before we close the queue. + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + + queue.close() + + require.True(t, reader.isClosed()) + require.True(t, queue.isClosed()) + + _, err = queue.ReadInfo() + require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err) + }) +} diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 25f47654b..540e671de 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -6,7 +6,7 @@ import ( "fmt" "io" "os" - "sync" + "sync/atomic" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" @@ -20,32 +20,36 @@ type ObjectReader interface { // Reader returns a new Object for the given revision. The Object must be fully consumed // before another object is requested. - Object(_ context.Context, _ git.Revision) (*Object, error) + Object(context.Context, git.Revision) (*Object, error) + + // ObjectQueue returns an ObjectQueue that can be used to batch multiple object requests. + // Using the queue is more efficient than using `Object()` when requesting a bunch of + // objects. The returned function must be executed after use of the ObjectQueue has + // finished. + ObjectQueue(context.Context) (ObjectQueue, func(), error) +} + +// ObjectQueue allows for requesting and reading objects independently of each other. The number of +// RequestObject and ReadObject calls must match. ReadObject must be executed after the object has +// been requested already. The order of objects returned by ReadObject is the same as the order in +// which objects have been requested. 
+type ObjectQueue interface { + // RequestRevision requests the given revision from git-cat-file(1). + RequestRevision(git.Revision) error + // ReadObject reads an object which has previously been requested. + ReadObject() (*Object, error) } // objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file // --batch` process such that we do not have to spawn a new process for each object we are about to // read. type objectReader struct { - cmd *command.Command - stdout *bufio.Reader - - // n is a state machine that tracks how much data we still have to read - // from r. Legal states are: n==0, this means we can do a new request on - // the cat-file process. n==1, this means that we have to discard a - // trailing newline. n>0, this means we are in the middle of reading a - // raw git object. - n int64 - - // Even though the batch type should not be used concurrently, I think - // that if that does happen by mistake we should give proper errors - // instead of doing unsafe memory writes (to n) and failing in some - // unpredictable way. 
- sync.Mutex - - closed bool + cmd *command.Command counter *prometheus.CounterVec + + queue requestQueue + queueInUse int32 } func newObjectReader( @@ -70,8 +74,12 @@ func newObjectReader( objectReader := &objectReader{ cmd: batchCmd, - stdout: bufio.NewReader(batchCmd), counter: counter, + queue: requestQueue{ + isObjectQueue: true, + stdout: bufio.NewReader(batchCmd), + stdin: batchCmd, + }, } go func() { <-ctx.Done() @@ -83,98 +91,103 @@ func newObjectReader( } func (o *objectReader) close() { - o.Lock() - defer o.Unlock() - + o.queue.close() _ = o.cmd.Wait() - - o.closed = true } func (o *objectReader) isClosed() bool { - o.Lock() - defer o.Unlock() - return o.closed -} - -func (o *objectReader) consume(nBytes int64) { - o.n -= nBytes - if o.n < 1 { - panic("too many bytes read from batch") - } + return o.queue.isClosed() } func (o *objectReader) isDirty() bool { - o.Lock() - defer o.Unlock() - - return o.n > 1 + return o.queue.isDirty() } -// Object represents data returned by `git cat-file --batch` -type Object struct { - // ObjectInfo represents main information about object - ObjectInfo - // parent is the objectReader which has created the Object. - parent *objectReader - // dataReader is reader which has all the object data. 
- dataReader io.LimitedReader -} +func (o *objectReader) objectQueue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) { + if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) { + return nil, nil, fmt.Errorf("object queue already in use") + } -func (o *objectReader) Object( - ctx context.Context, - revision git.Revision, -) (*Object, error) { - trace, finish := startTrace(ctx, o.counter, "catfile.Object") - defer finish() + trace := startTrace(ctx, o.counter, tracedMethod) + o.queue.trace = trace - o.Lock() - defer o.Unlock() + return &o.queue, func() { + atomic.StoreInt32(&o.queueInUse, 0) + trace.finish() + }, nil +} - if o.n == 1 { - // Consume linefeed - if _, err := o.stdout.ReadByte(); err != nil { - return nil, err - } - o.n-- +func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Object, error) { + queue, finish, err := o.objectQueue(ctx, "catfile.Object") + if err != nil { + return nil, err } + defer finish() - if o.n != 0 { - return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", o.n) + if err := queue.RequestRevision(revision); err != nil { + return nil, err } - if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil { + object, err := queue.ReadObject() + if err != nil { return nil, err } - oi, err := ParseObjectInfo(o.stdout) + return object, nil +} + +func (o *objectReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), error) { + queue, finish, err := o.objectQueue(ctx, "catfile.ObjectQueue") if err != nil { - return nil, err + return nil, nil, err } - trace.recordRequest(oi.Type) + return queue, finish, nil +} - o.n = oi.Size + 1 +// Object represents data returned by `git cat-file --batch` +type Object struct { + // ObjectInfo represents main information about object + ObjectInfo - return &Object{ - ObjectInfo: *oi, - parent: o, - dataReader: io.LimitedReader{ - R: o.stdout, - N: oi.Size, - }, - }, nil + // dataReader is reader which has all the object data. 
+ dataReader io.LimitedReader + + // bytesLeft tracks the number of bytes which are left to be read. While this duplicates the + // information tracked in dataReader.N, this cannot be helped given that we need to make + // access to this information atomic so there's no race between updating it and checking the + // process for dirtiness. While we could use locking instead of atomics, we'd have to lock + // during the whole read duration -- and thus it'd become impossible to check for dirtiness + // at the same time. + bytesRemaining int64 + + // closed determines whether the object is closed for reading. + closed int32 } -func (o *Object) Read(p []byte) (int, error) { - o.parent.Lock() - defer o.parent.Unlock() +// isDirty determines whether the object is still dirty, that is whether there are still unconsumed +// bytes. +func (o *Object) isDirty() bool { + return atomic.LoadInt64(&o.bytesRemaining) != 0 +} + +func (o *Object) isClosed() bool { + return atomic.LoadInt32(&o.closed) == 1 +} + +func (o *Object) close() { + atomic.StoreInt32(&o.closed, 1) +} - if o.parent.closed { +func (o *Object) Read(p []byte) (int, error) { + if o.isClosed() { return 0, os.ErrClosed } n, err := o.dataReader.Read(p) - o.parent.consume(int64(n)) + if atomic.AddInt64(&o.bytesRemaining, int64(-n)) < 0 { + return n, fmt.Errorf("bytes remaining became negative while reading object") + } + return n, err } @@ -182,16 +195,16 @@ func (o *Object) Read(p []byte) (int, error) { // via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are // implemented by the respective reader or writer. func (o *Object) WriteTo(w io.Writer) (int64, error) { - o.parent.Lock() - defer o.parent.Unlock() - - if o.parent.closed { + if o.isClosed() { return 0, os.ErrClosed } // While the `io.LimitedReader` does not support WriteTo, `io.Copy()` will make use of // `ReadFrom()` in case the writer implements it. 
n, err := io.Copy(w, &o.dataReader) - o.parent.consume(n) + if atomic.AddInt64(&o.bytesRemaining, -n) < 0 { + return n, fmt.Errorf("bytes remaining became negative while reading object") + } + return n, err } diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go index aad733f78..165d782de 100644 --- a/internal/git/catfile/object_reader_test.go +++ b/internal/git/catfile/object_reader_test.go @@ -1,8 +1,10 @@ package catfile import ( + "errors" "fmt" "io" + "os" "testing" "github.com/prometheus/client_golang/prometheus" @@ -76,7 +78,7 @@ func TestObjectReader_reader(t *testing.T) { // We haven't yet consumed the previous object, so this must now fail. _, err = reader.Object(ctx, commitID.Revision()) - require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1)) + require.EqualError(t, err, "current object has not been fully read") }) t.Run("read fails when partially consuming previous object", func(t *testing.T) { @@ -91,7 +93,7 @@ func TestObjectReader_reader(t *testing.T) { // We haven't yet consumed the previous object, so this must now fail. 
_, err = reader.Object(ctx, commitID.Revision()) - require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1)) + require.EqualError(t, err, "current object has not been fully read") }) t.Run("read increments Prometheus counter", func(t *testing.T) { @@ -118,3 +120,233 @@ func TestObjectReader_reader(t *testing.T) { } }) } + +func TestObjectReader_queue(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + cfg, repoProto, repoPath := testcfg.BuildWithRepo(t) + + foobarBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar")) + barfooBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo")) + + t.Run("read single object", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + + object, err := queue.ReadObject() + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, "foobar", string(contents)) + }) + + t.Run("read multiple objects", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + for blobID, blobContents := range map[git.ObjectID]string{ + foobarBlob: "foobar", + barfooBlob: "barfoo", + } { + require.NoError(t, queue.RequestRevision(blobID.Revision())) + + object, err := queue.ReadObject() + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, blobContents, string(contents)) + } + }) + + t.Run("request multiple objects", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, 
err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(barfooBlob.Revision())) + + for _, expectedContents := range []string{"foobar", "barfoo"} { + object, err := queue.ReadObject() + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, expectedContents, string(contents)) + } + }) + + t.Run("read without request", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + _, err = queue.ReadObject() + require.Equal(t, errors.New("no outstanding request"), err) + }) + + t.Run("request invalid object", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision("does-not-exist")) + + _, err = queue.ReadObject() + require.Equal(t, NotFoundError{errors.New("object not found")}, err) + }) + + t.Run("can continue reading after NotFoundError", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision("does-not-exist")) + _, err = queue.ReadObject() + require.Equal(t, NotFoundError{errors.New("object not found")}, err) + + // Requesting another object after the previous one has failed should continue to + // work alright. 
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + object, err := queue.ReadObject() + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, "foobar", string(contents)) + }) + + t.Run("requesting multiple queues fails", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + _, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + _, _, err = reader.objectQueue(ctx, "trace") + require.Equal(t, errors.New("object queue already in use"), err) + + // After calling cleanup we should be able to create an object queue again. + cleanup() + + _, cleanup, err = reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + }) + + t.Run("requesting object dirties reader", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.False(t, reader.isDirty()) + require.False(t, queue.isDirty()) + + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + + require.True(t, reader.isDirty()) + require.True(t, queue.isDirty()) + + object, err := queue.ReadObject() + require.NoError(t, err) + + // The object has not been consumed yet, so the reader must still be dirty. 
+ require.True(t, reader.isDirty()) + require.True(t, queue.isDirty()) + + _, err = io.ReadAll(object) + require.NoError(t, err) + + require.False(t, reader.isDirty()) + require.False(t, queue.isDirty()) + }) + + t.Run("closing queue blocks request", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + queue.close() + + require.True(t, reader.isClosed()) + require.True(t, queue.isClosed()) + + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision(foobarBlob.Revision())) + }) + + t.Run("closing queue blocks read", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + // Request the object before we close the queue. + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + + queue.close() + + require.True(t, reader.isClosed()) + require.True(t, queue.isClosed()) + + _, err = queue.ReadObject() + require.Equal(t, fmt.Errorf("cannot read object: %w", os.ErrClosed), err) + }) + + t.Run("closing queue blocks consuming", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + queue, cleanup, err := reader.objectQueue(ctx, "trace") + require.NoError(t, err) + defer cleanup() + + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + + // Read the object header before closing. 
+ object, err := queue.ReadObject() + require.NoError(t, err) + + queue.close() + + require.True(t, reader.isClosed()) + require.True(t, queue.isClosed()) + require.True(t, object.isClosed()) + + _, err = io.ReadAll(object) + require.Equal(t, os.ErrClosed, err) + }) +} diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go new file mode 100644 index 000000000..279b9d0c2 --- /dev/null +++ b/internal/git/catfile/request_queue.go @@ -0,0 +1,166 @@ +package catfile + +import ( + "bufio" + "fmt" + "io" + "os" + "sync" + "sync/atomic" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git" +) + +type requestQueue struct { + // isObjectQueue is set to `true` when this is a request queue which can be used for reading + // objects. If set to `false`, then this can only be used to read object info. + isObjectQueue bool + + stdout *bufio.Reader + stdin io.Writer + + // outstandingRequests is the number of requests which have been queued up. Gets incremented + // on request, and decremented when starting to read an object (not when that object has + // been fully consumed). + outstandingRequests int64 + + // closed indicates whether the queue is closed for additional requests. + closed int32 + + // currentObject is the currently read object. + currentObject *Object + currentObjectLock sync.Mutex + + // trace is the current tracing span. + trace *trace +} + +// isDirty returns true either if there are outstanding requests for objects or if the current +// object hasn't yet been fully consumed. +func (q *requestQueue) isDirty() bool { + q.currentObjectLock.Lock() + defer q.currentObjectLock.Unlock() + + // We must check for the current object first: we cannot queue another object due to the + // object lock, but we may queue another request while checking for dirtiness. 
+ if q.currentObject != nil { + return q.currentObject.isDirty() + } + + if atomic.LoadInt64(&q.outstandingRequests) != 0 { + return true + } + + return false +} + +func (q *requestQueue) isClosed() bool { + return atomic.LoadInt32(&q.closed) == 1 +} + +func (q *requestQueue) close() { + if atomic.CompareAndSwapInt32(&q.closed, 0, 1) { + q.currentObjectLock.Lock() + defer q.currentObjectLock.Unlock() + + if q.currentObject != nil { + q.currentObject.close() + } + } +} + +func (q *requestQueue) RequestRevision(revision git.Revision) error { + if q.isClosed() { + return fmt.Errorf("cannot request revision: %w", os.ErrClosed) + } + + atomic.AddInt64(&q.outstandingRequests, 1) + + if _, err := fmt.Fprintln(q.stdin, revision.String()); err != nil { + atomic.AddInt64(&q.outstandingRequests, -1) + return fmt.Errorf("requesting revision: %w", err) + } + + return nil +} + +func (q *requestQueue) ReadObject() (*Object, error) { + if !q.isObjectQueue { + panic("object queue used to read object info") + } + + q.currentObjectLock.Lock() + defer q.currentObjectLock.Unlock() + + if q.isClosed() { + return nil, fmt.Errorf("cannot read object: %w", os.ErrClosed) + } + + if atomic.LoadInt64(&q.outstandingRequests) == 0 { + return nil, fmt.Errorf("no outstanding request") + } + + if q.currentObject != nil { + // If the current object is still dirty, then we must not try to read a new object. + if q.currentObject.isDirty() { + return nil, fmt.Errorf("current object has not been fully read") + } + + q.currentObject.close() + q.currentObject = nil + + // If we have already read an object before, then we must consume the trailing + // newline after the object's data. 
+ if _, err := q.stdout.ReadByte(); err != nil { + return nil, err + } + } + + oi, err := ParseObjectInfo(q.stdout) + if err != nil { + return nil, err + } + q.trace.recordRequest(oi.Type) + + if atomic.AddInt64(&q.outstandingRequests, -1) < 0 { + return nil, fmt.Errorf("negative number of requests") + } + + q.currentObject = &Object{ + ObjectInfo: *oi, + dataReader: io.LimitedReader{ + R: q.stdout, + N: oi.Size, + }, + bytesRemaining: oi.Size, + } + + return q.currentObject, nil +} + +func (q *requestQueue) ReadInfo() (*ObjectInfo, error) { + if q.isObjectQueue { + panic("object queue used to read object info") + } + + if q.isClosed() { + return nil, fmt.Errorf("cannot read object info: %w", os.ErrClosed) + } + + // We first need to determine whether there are any queued requests at all. If not, then we + // cannot read anything. + queuedRequests := atomic.LoadInt64(&q.outstandingRequests) + if queuedRequests == 0 { + return nil, fmt.Errorf("no outstanding request") + } + + // And when there are, we need to remove one of these queued requests. We do so via + // `CompareAndSwapInt64()`, which easily allows us to detect concurrent access to the queue. 
+ if !atomic.CompareAndSwapInt64(&q.outstandingRequests, queuedRequests, queuedRequests-1) { + return nil, fmt.Errorf("concurrent access to object info queue") + } + + q.trace.recordRequest("info") + + return ParseObjectInfo(q.stdout) +} diff --git a/internal/git/catfile/tracing.go b/internal/git/catfile/tracing.go index ab2a76d3d..a1a6fd90b 100644 --- a/internal/git/catfile/tracing.go +++ b/internal/git/catfile/tracing.go @@ -2,6 +2,7 @@ package catfile import ( "context" + "sync" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" @@ -11,18 +12,18 @@ type trace struct { span opentracing.Span counter *prometheus.CounterVec - requests map[string]int + requestsLock sync.Mutex + requests map[string]int } // startTrace starts a new tracing span and updates metrics according to how many requests have been -// done during that trace. This must be called with two contexts: first the per-RPC context, which -// is the context of the current RPC call. And then the cache context, which is the decorrelated -// context for cached catfile processes. Spans are then created for both contexts. +// done during that trace. The caller must call `finish()` on the resulting trace after it's deemed to be +// done such that metrics get recorded correctly. 
func startTrace( ctx context.Context, counter *prometheus.CounterVec, methodName string, -) (*trace, func()) { +) *trace { span, _ := opentracing.StartSpanFromContext(ctx, methodName) trace := &trace{ @@ -37,14 +38,19 @@ func startTrace( }, } - return trace, trace.finish + return trace } func (t *trace) recordRequest(requestType string) { + t.requestsLock.Lock() + defer t.requestsLock.Unlock() t.requests[requestType]++ } func (t *trace) finish() { + t.requestsLock.Lock() + defer t.requestsLock.Unlock() + for requestType, requestCount := range t.requests { if requestCount == 0 { continue diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 1c00443d0..e87628f38 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -40,6 +40,12 @@ func WithSkipCatfileInfoResult(skipResult func(*catfile.ObjectInfo) bool) Catfil } } +type catfileInfoRequest struct { + objectID git.ObjectID + objectName []byte + err error +} + // CatfileInfo processes revlistResults from the given channel and extracts object information via // `git cat-file --batch-check`. The returned channel will contain all processed catfile info // results. 
Any error received via the channel or encountered in this step will cause the pipeline @@ -49,21 +55,67 @@ func CatfileInfo( objectInfoReader catfile.ObjectInfoReader, it ObjectIterator, opts ...CatfileInfoOption, -) CatfileInfoIterator { +) (CatfileInfoIterator, error) { var cfg catfileInfoConfig for _, opt := range opts { opt(&cfg) } + queue, cleanup, err := objectInfoReader.InfoQueue(ctx) + if err != nil { + return nil, err + } + defer cleanup() + + requestChan := make(chan catfileInfoRequest) + go func() { + defer close(requestChan) + + for it.Next() { + if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { + select { + case requestChan <- catfileInfoRequest{err: err}: + case <-ctx.Done(): + return + } + } + + select { + case requestChan <- catfileInfoRequest{ + objectID: it.ObjectID(), + objectName: it.ObjectName(), + }: + case <-ctx.Done(): + return + } + } + + if err := it.Err(); err != nil { + select { + case requestChan <- catfileInfoRequest{err: err}: + case <-ctx.Done(): + return + } + } + }() + resultChan := make(chan CatfileInfoResult) go func() { defer close(resultChan) - for it.Next() { - objectInfo, err := objectInfoReader.Info(ctx, it.ObjectID().Revision()) + // It's fine to iterate over the request channel without paying attention to + // context cancellation because the request channel itself would be closed if the + // context was cancelled. 
+ for request := range requestChan { + if request.err != nil { + sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: request.err}) + break + } + + objectInfo, err := queue.ReadInfo() if err != nil { sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ - err: fmt.Errorf("retrieving object info for %q: %w", it.ObjectID(), err), + err: fmt.Errorf("retrieving object info for %q: %w", request.objectID, err), }) return } @@ -73,22 +125,17 @@ func CatfileInfo( } if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{ - ObjectName: it.ObjectName(), + ObjectName: request.objectName, ObjectInfo: objectInfo, }); isDone { return } } - - if err := it.Err(); err != nil { - sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err}) - return - } }() return &catfileInfoIterator{ ch: resultChan, - } + }, nil } // CatfileInfoAllObjects enumerates all Git objects part of the repository's object directory and diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 16e2980bf..6cddfd808 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -136,7 +136,8 @@ func TestCatfileInfo(t *testing.T) { objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo) require.NoError(t, err) - it := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...) + it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...) 
+ require.NoError(t, err) var results []CatfileInfoResult for it.Next() { diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index d5aa1e7f6..5ec2e9fb5 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -23,6 +23,11 @@ type CatfileObjectResult struct { git.Object } +type catfileObjectRequest struct { + objectName []byte + err error +} + // CatfileObject processes catfileInfoResults from the given channel and reads associated objects // into memory via `git cat-file --batch`. The returned channel will contain all processed objects. // Any error received via the channel or encountered in this step will cause the pipeline to fail. @@ -32,7 +37,42 @@ func CatfileObject( ctx context.Context, objectReader catfile.ObjectReader, it ObjectIterator, -) CatfileObjectIterator { +) (CatfileObjectIterator, error) { + queue, cleanup, err := objectReader.ObjectQueue(ctx) + if err != nil { + return nil, err + } + defer cleanup() + + requestChan := make(chan catfileObjectRequest) + go func() { + defer close(requestChan) + + for it.Next() { + if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { + select { + case requestChan <- catfileObjectRequest{err: err}: + case <-ctx.Done(): + return + } + } + + select { + case requestChan <- catfileObjectRequest{objectName: it.ObjectName()}: + case <-ctx.Done(): + return + } + } + + if err := it.Err(); err != nil { + select { + case requestChan <- catfileObjectRequest{err: err}: + case <-ctx.Done(): + return + } + } + }() + resultChan := make(chan CatfileObjectResult) go func() { defer close(resultChan) @@ -60,7 +100,15 @@ func CatfileObject( var previousObject *synchronizingObject - for it.Next() { + // It's fine to iterate over the request channel without paying attention to + // context cancellation because the request channel itself would be closed if the + // context was cancelled. 
+ for request := range requestChan { + if request.err != nil { + sendResult(CatfileObjectResult{err: request.err}) + break + } + // We mustn't try to read another object before reading the previous object // has concluded. Given that this is not under our control but under the // control of the caller, we thus have to wait until the blocking reader has @@ -73,7 +121,7 @@ func CatfileObject( } } - object, err := objectReader.Object(ctx, it.ObjectID().Revision()) + object, err := queue.ReadObject() if err != nil { sendResult(CatfileObjectResult{ err: fmt.Errorf("requesting object: %w", err), @@ -87,22 +135,17 @@ func CatfileObject( } if isDone := sendResult(CatfileObjectResult{ - ObjectName: it.ObjectName(), + ObjectName: request.objectName, Object: previousObject, }); isDone { return } } - - if err := it.Err(); err != nil { - sendResult(CatfileObjectResult{err: err}) - return - } }() return &catfileObjectIterator{ ch: resultChan, - } + }, nil } type synchronizingObject struct { diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go index fd02726d9..6cb20aa0b 100644 --- a/internal/git/gitpipe/catfile_object_test.go +++ b/internal/git/gitpipe/catfile_object_test.go @@ -78,7 +78,8 @@ func TestCatfileObject(t *testing.T) { objectReader, err := catfileCache.ObjectReader(ctx, repo) require.NoError(t, err) - it := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs)) + it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs)) + require.NoError(t, err) var results []CatfileObjectResult for it.Next() { diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go index 25afd4259..a80f7e769 100644 --- a/internal/git/gitpipe/pipeline_test.go +++ b/internal/git/gitpipe/pipeline_test.go @@ -222,8 +222,12 @@ func TestPipeline_revlist(t *testing.T) { require.NoError(t, err) revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...) 
- catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter, tc.catfileInfoOptions...) - catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileInfoIter, err := CatfileInfo(ctx, objectInfoReader, revlistIter, tc.catfileInfoOptions...) + require.NoError(t, err) + + catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter) + require.NoError(t, err) var results []CatfileObjectResult for catfileObjectIter.Next() { @@ -274,8 +278,12 @@ func TestPipeline_revlist(t *testing.T) { require.NoError(t, err) revlistIter := Revlist(ctx, repo, []string{"--all"}) - catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter) - catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileInfoIter, err := CatfileInfo(ctx, objectInfoReader, revlistIter) + require.NoError(t, err) + + catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter) + require.NoError(t, err) i := 0 for catfileObjectIter.Next() { @@ -311,8 +319,12 @@ func TestPipeline_revlist(t *testing.T) { require.NoError(t, err) revlistIter := Revlist(ctx, repo, []string{"--all"}, WithObjects()) - catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter) - catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileInfoIter, err := CatfileInfo(ctx, objectInfoReader, revlistIter) + require.NoError(t, err) + + catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter) + require.NoError(t, err) i := 0 var wg sync.WaitGroup @@ -361,8 +373,12 @@ func TestPipeline_forEachRef(t *testing.T) { require.NoError(t, err) forEachRefIter := ForEachRef(ctx, repo, nil) - catfileInfoIter := CatfileInfo(ctx, objectInfoReader, forEachRefIter) - catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileInfoIter, err := CatfileInfo(ctx, objectInfoReader, forEachRefIter) + require.NoError(t, err) + + catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter) + 
require.NoError(t, err) type object struct { oid git.ObjectID diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go index 55c406dd8..97e30ba7d 100644 --- a/internal/gitaly/service/blob/blobs.go +++ b/internal/gitaly/service/blob/blobs.go @@ -103,7 +103,10 @@ func (s *server) processBlobs( return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err)) } - catfileInfoIter = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter) + catfileInfoIter, err = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter) + if err != nil { + return helper.ErrInternalf("creating object info iterator: %w", err) + } } var i uint32 @@ -134,7 +137,10 @@ func (s *server) processBlobs( return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err)) } - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, objectIter) + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter) + if err != nil { + return helper.ErrInternalf("creating catfile object iterator: %w", err) + } var i uint32 for catfileObjectIter.Next() { diff --git a/internal/gitaly/service/blob/blobs_test.go b/internal/gitaly/service/blob/blobs_test.go index ae668dce4..9a2eaa717 100644 --- a/internal/gitaly/service/blob/blobs_test.go +++ b/internal/gitaly/service/blob/blobs_test.go @@ -428,23 +428,88 @@ func BenchmarkListAllBlobs(b *testing.B) { _, repoProto, _, client := setup(b) - b.Run("ListAllBlobs", func(b *testing.B) { - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - stream, err := client.ListAllBlobs(ctx, &gitalypb.ListAllBlobsRequest{ + for _, tc := range []struct { + desc string + request *gitalypb.ListAllBlobsRequest + }{ + { + desc: "with contents", + request: &gitalypb.ListAllBlobsRequest{ Repository: repoProto, BytesLimit: -1, - }) - require.NoError(b, err) + }, + }, + { + desc: "without contents", + request: &gitalypb.ListAllBlobsRequest{ + Repository: repoProto, + BytesLimit: 0, + }, + }, + } { + b.Run(tc.desc, func(b 
*testing.B) { + b.ReportAllocs() - for { - _, err := stream.Recv() - if err == io.EOF { - break + for i := 0; i < b.N; i++ { + stream, err := client.ListAllBlobs(ctx, tc.request) + require.NoError(b, err) + + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(b, err) } + } + }) + } +} + +func BenchmarkListBlobs(b *testing.B) { + b.StopTimer() + + ctx, cancel := testhelper.Context() + defer cancel() + + _, repoProto, _, client := setup(b) + + for _, tc := range []struct { + desc string + request *gitalypb.ListBlobsRequest + }{ + { + desc: "with contents", + request: &gitalypb.ListBlobsRequest{ + Repository: repoProto, + Revisions: []string{"refs/heads/master"}, + BytesLimit: -1, + }, + }, + { + desc: "without contents", + request: &gitalypb.ListBlobsRequest{ + Repository: repoProto, + Revisions: []string{"refs/heads/master"}, + BytesLimit: 0, + }, + }, + } { + b.Run(tc.desc, func(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + stream, err := client.ListBlobs(ctx, tc.request) require.NoError(b, err) + + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(b, err) + } } - } - }) + }) + } } diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index a937881ff..ca05b4750 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -62,7 +62,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git gitpipe.WithBlobLimit(lfsPointerMaxSize), gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob), ) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + if err != nil { + return helper.ErrInternalf("creating object iterator: %w", err) + } if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil { return err @@ -100,7 +104,11 @@ func (s 
*server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize }), ) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + if err != nil { + return helper.ErrInternalf("creating object iterator: %w", err) + } if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil { return err @@ -144,12 +152,19 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita blobs[i] = gitpipe.RevisionResult{OID: git.ObjectID(blobID)} } - catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs), + catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs), gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool { return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize }), ) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + if err != nil { + return helper.ErrInternalf("creating object info iterator: %w", err) + } + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + if err != nil { + return helper.ErrInternalf("creating object iterator: %w", err) + } if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil { return err diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go index 0ab8a9b66..d5fa18622 100644 --- a/internal/gitaly/service/commit/list_all_commits.go +++ b/internal/gitaly/service/commit/list_all_commits.go @@ -53,7 +53,10 @@ func (s *server) ListAllCommits( }), ) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter) + if err != nil { + return err + } chunker := 
chunk.New(&commitsSender{ send: func(commits []*gitalypb.GitCommit) error { diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go index eb6bb4b0c..826edb9ee 100644 --- a/internal/gitaly/service/commit/list_commits.go +++ b/internal/gitaly/service/commit/list_commits.go @@ -100,7 +100,11 @@ func (s *server) ListCommits( } revlistIter := gitpipe.Revlist(ctx, repo, request.GetRevisions(), revlistOptions...) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + if err != nil { + return err + } chunker := chunk.New(&commitsSender{ send: func(commits []*gitalypb.GitCommit) error { diff --git a/internal/gitaly/service/commit/tree_entries_helper.go b/internal/gitaly/service/commit/tree_entries_helper.go index 09853a890..e26265f4d 100644 --- a/internal/gitaly/service/commit/tree_entries_helper.go +++ b/internal/gitaly/service/commit/tree_entries_helper.go @@ -143,16 +143,15 @@ func treeEntries( } return nil, err } - defer func() { - if _, err := io.Copy(io.Discard, treeObj); err != nil && returnedErr == nil { - returnedErr = fmt.Errorf("discarding object: %w", err) - } - }() // The tree entry may not refer to a subtree, but instead to a blob. Historically, we have // simply ignored such objects altogether and didn't return an error, so we keep the same // behaviour. 
if treeObj.Type != "tree" { + if _, err := io.Copy(io.Discard, treeObj); err != nil && returnedErr == nil { + return nil, fmt.Errorf("discarding object: %w", err) + } + return nil, nil } diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go index 47eb787a2..4bfb2d0e9 100644 --- a/internal/gitaly/service/ref/find_all_tags.go +++ b/internal/gitaly/service/ref/find_all_tags.go @@ -52,7 +52,11 @@ func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortFiel gitpipe.WithSortField(sortField), gitpipe.WithForEachRefFormat("%(objectname) %(refname)%(if)%(*objectname)%(then)\n%(objectname)^{} PEELED%(end)"), ) - catfileObjectsIter := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter) + + catfileObjectsIter, err := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter) + if err != nil { + return err + } chunker := chunk.New(&tagSender{stream: stream}) diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go index 8ffaf4de9..8bdf78479 100644 --- a/internal/gitaly/service/ref/tag_signatures.go +++ b/internal/gitaly/service/ref/tag_signatures.go @@ -58,7 +58,11 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream } revlistIter := gitpipe.Revlist(ctx, repo, req.GetTagRevisions(), revlistOptions...) - catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + + catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter) + if err != nil { + return err + } for catfileObjectIter.Next() { tag := catfileObjectIter.Result() |