gitlab.com/gitlab-org/gitaly.git
author     Sami Hiltunen <shiltunen@gitlab.com>  2021-11-09 13:20:30 +0300
committer  Sami Hiltunen <shiltunen@gitlab.com>  2021-11-09 13:20:30 +0300
commit     06ec7a17f320497d13efdc06f7798b919f45fa9d
tree       474f279c6824a30c878b8994e6f4bcf9b038c3cb
parent     12082c14c38c110bc9c9442017a9f83cd6e80674
parent     73b8445fe775fbbb0bc544e069d0555f1e5ddaf3
Merge branch 'pks-catfile-queue' into 'master'

catfile: Introduce request queues to allow batching reads

Closes #3783

See merge request gitlab-org/gitaly!4032
-rw-r--r--  internal/git/catfile/cache.go                          |    4
-rw-r--r--  internal/git/catfile/cache_test.go                     |    4
-rw-r--r--  internal/git/catfile/object_info_reader.go             |   87
-rw-r--r--  internal/git/catfile/object_info_reader_test.go        |  200
-rw-r--r--  internal/git/catfile/object_reader.go                  |  187
-rw-r--r--  internal/git/catfile/object_reader_test.go             |  236
-rw-r--r--  internal/git/catfile/request_queue.go                  |  166
-rw-r--r--  internal/git/catfile/tracing.go                        |   18
-rw-r--r--  internal/git/gitpipe/catfile_info.go                   |   69
-rw-r--r--  internal/git/gitpipe/catfile_info_test.go              |    3
-rw-r--r--  internal/git/gitpipe/catfile_object.go                 |   63
-rw-r--r--  internal/git/gitpipe/catfile_object_test.go            |    3
-rw-r--r--  internal/git/gitpipe/pipeline_test.go                  |   32
-rw-r--r--  internal/gitaly/service/blob/blobs.go                  |   10
-rw-r--r--  internal/gitaly/service/blob/blobs_test.go             |   91
-rw-r--r--  internal/gitaly/service/blob/lfs_pointers.go           |   23
-rw-r--r--  internal/gitaly/service/commit/list_all_commits.go     |    5
-rw-r--r--  internal/gitaly/service/commit/list_commits.go         |    6
-rw-r--r--  internal/gitaly/service/commit/tree_entries_helper.go  |    9
-rw-r--r--  internal/gitaly/service/ref/find_all_tags.go           |    6
-rw-r--r--  internal/gitaly/service/ref/tag_signatures.go          |    6
21 files changed, 1046 insertions, 182 deletions
diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go
index 31c3be38c..97679538f 100644
--- a/internal/git/catfile/cache.go
+++ b/internal/git/catfile/cache.go
@@ -181,7 +181,7 @@ func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExec
objectReader, ok := cacheable.(ObjectReader)
if !ok {
- return nil, fmt.Errorf("expected object reader, got %T", objectReader)
+ return nil, fmt.Errorf("expected object reader, got %T", cacheable)
}
return objectReader, nil
@@ -198,7 +198,7 @@ func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.Repository
objectInfoReader, ok := cacheable.(ObjectInfoReader)
if !ok {
- return nil, fmt.Errorf("expected object info reader, got %T", objectInfoReader)
+ return nil, fmt.Errorf("expected object info reader, got %T", cacheable)
}
return objectInfoReader, nil
diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go
index 3a5592ddd..d789e8d74 100644
--- a/internal/git/catfile/cache_test.go
+++ b/internal/git/catfile/cache_test.go
@@ -218,7 +218,7 @@ func TestCache_ObjectReader(t *testing.T) {
// We're cheating a bit here to avoid creating a racy test by reaching into the
// process and trying to read from its stdout. If the cancel did kill the process as
// expected, then the stdout should be closed and we'll get an EOF.
- output, err := io.ReadAll(objectReaderImpl.stdout)
+ output, err := io.ReadAll(objectReaderImpl.queue.stdout)
if err != nil {
require.True(t, errors.Is(err, os.ErrClosed))
} else {
@@ -354,7 +354,7 @@ func TestCache_ObjectInfoReader(t *testing.T) {
// We're cheating a bit here to avoid creating a racy test by reaching into the
// process and trying to read from its stdout. If the cancel did kill the process as
// expected, then the stdout should be closed and we'll get an EOF.
- output, err := io.ReadAll(objectInfoReaderImpl.stdout)
+ output, err := io.ReadAll(objectInfoReaderImpl.queue.stdout)
if err != nil {
require.True(t, errors.Is(err, os.ErrClosed))
} else {
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go
index f73b9b57c..d26c77b1d 100644
--- a/internal/git/catfile/object_info_reader.go
+++ b/internal/git/catfile/object_info_reader.go
@@ -6,7 +6,7 @@ import (
"fmt"
"strconv"
"strings"
- "sync"
+ "sync/atomic"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@@ -90,19 +90,35 @@ type ObjectInfoReader interface {
// Info requests information about the revision pointed to by the given revision.
Info(context.Context, git.Revision) (*ObjectInfo, error)
+
+ // InfoQueue returns an ObjectInfoQueue that can be used to batch multiple object info
+ // requests. Using the queue is more efficient than using `Info()` when requesting a bunch
+ // of objects. The returned function must be executed after use of the ObjectInfoQueue has
+ // finished.
+ InfoQueue(context.Context) (ObjectInfoQueue, func(), error)
+}
+
+// ObjectInfoQueue allows for requesting and reading object info independently of each other. The
+// number of RequestRevision and ReadInfo calls must match. ReadInfo must be executed after the
+// object info has been requested already. The order of object info returned by ReadInfo is the same as the
+// order in which object info has been requested.
+type ObjectInfoQueue interface {
+ // RequestRevision requests the given revision from git-cat-file(1).
+ RequestRevision(git.Revision) error
+ // ReadInfo reads object info which has previously been requested.
+ ReadInfo() (*ObjectInfo, error)
}
// objectInfoReader is a reader for Git object information. This reader is implemented via a
// long-lived `git cat-file --batch-check` process such that we do not have to spawn a separate
// process per object info we're about to read.
type objectInfoReader struct {
- cmd *command.Command
- stdout *bufio.Reader
- sync.Mutex
-
- closed bool
+ cmd *command.Command
counter *prometheus.CounterVec
+
+ queue requestQueue
+ queueInUse int32
}
func newObjectInfoReader(
@@ -127,8 +143,11 @@ func newObjectInfoReader(
objectInfoReader := &objectInfoReader{
cmd: batchCmd,
- stdout: bufio.NewReader(batchCmd),
counter: counter,
+ queue: requestQueue{
+ stdout: bufio.NewReader(batchCmd),
+ stdin: batchCmd,
+ },
}
go func() {
<-ctx.Done()
@@ -141,36 +160,56 @@ func newObjectInfoReader(
}
func (o *objectInfoReader) close() {
- o.Lock()
- defer o.Unlock()
-
+ o.queue.close()
_ = o.cmd.Wait()
-
- o.closed = true
}
func (o *objectInfoReader) isClosed() bool {
- o.Lock()
- defer o.Unlock()
- return o.closed
+ return o.queue.isClosed()
}
func (o *objectInfoReader) isDirty() bool {
- // We always consume object info directly, so the reader cannot ever be dirty.
- return false
+ return o.queue.isDirty()
+}
+
+func (o *objectInfoReader) infoQueue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) {
+ if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) {
+ return nil, nil, fmt.Errorf("object info queue already in use")
+ }
+
+ trace := startTrace(ctx, o.counter, tracedMethod)
+ o.queue.trace = trace
+
+ return &o.queue, func() {
+ atomic.StoreInt32(&o.queueInUse, 0)
+ trace.finish()
+ }, nil
}
func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
- trace, finish := startTrace(ctx, o.counter, "catfile.Info")
- defer finish()
+ queue, cleanup, err := o.infoQueue(ctx, "catfile.Info")
+ if err != nil {
+ return nil, err
+ }
+ defer cleanup()
- o.Lock()
- defer o.Unlock()
+ if err := queue.RequestRevision(revision); err != nil {
+ return nil, err
+ }
- if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil {
+ objectInfo, err := queue.ReadInfo()
+ if err != nil {
return nil, err
}
- trace.recordRequest("info")
- return ParseObjectInfo(o.stdout)
+ return objectInfo, nil
+}
+
+func (o *objectInfoReader) InfoQueue(ctx context.Context) (ObjectInfoQueue, func(), error) {
+ queue, cleanup, err := o.infoQueue(ctx, "catfile.InfoQueue")
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return queue, cleanup, nil
}
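
For context, a minimal sketch of how a caller might use the new `InfoQueue()` API to batch object-info lookups. The helper below is illustrative only and not part of this change; it merely assumes the `ObjectInfoReader` and `ObjectInfoQueue` interfaces introduced above and would have to live inside the Gitaly module so that the internal packages can be imported. Requests are written first and the answers are read back in the same order, which is the contract documented on `ObjectInfoQueue`.

package catfileexample

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/v14/internal/git"
	"gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
)

// batchObjectInfo requests info for all given revisions up front and then reads
// the results back in request order. This is suitable for small, bounded batches;
// large batches should interleave requests and reads (see the gitpipe changes below).
func batchObjectInfo(ctx context.Context, reader catfile.ObjectInfoReader, revisions []git.Revision) ([]*catfile.ObjectInfo, error) {
	queue, cleanup, err := reader.InfoQueue(ctx)
	if err != nil {
		return nil, err
	}
	// The returned cleanup function must be called once use of the queue has finished.
	defer cleanup()

	// Queue up all requests so that git-cat-file(1) can answer them without a
	// round-trip per object.
	for _, revision := range revisions {
		if err := queue.RequestRevision(revision); err != nil {
			return nil, err
		}
	}

	// Read the answers back in the same order in which they were requested.
	infos := make([]*catfile.ObjectInfo, 0, len(revisions))
	for range revisions {
		info, err := queue.ReadInfo()
		if err != nil {
			return nil, err
		}
		infos = append(infos, info)
	}

	return infos, nil
}
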
diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go
index 2ac409542..84c149aa7 100644
--- a/internal/git/catfile/object_info_reader_test.go
+++ b/internal/git/catfile/object_info_reader_test.go
@@ -2,7 +2,9 @@ package catfile
import (
"bufio"
+ "errors"
"fmt"
+ "os"
"strings"
"testing"
@@ -162,3 +164,201 @@ func TestObjectInfoReader(t *testing.T) {
})
}
}
+
+func TestObjectInfoReader_queue(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)
+
+ blobOID := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+ blobInfo := ObjectInfo{
+ Oid: blobOID,
+ Type: "blob",
+ Size: int64(len("foobar")),
+ }
+
+ commitOID := gittest.WriteCommit(t, cfg, repoPath)
+ commitInfo := ObjectInfo{
+ Oid: commitOID,
+ Type: "commit",
+ Size: 225,
+ }
+
+ t.Run("read single info", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+
+ info, err := queue.ReadInfo()
+ require.NoError(t, err)
+ require.Equal(t, &blobInfo, info)
+ })
+
+ t.Run("read multiple object infos", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ for oid, objectInfo := range map[git.ObjectID]ObjectInfo{
+ blobOID: blobInfo,
+ commitOID: commitInfo,
+ } {
+ require.NoError(t, queue.RequestRevision(oid.Revision()))
+
+ info, err := queue.ReadInfo()
+ require.NoError(t, err)
+ require.Equal(t, &objectInfo, info)
+ }
+ })
+
+ t.Run("request multiple object infos", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.RequestRevision(commitOID.Revision()))
+
+ for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} {
+ info, err := queue.ReadInfo()
+ require.NoError(t, err)
+ require.Equal(t, &expectedInfo, info)
+ }
+ })
+
+ t.Run("read without request", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ _, err = queue.ReadInfo()
+ require.Equal(t, errors.New("no outstanding request"), err)
+ })
+
+ t.Run("request invalid object info", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision("does-not-exist"))
+
+ _, err = queue.ReadInfo()
+ require.Equal(t, NotFoundError{errors.New("object not found")}, err)
+ })
+
+ t.Run("can continue reading after NotFoundError", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision("does-not-exist"))
+ _, err = queue.ReadInfo()
+ require.Equal(t, NotFoundError{errors.New("object not found")}, err)
+
+ // Requesting another object info after the previous one has failed should continue
+ // to work alright.
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ info, err := queue.ReadInfo()
+ require.NoError(t, err)
+ require.Equal(t, &blobInfo, info)
+ })
+
+ t.Run("requesting multiple queues fails", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ _, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ _, _, err = reader.infoQueue(ctx, "trace")
+ require.Equal(t, errors.New("object info queue already in use"), err)
+
+ // After calling cleanup we should be able to create an object queue again.
+ cleanup()
+
+ _, cleanup, err = reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+ })
+
+ t.Run("requesting object dirties reader", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.False(t, reader.isDirty())
+ require.False(t, queue.isDirty())
+
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+
+ require.True(t, reader.isDirty())
+ require.True(t, queue.isDirty())
+
+ _, err = queue.ReadInfo()
+ require.NoError(t, err)
+
+ require.False(t, reader.isDirty())
+ require.False(t, queue.isDirty())
+ })
+
+ t.Run("closing queue blocks request", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ queue.close()
+
+ require.True(t, reader.isClosed())
+ require.True(t, queue.isClosed())
+
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision(blobOID.Revision()))
+ })
+
+ t.Run("closing queue blocks read", func(t *testing.T) {
+ reader, err := newObjectInfoReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.infoQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ // Request the object before we close the queue.
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+
+ queue.close()
+
+ require.True(t, reader.isClosed())
+ require.True(t, queue.isClosed())
+
+ _, err = queue.ReadInfo()
+ require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err)
+ })
+}
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 25f47654b..540e671de 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -6,7 +6,7 @@ import (
"fmt"
"io"
"os"
- "sync"
+ "sync/atomic"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@@ -20,32 +20,36 @@ type ObjectReader interface {
// Reader returns a new Object for the given revision. The Object must be fully consumed
// before another object is requested.
- Object(_ context.Context, _ git.Revision) (*Object, error)
+ Object(context.Context, git.Revision) (*Object, error)
+
+ // ObjectQueue returns an ObjectQueue that can be used to batch multiple object requests.
+ // Using the queue is more efficient than using `Object()` when requesting a bunch of
+ // objects. The returned function must be executed after use of the ObjectQueue has
+ // finished.
+ ObjectQueue(context.Context) (ObjectQueue, func(), error)
+}
+
+// ObjectQueue allows for requesting and reading objects independently of each other. The number of
+// RequestRevision and ReadObject calls must match. ReadObject must be executed after the object has
+// been requested already. The order of objects returned by ReadObject is the same as the order in
+// which objects have been requested.
+type ObjectQueue interface {
+ // RequestRevision requests the given revision from git-cat-file(1).
+ RequestRevision(git.Revision) error
+ // ReadObject reads an object which has previously been requested.
+ ReadObject() (*Object, error)
}
// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
// --batch` process such that we do not have to spawn a new process for each object we are about to
// read.
type objectReader struct {
- cmd *command.Command
- stdout *bufio.Reader
-
- // n is a state machine that tracks how much data we still have to read
- // from r. Legal states are: n==0, this means we can do a new request on
- // the cat-file process. n==1, this means that we have to discard a
- // trailing newline. n>0, this means we are in the middle of reading a
- // raw git object.
- n int64
-
- // Even though the batch type should not be used concurrently, I think
- // that if that does happen by mistake we should give proper errors
- // instead of doing unsafe memory writes (to n) and failing in some
- // unpredictable way.
- sync.Mutex
-
- closed bool
+ cmd *command.Command
counter *prometheus.CounterVec
+
+ queue requestQueue
+ queueInUse int32
}
func newObjectReader(
@@ -70,8 +74,12 @@ func newObjectReader(
objectReader := &objectReader{
cmd: batchCmd,
- stdout: bufio.NewReader(batchCmd),
counter: counter,
+ queue: requestQueue{
+ isObjectQueue: true,
+ stdout: bufio.NewReader(batchCmd),
+ stdin: batchCmd,
+ },
}
go func() {
<-ctx.Done()
@@ -83,98 +91,103 @@ func newObjectReader(
}
func (o *objectReader) close() {
- o.Lock()
- defer o.Unlock()
-
+ o.queue.close()
_ = o.cmd.Wait()
-
- o.closed = true
}
func (o *objectReader) isClosed() bool {
- o.Lock()
- defer o.Unlock()
- return o.closed
-}
-
-func (o *objectReader) consume(nBytes int64) {
- o.n -= nBytes
- if o.n < 1 {
- panic("too many bytes read from batch")
- }
+ return o.queue.isClosed()
}
func (o *objectReader) isDirty() bool {
- o.Lock()
- defer o.Unlock()
-
- return o.n > 1
+ return o.queue.isDirty()
}
-// Object represents data returned by `git cat-file --batch`
-type Object struct {
- // ObjectInfo represents main information about object
- ObjectInfo
- // parent is the objectReader which has created the Object.
- parent *objectReader
- // dataReader is reader which has all the object data.
- dataReader io.LimitedReader
-}
+func (o *objectReader) objectQueue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) {
+ if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) {
+ return nil, nil, fmt.Errorf("object queue already in use")
+ }
-func (o *objectReader) Object(
- ctx context.Context,
- revision git.Revision,
-) (*Object, error) {
- trace, finish := startTrace(ctx, o.counter, "catfile.Object")
- defer finish()
+ trace := startTrace(ctx, o.counter, tracedMethod)
+ o.queue.trace = trace
- o.Lock()
- defer o.Unlock()
+ return &o.queue, func() {
+ atomic.StoreInt32(&o.queueInUse, 0)
+ trace.finish()
+ }, nil
+}
- if o.n == 1 {
- // Consume linefeed
- if _, err := o.stdout.ReadByte(); err != nil {
- return nil, err
- }
- o.n--
+func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Object, error) {
+ queue, finish, err := o.objectQueue(ctx, "catfile.Object")
+ if err != nil {
+ return nil, err
}
+ defer finish()
- if o.n != 0 {
- return nil, fmt.Errorf("cannot create new Object: batch contains %d unread bytes", o.n)
+ if err := queue.RequestRevision(revision); err != nil {
+ return nil, err
}
- if _, err := fmt.Fprintln(o.cmd, revision.String()); err != nil {
+ object, err := queue.ReadObject()
+ if err != nil {
return nil, err
}
- oi, err := ParseObjectInfo(o.stdout)
+ return object, nil
+}
+
+func (o *objectReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), error) {
+ queue, finish, err := o.objectQueue(ctx, "catfile.ObjectQueue")
if err != nil {
- return nil, err
+ return nil, nil, err
}
- trace.recordRequest(oi.Type)
+ return queue, finish, nil
+}
- o.n = oi.Size + 1
+// Object represents data returned by `git cat-file --batch`
+type Object struct {
+ // ObjectInfo represents main information about object
+ ObjectInfo
- return &Object{
- ObjectInfo: *oi,
- parent: o,
- dataReader: io.LimitedReader{
- R: o.stdout,
- N: oi.Size,
- },
- }, nil
+ // dataReader is reader which has all the object data.
+ dataReader io.LimitedReader
+
+ // bytesRemaining tracks the number of bytes which are left to be read. While this duplicates the
+ // information tracked in dataReader.N, this cannot be helped given that we need to make
+ // access to this information atomic so there's no race between updating it and checking the
+ // process for dirtiness. While we could use locking instead of atomics, we'd have to lock
+ // during the whole read duration -- and thus it'd become impossible to check for dirtiness
+ // at the same time.
+ bytesRemaining int64
+
+ // closed determines whether the object is closed for reading.
+ closed int32
}
-func (o *Object) Read(p []byte) (int, error) {
- o.parent.Lock()
- defer o.parent.Unlock()
+// isDirty determines whether the object is still dirty, that is whether there are still unconsumed
+// bytes.
+func (o *Object) isDirty() bool {
+ return atomic.LoadInt64(&o.bytesRemaining) != 0
+}
+
+func (o *Object) isClosed() bool {
+ return atomic.LoadInt32(&o.closed) == 1
+}
+
+func (o *Object) close() {
+ atomic.StoreInt32(&o.closed, 1)
+}
- if o.parent.closed {
+func (o *Object) Read(p []byte) (int, error) {
+ if o.isClosed() {
return 0, os.ErrClosed
}
n, err := o.dataReader.Read(p)
- o.parent.consume(int64(n))
+ if atomic.AddInt64(&o.bytesRemaining, int64(-n)) < 0 {
+ return n, fmt.Errorf("bytes remaining became negative while reading object")
+ }
+
return n, err
}
@@ -182,16 +195,16 @@ func (o *Object) Read(p []byte) (int, error) {
// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are
// implemented by the respective reader or writer.
func (o *Object) WriteTo(w io.Writer) (int64, error) {
- o.parent.Lock()
- defer o.parent.Unlock()
-
- if o.parent.closed {
+ if o.isClosed() {
return 0, os.ErrClosed
}
// While the `io.LimitedReader` does not support WriteTo, `io.Copy()` will make use of
// `ReadFrom()` in case the writer implements it.
n, err := io.Copy(w, &o.dataReader)
- o.parent.consume(n)
+ if atomic.AddInt64(&o.bytesRemaining, -n) < 0 {
+ return n, fmt.Errorf("bytes remaining became negative while reading object")
+ }
+
return n, err
}
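
Similarly, a hedged sketch (again not part of the diff) of how `ObjectQueue()` could be used to batch full object reads. The important constraint, mirrored by the dirtiness tracking above, is that each object must be fully consumed before the next `ReadObject()` call.

package catfileexample

import (
	"context"
	"io"

	"gitlab.com/gitlab-org/gitaly/v14/internal/git"
	"gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
)

// dumpObjects writes the contents of a small, bounded set of revisions to the
// given writer, batching all requests through a single ObjectQueue.
func dumpObjects(ctx context.Context, reader catfile.ObjectReader, revisions []git.Revision, out io.Writer) error {
	queue, cleanup, err := reader.ObjectQueue(ctx)
	if err != nil {
		return err
	}
	defer cleanup()

	for _, revision := range revisions {
		if err := queue.RequestRevision(revision); err != nil {
			return err
		}
	}

	for range revisions {
		object, err := queue.ReadObject()
		if err != nil {
			return err
		}

		// Fully consume the object: the next ReadObject() would otherwise fail
		// with "current object has not been fully read".
		if _, err := io.Copy(out, object); err != nil {
			return err
		}
	}

	return nil
}
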
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index aad733f78..165d782de 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -1,8 +1,10 @@
package catfile
import (
+ "errors"
"fmt"
"io"
+ "os"
"testing"
"github.com/prometheus/client_golang/prometheus"
@@ -76,7 +78,7 @@ func TestObjectReader_reader(t *testing.T) {
// We haven't yet consumed the previous object, so this must now fail.
_, err = reader.Object(ctx, commitID.Revision())
- require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)+1))
+ require.EqualError(t, err, "current object has not been fully read")
})
t.Run("read fails when partially consuming previous object", func(t *testing.T) {
@@ -91,7 +93,7 @@ func TestObjectReader_reader(t *testing.T) {
// We haven't yet consumed the previous object, so this must now fail.
_, err = reader.Object(ctx, commitID.Revision())
- require.EqualError(t, err, fmt.Sprintf("cannot create new Object: batch contains %d unread bytes", len(commitContents)-100+1))
+ require.EqualError(t, err, "current object has not been fully read")
})
t.Run("read increments Prometheus counter", func(t *testing.T) {
@@ -118,3 +120,233 @@ func TestObjectReader_reader(t *testing.T) {
}
})
}
+
+func TestObjectReader_queue(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)
+
+ foobarBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+ barfooBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo"))
+
+ t.Run("read single object", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, "foobar", string(contents))
+ })
+
+ t.Run("read multiple objects", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ for blobID, blobContents := range map[git.ObjectID]string{
+ foobarBlob: "foobar",
+ barfooBlob: "barfoo",
+ } {
+ require.NoError(t, queue.RequestRevision(blobID.Revision()))
+
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, blobContents, string(contents))
+ }
+ })
+
+ t.Run("request multiple objects", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(barfooBlob.Revision()))
+
+ for _, expectedContents := range []string{"foobar", "barfoo"} {
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, expectedContents, string(contents))
+ }
+ })
+
+ t.Run("read without request", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ _, err = queue.ReadObject()
+ require.Equal(t, errors.New("no outstanding request"), err)
+ })
+
+ t.Run("request invalid object", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision("does-not-exist"))
+
+ _, err = queue.ReadObject()
+ require.Equal(t, NotFoundError{errors.New("object not found")}, err)
+ })
+
+ t.Run("can continue reading after NotFoundError", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision("does-not-exist"))
+ _, err = queue.ReadObject()
+ require.Equal(t, NotFoundError{errors.New("object not found")}, err)
+
+ // Requesting another object after the previous one has failed should continue to
+ // work alright.
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, "foobar", string(contents))
+ })
+
+ t.Run("requesting multiple queues fails", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ _, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ _, _, err = reader.objectQueue(ctx, "trace")
+ require.Equal(t, errors.New("object queue already in use"), err)
+
+ // After calling cleanup we should be able to create an object queue again.
+ cleanup()
+
+ _, cleanup, err = reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+ })
+
+ t.Run("requesting object dirties reader", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.False(t, reader.isDirty())
+ require.False(t, queue.isDirty())
+
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+
+ require.True(t, reader.isDirty())
+ require.True(t, queue.isDirty())
+
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ // The object has not been consumed yet, so the reader must still be dirty.
+ require.True(t, reader.isDirty())
+ require.True(t, queue.isDirty())
+
+ _, err = io.ReadAll(object)
+ require.NoError(t, err)
+
+ require.False(t, reader.isDirty())
+ require.False(t, queue.isDirty())
+ })
+
+ t.Run("closing queue blocks request", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ queue.close()
+
+ require.True(t, reader.isClosed())
+ require.True(t, queue.isClosed())
+
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision(foobarBlob.Revision()))
+ })
+
+ t.Run("closing queue blocks read", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ // Request the object before we close the queue.
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+
+ queue.close()
+
+ require.True(t, reader.isClosed())
+ require.True(t, queue.isClosed())
+
+ _, err = queue.ReadObject()
+ require.Equal(t, fmt.Errorf("cannot read object: %w", os.ErrClosed), err)
+ })
+
+ t.Run("closing queue blocks consuming", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ queue, cleanup, err := reader.objectQueue(ctx, "trace")
+ require.NoError(t, err)
+ defer cleanup()
+
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+
+ // Read the object header before closing.
+ object, err := queue.ReadObject()
+ require.NoError(t, err)
+
+ queue.close()
+
+ require.True(t, reader.isClosed())
+ require.True(t, queue.isClosed())
+ require.True(t, object.isClosed())
+
+ _, err = io.ReadAll(object)
+ require.Equal(t, os.ErrClosed, err)
+ })
+}
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
new file mode 100644
index 000000000..279b9d0c2
--- /dev/null
+++ b/internal/git/catfile/request_queue.go
@@ -0,0 +1,166 @@
+package catfile
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "sync"
+ "sync/atomic"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+)
+
+type requestQueue struct {
+ // isObjectQueue is set to `true` when this is a request queue which can be used for reading
+ // objects. If set to `false`, then this can only be used to read object info.
+ isObjectQueue bool
+
+ stdout *bufio.Reader
+ stdin io.Writer
+
+ // outstandingRequests is the number of requests which have been queued up. Gets incremented
+ // on request, and decremented when starting to read an object (not when that object has
+ // been fully consumed).
+ outstandingRequests int64
+
+ // closed indicates whether the queue is closed for additional requests.
+ closed int32
+
+ // currentObject is the currently read object.
+ currentObject *Object
+ currentObjectLock sync.Mutex
+
+ // trace is the current tracing span.
+ trace *trace
+}
+
+// isDirty returns true either if there are outstanding requests for objects or if the current
+// object hasn't yet been fully consumed.
+func (q *requestQueue) isDirty() bool {
+ q.currentObjectLock.Lock()
+ defer q.currentObjectLock.Unlock()
+
+ // We must check for the current object first: we cannot queue another object due to the
+ // object lock, but we may queue another request while checking for dirtiness.
+ if q.currentObject != nil {
+ return q.currentObject.isDirty()
+ }
+
+ if atomic.LoadInt64(&q.outstandingRequests) != 0 {
+ return true
+ }
+
+ return false
+}
+
+func (q *requestQueue) isClosed() bool {
+ return atomic.LoadInt32(&q.closed) == 1
+}
+
+func (q *requestQueue) close() {
+ if atomic.CompareAndSwapInt32(&q.closed, 0, 1) {
+ q.currentObjectLock.Lock()
+ defer q.currentObjectLock.Unlock()
+
+ if q.currentObject != nil {
+ q.currentObject.close()
+ }
+ }
+}
+
+func (q *requestQueue) RequestRevision(revision git.Revision) error {
+ if q.isClosed() {
+ return fmt.Errorf("cannot request revision: %w", os.ErrClosed)
+ }
+
+ atomic.AddInt64(&q.outstandingRequests, 1)
+
+ if _, err := fmt.Fprintln(q.stdin, revision.String()); err != nil {
+ atomic.AddInt64(&q.outstandingRequests, -1)
+ return fmt.Errorf("requesting revision: %w", err)
+ }
+
+ return nil
+}
+
+func (q *requestQueue) ReadObject() (*Object, error) {
+ if !q.isObjectQueue {
+ panic("object queue used to read object info")
+ }
+
+ q.currentObjectLock.Lock()
+ defer q.currentObjectLock.Unlock()
+
+ if q.isClosed() {
+ return nil, fmt.Errorf("cannot read object: %w", os.ErrClosed)
+ }
+
+ if atomic.LoadInt64(&q.outstandingRequests) == 0 {
+ return nil, fmt.Errorf("no outstanding request")
+ }
+
+ if q.currentObject != nil {
+ // If the current object is still dirty, then we must not try to read a new object.
+ if q.currentObject.isDirty() {
+ return nil, fmt.Errorf("current object has not been fully read")
+ }
+
+ q.currentObject.close()
+ q.currentObject = nil
+
+ // If we have already read an object before, then we must consume the trailing
+ // newline after the object's data.
+ if _, err := q.stdout.ReadByte(); err != nil {
+ return nil, err
+ }
+ }
+
+ oi, err := ParseObjectInfo(q.stdout)
+ if err != nil {
+ return nil, err
+ }
+ q.trace.recordRequest(oi.Type)
+
+ if atomic.AddInt64(&q.outstandingRequests, -1) < 0 {
+ return nil, fmt.Errorf("negative number of requests")
+ }
+
+ q.currentObject = &Object{
+ ObjectInfo: *oi,
+ dataReader: io.LimitedReader{
+ R: q.stdout,
+ N: oi.Size,
+ },
+ bytesRemaining: oi.Size,
+ }
+
+ return q.currentObject, nil
+}
+
+func (q *requestQueue) ReadInfo() (*ObjectInfo, error) {
+ if q.isObjectQueue {
+ panic("object queue used to read object info")
+ }
+
+ if q.isClosed() {
+ return nil, fmt.Errorf("cannot read object info: %w", os.ErrClosed)
+ }
+
+ // We first need to determine whether there are any queued requests at all. If not, then we
+ // cannot read anything.
+ queuedRequests := atomic.LoadInt64(&q.outstandingRequests)
+ if queuedRequests == 0 {
+ return nil, fmt.Errorf("no outstanding request")
+ }
+
+ // And when there are, we need to remove one of these queued requests. We do so via
+ // `CompareAndSwapInt64()`, which easily allows us to detect concurrent access to the queue.
+ if !atomic.CompareAndSwapInt64(&q.outstandingRequests, queuedRequests, queuedRequests-1) {
+ return nil, fmt.Errorf("concurrent access to object info queue")
+ }
+
+ q.trace.recordRequest("info")
+
+ return ParseObjectInfo(q.stdout)
+}
diff --git a/internal/git/catfile/tracing.go b/internal/git/catfile/tracing.go
index ab2a76d3d..a1a6fd90b 100644
--- a/internal/git/catfile/tracing.go
+++ b/internal/git/catfile/tracing.go
@@ -2,6 +2,7 @@ package catfile
import (
"context"
+ "sync"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@@ -11,18 +12,18 @@ type trace struct {
span opentracing.Span
counter *prometheus.CounterVec
- requests map[string]int
+ requestsLock sync.Mutex
+ requests map[string]int
}
// startTrace starts a new tracing span and updates metrics according to how many requests have been
-// done during that trace. This must be called with two contexts: first the per-RPC context, which
-// is the context of the current RPC call. And then the cache context, which is the decorrelated
-// context for cached catfile processes. Spans are then created for both contexts.
+// done during that trace. The caller must call `finish()` on the resulting trace after it's deemed to be
+// done such that metrics get recorded correctly.
func startTrace(
ctx context.Context,
counter *prometheus.CounterVec,
methodName string,
-) (*trace, func()) {
+) *trace {
span, _ := opentracing.StartSpanFromContext(ctx, methodName)
trace := &trace{
@@ -37,14 +38,19 @@ func startTrace(
},
}
- return trace, trace.finish
+ return trace
}
func (t *trace) recordRequest(requestType string) {
+ t.requestsLock.Lock()
+ defer t.requestsLock.Unlock()
t.requests[requestType]++
}
func (t *trace) finish() {
+ t.requestsLock.Lock()
+ defer t.requestsLock.Unlock()
+
for requestType, requestCount := range t.requests {
if requestCount == 0 {
continue
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 1c00443d0..e87628f38 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -40,6 +40,12 @@ func WithSkipCatfileInfoResult(skipResult func(*catfile.ObjectInfo) bool) Catfil
}
}
+type catfileInfoRequest struct {
+ objectID git.ObjectID
+ objectName []byte
+ err error
+}
+
// CatfileInfo processes revlistResults from the given channel and extracts object information via
// `git cat-file --batch-check`. The returned channel will contain all processed catfile info
// results. Any error received via the channel or encountered in this step will cause the pipeline
@@ -49,21 +55,67 @@ func CatfileInfo(
objectInfoReader catfile.ObjectInfoReader,
it ObjectIterator,
opts ...CatfileInfoOption,
-) CatfileInfoIterator {
+) (CatfileInfoIterator, error) {
var cfg catfileInfoConfig
for _, opt := range opts {
opt(&cfg)
}
+ queue, cleanup, err := objectInfoReader.InfoQueue(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer cleanup()
+
+ requestChan := make(chan catfileInfoRequest)
+ go func() {
+ defer close(requestChan)
+
+ for it.Next() {
+ if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
+ select {
+ case requestChan <- catfileInfoRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ select {
+ case requestChan <- catfileInfoRequest{
+ objectID: it.ObjectID(),
+ objectName: it.ObjectName(),
+ }:
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ if err := it.Err(); err != nil {
+ select {
+ case requestChan <- catfileInfoRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
resultChan := make(chan CatfileInfoResult)
go func() {
defer close(resultChan)
- for it.Next() {
- objectInfo, err := objectInfoReader.Info(ctx, it.ObjectID().Revision())
+ // It's fine to iterate over the request channel without paying attention to
+ // context cancellation because the request channel itself would be closed if the
+ // context was cancelled.
+ for request := range requestChan {
+ if request.err != nil {
+ sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: request.err})
+ break
+ }
+
+ objectInfo, err := queue.ReadInfo()
if err != nil {
sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
- err: fmt.Errorf("retrieving object info for %q: %w", it.ObjectID(), err),
+ err: fmt.Errorf("retrieving object info for %q: %w", request.objectID, err),
})
return
}
@@ -73,22 +125,17 @@ func CatfileInfo(
}
if isDone := sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{
- ObjectName: it.ObjectName(),
+ ObjectName: request.objectName,
ObjectInfo: objectInfo,
}); isDone {
return
}
}
-
- if err := it.Err(); err != nil {
- sendCatfileInfoResult(ctx, resultChan, CatfileInfoResult{err: err})
- return
- }
}()
return &catfileInfoIterator{
ch: resultChan,
- }
+ }, nil
}
// CatfileInfoAllObjects enumerates all Git objects part of the repository's object directory and
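
The pattern above -- a producer goroutine writing requests into the queue while results are read on the consumer side -- is what keeps large batches from stalling on the pipes to git-cat-file(1). Reduced to its essence it looks roughly like the sketch below. This is illustrative only; the real CatfileInfo code additionally forwards object names and iterator errors through the request channel and respects context cancellation.

package catfileexample

import (
	"gitlab.com/gitlab-org/gitaly/v14/internal/git"
	"gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile"
)

// streamInfos feeds revisions into the queue from a separate goroutine while the
// caller drains the results, so that neither side blocks on full pipe buffers.
func streamInfos(queue catfile.ObjectInfoQueue, revisions []git.Revision) ([]*catfile.ObjectInfo, error) {
	requestErr := make(chan error, 1)
	go func() {
		defer close(requestErr)
		for _, revision := range revisions {
			if err := queue.RequestRevision(revision); err != nil {
				requestErr <- err
				return
			}
		}
	}()

	infos := make([]*catfile.ObjectInfo, 0, len(revisions))
	for range revisions {
		info, err := queue.ReadInfo()
		if err != nil {
			// A failed request is the likelier root cause, so report it if one
			// has been recorded already.
			select {
			case reqErr := <-requestErr:
				if reqErr != nil {
					return nil, reqErr
				}
			default:
			}
			return nil, err
		}
		infos = append(infos, info)
	}

	return infos, nil
}
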
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 16e2980bf..6cddfd808 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -136,7 +136,8 @@ func TestCatfileInfo(t *testing.T) {
objectInfoReader, err := catfileCache.ObjectInfoReader(ctx, repo)
require.NoError(t, err)
- it := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...)
+ it, err := CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(tc.revlistInputs), tc.opts...)
+ require.NoError(t, err)
var results []CatfileInfoResult
for it.Next() {
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index d5aa1e7f6..5ec2e9fb5 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -23,6 +23,11 @@ type CatfileObjectResult struct {
git.Object
}
+type catfileObjectRequest struct {
+ objectName []byte
+ err error
+}
+
// CatfileObject processes catfileInfoResults from the given channel and reads associated objects
// into memory via `git cat-file --batch`. The returned channel will contain all processed objects.
// Any error received via the channel or encountered in this step will cause the pipeline to fail.
@@ -32,7 +37,42 @@ func CatfileObject(
ctx context.Context,
objectReader catfile.ObjectReader,
it ObjectIterator,
-) CatfileObjectIterator {
+) (CatfileObjectIterator, error) {
+ queue, cleanup, err := objectReader.ObjectQueue(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer cleanup()
+
+ requestChan := make(chan catfileObjectRequest)
+ go func() {
+ defer close(requestChan)
+
+ for it.Next() {
+ if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
+ select {
+ case requestChan <- catfileObjectRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ select {
+ case requestChan <- catfileObjectRequest{objectName: it.ObjectName()}:
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ if err := it.Err(); err != nil {
+ select {
+ case requestChan <- catfileObjectRequest{err: err}:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
resultChan := make(chan CatfileObjectResult)
go func() {
defer close(resultChan)
@@ -60,7 +100,15 @@ func CatfileObject(
var previousObject *synchronizingObject
- for it.Next() {
+ // It's fine to iterate over the request channel without paying attention to
+ // context cancellation because the request channel itself would be closed if the
+ // context was cancelled.
+ for request := range requestChan {
+ if request.err != nil {
+ sendResult(CatfileObjectResult{err: request.err})
+ break
+ }
+
// We mustn't try to read another object before reading the previous object
// has concluded. Given that this is not under our control but under the
// control of the caller, we thus have to wait until the blocking reader has
@@ -73,7 +121,7 @@ func CatfileObject(
}
}
- object, err := objectReader.Object(ctx, it.ObjectID().Revision())
+ object, err := queue.ReadObject()
if err != nil {
sendResult(CatfileObjectResult{
err: fmt.Errorf("requesting object: %w", err),
@@ -87,22 +135,17 @@ func CatfileObject(
}
if isDone := sendResult(CatfileObjectResult{
- ObjectName: it.ObjectName(),
+ ObjectName: request.objectName,
Object: previousObject,
}); isDone {
return
}
}
-
- if err := it.Err(); err != nil {
- sendResult(CatfileObjectResult{err: err})
- return
- }
}()
return &catfileObjectIterator{
ch: resultChan,
- }
+ }, nil
}
type synchronizingObject struct {
diff --git a/internal/git/gitpipe/catfile_object_test.go b/internal/git/gitpipe/catfile_object_test.go
index fd02726d9..6cb20aa0b 100644
--- a/internal/git/gitpipe/catfile_object_test.go
+++ b/internal/git/gitpipe/catfile_object_test.go
@@ -78,7 +78,8 @@ func TestCatfileObject(t *testing.T) {
objectReader, err := catfileCache.ObjectReader(ctx, repo)
require.NoError(t, err)
- it := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs))
+ it, err := CatfileObject(ctx, objectReader, NewCatfileInfoIterator(tc.catfileInfoInputs))
+ require.NoError(t, err)
var results []CatfileObjectResult
for it.Next() {
diff --git a/internal/git/gitpipe/pipeline_test.go b/internal/git/gitpipe/pipeline_test.go
index 25afd4259..a80f7e769 100644
--- a/internal/git/gitpipe/pipeline_test.go
+++ b/internal/git/gitpipe/pipeline_test.go
@@ -222,8 +222,12 @@ func TestPipeline_revlist(t *testing.T) {
require.NoError(t, err)
revlistIter := Revlist(ctx, repo, tc.revisions, tc.revlistOptions...)
- catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter, tc.catfileInfoOptions...)
- catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileInfoIter, err := CatfileInfo(ctx, objectInfoReader, revlistIter, tc.catfileInfoOptions...)
+ require.NoError(t, err)
+
+ catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter)
+ require.NoError(t, err)
var results []CatfileObjectResult
for catfileObjectIter.Next() {
@@ -274,8 +278,12 @@ func TestPipeline_revlist(t *testing.T) {
require.NoError(t, err)
revlistIter := Revlist(ctx, repo, []string{"--all"})
- catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter)
- catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileInfoIter, err := CatfileInfo(ctx, objectInfoReader, revlistIter)
+ require.NoError(t, err)
+
+ catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter)
+ require.NoError(t, err)
i := 0
for catfileObjectIter.Next() {
@@ -311,8 +319,12 @@ func TestPipeline_revlist(t *testing.T) {
require.NoError(t, err)
revlistIter := Revlist(ctx, repo, []string{"--all"}, WithObjects())
- catfileInfoIter := CatfileInfo(ctx, objectInfoReader, revlistIter)
- catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileInfoIter, err := CatfileInfo(ctx, objectInfoReader, revlistIter)
+ require.NoError(t, err)
+
+ catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter)
+ require.NoError(t, err)
i := 0
var wg sync.WaitGroup
@@ -361,8 +373,12 @@ func TestPipeline_forEachRef(t *testing.T) {
require.NoError(t, err)
forEachRefIter := ForEachRef(ctx, repo, nil)
- catfileInfoIter := CatfileInfo(ctx, objectInfoReader, forEachRefIter)
- catfileObjectIter := CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileInfoIter, err := CatfileInfo(ctx, objectInfoReader, forEachRefIter)
+ require.NoError(t, err)
+
+ catfileObjectIter, err := CatfileObject(ctx, objectReader, catfileInfoIter)
+ require.NoError(t, err)
type object struct {
oid git.ObjectID
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index 55c406dd8..97e30ba7d 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -103,7 +103,10 @@ func (s *server) processBlobs(
return helper.ErrInternal(fmt.Errorf("creating object info reader: %w", err))
}
- catfileInfoIter = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter)
+ catfileInfoIter, err = gitpipe.CatfileInfo(ctx, objectInfoReader, objectIter)
+ if err != nil {
+ return helper.ErrInternalf("creating object info iterator: %w", err)
+ }
}
var i uint32
@@ -134,7 +137,10 @@ func (s *server) processBlobs(
return helper.ErrInternal(fmt.Errorf("creating object reader: %w", err))
}
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, objectIter)
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, objectIter)
+ if err != nil {
+ return helper.ErrInternalf("creating catfile object iterator: %w", err)
+ }
var i uint32
for catfileObjectIter.Next() {
diff --git a/internal/gitaly/service/blob/blobs_test.go b/internal/gitaly/service/blob/blobs_test.go
index ae668dce4..9a2eaa717 100644
--- a/internal/gitaly/service/blob/blobs_test.go
+++ b/internal/gitaly/service/blob/blobs_test.go
@@ -428,23 +428,88 @@ func BenchmarkListAllBlobs(b *testing.B) {
_, repoProto, _, client := setup(b)
- b.Run("ListAllBlobs", func(b *testing.B) {
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- stream, err := client.ListAllBlobs(ctx, &gitalypb.ListAllBlobsRequest{
+ for _, tc := range []struct {
+ desc string
+ request *gitalypb.ListAllBlobsRequest
+ }{
+ {
+ desc: "with contents",
+ request: &gitalypb.ListAllBlobsRequest{
Repository: repoProto,
BytesLimit: -1,
- })
- require.NoError(b, err)
+ },
+ },
+ {
+ desc: "without contents",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: repoProto,
+ BytesLimit: 0,
+ },
+ },
+ } {
+ b.Run(tc.desc, func(b *testing.B) {
+ b.ReportAllocs()
- for {
- _, err := stream.Recv()
- if err == io.EOF {
- break
+ for i := 0; i < b.N; i++ {
+ stream, err := client.ListAllBlobs(ctx, tc.request)
+ require.NoError(b, err)
+
+ for {
+ _, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ require.NoError(b, err)
}
+ }
+ })
+ }
+}
+
+func BenchmarkListBlobs(b *testing.B) {
+ b.StopTimer()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ _, repoProto, _, client := setup(b)
+
+ for _, tc := range []struct {
+ desc string
+ request *gitalypb.ListBlobsRequest
+ }{
+ {
+ desc: "with contents",
+ request: &gitalypb.ListBlobsRequest{
+ Repository: repoProto,
+ Revisions: []string{"refs/heads/master"},
+ BytesLimit: -1,
+ },
+ },
+ {
+ desc: "without contents",
+ request: &gitalypb.ListBlobsRequest{
+ Repository: repoProto,
+ Revisions: []string{"refs/heads/master"},
+ BytesLimit: 0,
+ },
+ },
+ } {
+ b.Run(tc.desc, func(b *testing.B) {
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ stream, err := client.ListBlobs(ctx, tc.request)
require.NoError(b, err)
+
+ for {
+ _, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ require.NoError(b, err)
+ }
}
- }
- })
+ })
+ }
}
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index a937881ff..ca05b4750 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -62,7 +62,11 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
gitpipe.WithBlobLimit(lfsPointerMaxSize),
gitpipe.WithObjectTypeFilter(gitpipe.ObjectTypeBlob),
)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+ if err != nil {
+ return helper.ErrInternalf("creating object iterator: %w", err)
+ }
if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
@@ -100,7 +104,11 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize
}),
)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ if err != nil {
+ return helper.ErrInternalf("creating object iterator: %w", err)
+ }
if err := sendLFSPointers(chunker, catfileObjectIter, int(in.Limit)); err != nil {
return err
@@ -144,12 +152,19 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
blobs[i] = gitpipe.RevisionResult{OID: git.ObjectID(blobID)}
}
- catfileInfoIter := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs),
+ catfileInfoIter, err := gitpipe.CatfileInfo(ctx, objectInfoReader, gitpipe.NewRevisionIterator(blobs),
gitpipe.WithSkipCatfileInfoResult(func(objectInfo *catfile.ObjectInfo) bool {
return objectInfo.Type != "blob" || objectInfo.Size > lfsPointerMaxSize
}),
)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ if err != nil {
+ return helper.ErrInternalf("creating object info iterator: %w", err)
+ }
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ if err != nil {
+ return helper.ErrInternalf("creating object iterator: %w", err)
+ }
if err := sendLFSPointers(chunker, catfileObjectIter, 0); err != nil {
return err
diff --git a/internal/gitaly/service/commit/list_all_commits.go b/internal/gitaly/service/commit/list_all_commits.go
index 0ab8a9b66..d5fa18622 100644
--- a/internal/gitaly/service/commit/list_all_commits.go
+++ b/internal/gitaly/service/commit/list_all_commits.go
@@ -53,7 +53,10 @@ func (s *server) ListAllCommits(
}),
)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, catfileInfoIter)
+ if err != nil {
+ return err
+ }
chunker := chunk.New(&commitsSender{
send: func(commits []*gitalypb.GitCommit) error {
diff --git a/internal/gitaly/service/commit/list_commits.go b/internal/gitaly/service/commit/list_commits.go
index eb6bb4b0c..826edb9ee 100644
--- a/internal/gitaly/service/commit/list_commits.go
+++ b/internal/gitaly/service/commit/list_commits.go
@@ -100,7 +100,11 @@ func (s *server) ListCommits(
}
revlistIter := gitpipe.Revlist(ctx, repo, request.GetRevisions(), revlistOptions...)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+ if err != nil {
+ return err
+ }
chunker := chunk.New(&commitsSender{
send: func(commits []*gitalypb.GitCommit) error {
diff --git a/internal/gitaly/service/commit/tree_entries_helper.go b/internal/gitaly/service/commit/tree_entries_helper.go
index 09853a890..e26265f4d 100644
--- a/internal/gitaly/service/commit/tree_entries_helper.go
+++ b/internal/gitaly/service/commit/tree_entries_helper.go
@@ -143,16 +143,15 @@ func treeEntries(
}
return nil, err
}
- defer func() {
- if _, err := io.Copy(io.Discard, treeObj); err != nil && returnedErr == nil {
- returnedErr = fmt.Errorf("discarding object: %w", err)
- }
- }()
// The tree entry may not refer to a subtree, but instead to a blob. Historically, we have
// simply ignored such objects altogether and didn't return an error, so we keep the same
// behaviour.
if treeObj.Type != "tree" {
+ if _, err := io.Copy(io.Discard, treeObj); err != nil && returnedErr == nil {
+ return nil, fmt.Errorf("discarding object: %w", err)
+ }
+
return nil, nil
}
diff --git a/internal/gitaly/service/ref/find_all_tags.go b/internal/gitaly/service/ref/find_all_tags.go
index 47eb787a2..4bfb2d0e9 100644
--- a/internal/gitaly/service/ref/find_all_tags.go
+++ b/internal/gitaly/service/ref/find_all_tags.go
@@ -52,7 +52,11 @@ func (s *server) findAllTags(ctx context.Context, repo *localrepo.Repo, sortFiel
gitpipe.WithSortField(sortField),
gitpipe.WithForEachRefFormat("%(objectname) %(refname)%(if)%(*objectname)%(then)\n%(objectname)^{} PEELED%(end)"),
)
- catfileObjectsIter := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter)
+
+ catfileObjectsIter, err := gitpipe.CatfileObject(ctx, objectReader, forEachRefIter)
+ if err != nil {
+ return err
+ }
chunker := chunk.New(&tagSender{stream: stream})
diff --git a/internal/gitaly/service/ref/tag_signatures.go b/internal/gitaly/service/ref/tag_signatures.go
index 8ffaf4de9..8bdf78479 100644
--- a/internal/gitaly/service/ref/tag_signatures.go
+++ b/internal/gitaly/service/ref/tag_signatures.go
@@ -58,7 +58,11 @@ func (s *server) GetTagSignatures(req *gitalypb.GetTagSignaturesRequest, stream
}
revlistIter := gitpipe.Revlist(ctx, repo, req.GetTagRevisions(), revlistOptions...)
- catfileObjectIter := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+
+ catfileObjectIter, err := gitpipe.CatfileObject(ctx, objectReader, revlistIter)
+ if err != nil {
+ return err
+ }
for catfileObjectIter.Next() {
tag := catfileObjectIter.Result()