diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-11-16 20:58:16 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-11-16 20:58:16 +0300 |
commit | 0a8c4c2fd1180046291efb23a068bce778d06244 (patch) | |
tree | e580dbe813f1783e858da4f7f0f0372d3cc927c9 | |
parent | e8342ee1b0aa110e932c3bf5f1e2a2e675f06d7b (diff) |
Revert "Merge branch '4420-utilize-git-cat-file-batch-command-mode' into 'master'"
This reverts merge request !4972
30 files changed, 232 insertions, 1113 deletions
diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go index cde7957f9..3fb00a588 100644 --- a/internal/git/catfile/cache.go +++ b/internal/git/catfile/cache.go @@ -14,7 +14,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/metadata" - "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/labkit/correlation" ) @@ -35,7 +34,7 @@ const ( type Cache interface { // ObjectReader either creates a new object reader or returns a cached one for the given // repository. - ObjectReader(context.Context, git.RepositoryExecutor) (ObjectContentReader, func(), error) + ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, func(), error) // ObjectInfoReader either creates a new object info reader or returns a cached one for the // given repository. ObjectInfoReader(context.Context, git.RepositoryExecutor) (ObjectInfoReader, func(), error) @@ -171,27 +170,17 @@ func (c *ProcessCache) Stop() { } // ObjectReader creates a new ObjectReader process for the given repository. -func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectContentReader, func(), error) { - var cached cacheable - var err error - var cancel func() - - if featureflag.CatfileBatchCommand.IsEnabled(ctx) { - cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) { - return newObjectReader(ctx, repo, c.catfileLookupCounter) - }, "catfile.ObjectReader") - } else { - cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) { - return newObjectContentReader(ctx, repo, c.catfileLookupCounter) - }, "catfile.ObjectContentReader") - } +func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, func(), error) { + cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) { + return newObjectReader(ctx, repo, c.catfileLookupCounter) + }, "catfile.ObjectReader") if err != nil { return nil, nil, err } - objectReader, ok := cached.(ObjectContentReader) + objectReader, ok := cacheable.(ObjectReader) if !ok { - return nil, nil, fmt.Errorf("expected object reader, got %T", cached) + return nil, nil, fmt.Errorf("expected object reader, got %T", cacheable) } return objectReader, cancel, nil @@ -199,26 +188,16 @@ func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExec // ObjectInfoReader creates a new ObjectInfoReader process for the given repository. func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectInfoReader, func(), error) { - var cached cacheable - var err error - var cancel func() - - if featureflag.CatfileBatchCommand.IsEnabled(ctx) { - cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) { - return newObjectReader(ctx, repo, c.catfileLookupCounter) - }, "catfile.ObjectReader") - } else { - cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) { - return newObjectInfoReader(ctx, repo, c.catfileLookupCounter) - }, "catfile.ObjectInfoReader") - } + cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) { + return newObjectInfoReader(ctx, repo, c.catfileLookupCounter) + }, "catfile.ObjectInfoReader") if err != nil { return nil, nil, err } - objectInfoReader, ok := cached.(ObjectInfoReader) + objectInfoReader, ok := cacheable.(ObjectInfoReader) if !ok { - return nil, nil, fmt.Errorf("expected object info reader, got %T", cached) + return nil, nil, fmt.Errorf("expected object info reader, got %T", cacheable) } return objectInfoReader, cancel, nil diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go index 1a20f37c5..b6c4f544d 100644 --- a/internal/git/catfile/cache_test.go +++ b/internal/git/catfile/cache_test.go @@ -13,7 +13,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" - "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" "gitlab.com/gitlab-org/labkit/correlation" @@ -204,12 +203,7 @@ func TestCache_autoExpiry(t *testing.T) { } func TestCache_ObjectReader(t *testing.T) { - t.Parallel() - - testhelper.NewFeatureSets(featureflag.CatfileBatchCommand).Run(t, testCacheObjectReader) -} - -func testCacheObjectReader(t *testing.T, ctx context.Context) { + ctx := testhelper.Context(t) cfg := testcfg.Build(t) repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ @@ -309,12 +303,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) { } func TestCache_ObjectInfoReader(t *testing.T) { - t.Parallel() - - testhelper.NewFeatureSets(featureflag.CatfileBatchCommand).Run(t, testCacheObjectInfoReader) -} - -func testCacheObjectInfoReader(t *testing.T, ctx context.Context) { + ctx := testhelper.Context(t) cfg := testcfg.Build(t) repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ @@ -402,7 +391,7 @@ func mustCreateCacheable(t *testing.T, cfg config.Cfg, repo repository.GitRepo) ctx, cancel := context.WithCancel(testhelper.Context(t)) - batch, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repo), nil) + batch, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repo), nil) require.NoError(t, err) return batch, cancel diff --git a/internal/git/catfile/commit.go b/internal/git/catfile/commit.go index 36aa8ca0b..93799e3dc 100644 --- a/internal/git/catfile/commit.go +++ b/internal/git/catfile/commit.go @@ -14,7 +14,7 @@ import ( ) // GetCommit looks up a commit by revision using an existing Batch instance. -func GetCommit(ctx context.Context, objectReader ObjectContentReader, revision git.Revision) (*gitalypb.GitCommit, error) { +func GetCommit(ctx context.Context, objectReader ObjectReader, revision git.Revision) (*gitalypb.GitCommit, error) { object, err := objectReader.Object(ctx, revision+"^{commit}") if err != nil { return nil, err @@ -29,7 +29,7 @@ func GetCommitWithTrailers( ctx context.Context, gitCmdFactory git.CommandFactory, repo repository.GitRepo, - objectReader ObjectContentReader, + objectReader ObjectReader, revision git.Revision, ) (*gitalypb.GitCommit, error) { commit, err := GetCommit(ctx, objectReader, revision) @@ -67,7 +67,7 @@ func GetCommitWithTrailers( } // GetCommitMessage looks up a commit message and returns it in its entirety. -func GetCommitMessage(ctx context.Context, objectReader ObjectContentReader, repo repository.GitRepo, revision git.Revision) ([]byte, error) { +func GetCommitMessage(ctx context.Context, objectReader ObjectReader, repo repository.GitRepo, revision git.Revision) ([]byte, error) { obj, err := objectReader.Object(ctx, revision+"^{commit}") if err != nil { return nil, err diff --git a/internal/git/catfile/object_content_reader.go b/internal/git/catfile/object_content_reader.go deleted file mode 100644 index 919e3eff0..000000000 --- a/internal/git/catfile/object_content_reader.go +++ /dev/null @@ -1,157 +0,0 @@ -package catfile - -import ( - "bufio" - "context" - "fmt" - "io" - "sync/atomic" - - "github.com/prometheus/client_golang/prometheus" - "gitlab.com/gitlab-org/gitaly/v15/internal/command" - "gitlab.com/gitlab-org/gitaly/v15/internal/git" -) - -// ObjectContentReader is a reader for Git objects. -type ObjectContentReader interface { - cacheable - - // Reader returns a new Object for the given revision. The Object must be fully consumed - // before another object is requested. - Object(context.Context, git.Revision) (*Object, error) - - // ObjectQueue returns an ObjectQueue that can be used to batch multiple object requests. - // Using the queue is more efficient than using `Object()` when requesting a bunch of - // objects. The returned function must be executed after use of the ObjectQueue has - // finished. - ObjectQueue(context.Context) (ObjectQueue, func(), error) -} - -// objectContentReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file -// --batch` process such that we do not have to spawn a new process for each object we are about to -// read. -type objectContentReader struct { - cmd *command.Command - - counter *prometheus.CounterVec - - queue requestQueue - queueInUse int32 -} - -func newObjectContentReader( - ctx context.Context, - repo git.RepositoryExecutor, - counter *prometheus.CounterVec, -) (*objectContentReader, error) { - batchCmd, err := repo.Exec(ctx, - git.SubCmd{ - Name: "cat-file", - Flags: []git.Option{ - git.Flag{Name: "--batch"}, - git.Flag{Name: "--buffer"}, - }, - }, - git.WithSetupStdin(), - ) - if err != nil { - return nil, err - } - - objectHash, err := repo.ObjectHash(ctx) - if err != nil { - return nil, fmt.Errorf("detecting object hash: %w", err) - } - - objectReader := &objectContentReader{ - cmd: batchCmd, - counter: counter, - queue: requestQueue{ - objectHash: objectHash, - isObjectQueue: true, - stdout: bufio.NewReader(batchCmd), - stdin: bufio.NewWriter(batchCmd), - }, - } - - return objectReader, nil -} - -func (o *objectContentReader) close() { - o.queue.close() - _ = o.cmd.Wait() -} - -func (o *objectContentReader) isClosed() bool { - return o.queue.isClosed() -} - -func (o *objectContentReader) isDirty() bool { - return o.queue.isDirty() -} - -func (o *objectContentReader) objectQueue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) { - if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) { - return nil, nil, fmt.Errorf("object queue already in use") - } - - trace := startTrace(ctx, o.counter, tracedMethod) - o.queue.trace = trace - - return &o.queue, func() { - atomic.StoreInt32(&o.queueInUse, 0) - trace.finish() - }, nil -} - -func (o *objectContentReader) Object(ctx context.Context, revision git.Revision) (*Object, error) { - queue, finish, err := o.objectQueue(ctx, "catfile.Object") - if err != nil { - return nil, err - } - defer finish() - - if err := queue.RequestObject(revision); err != nil { - return nil, err - } - - if err := queue.Flush(); err != nil { - return nil, err - } - - object, err := queue.ReadObject() - if err != nil { - return nil, err - } - - return object, nil -} - -func (o *objectContentReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), error) { - queue, finish, err := o.objectQueue(ctx, "catfile.ObjectQueue") - if err != nil { - return nil, nil, err - } - return queue, finish, nil -} - -// Object represents data returned by `git cat-file --batch` -type Object struct { - // ObjectInfo represents main information about object - ObjectInfo - - // dataReader is reader which has all the object data. - dataReader io.Reader -} - -func (o *Object) Read(p []byte) (int, error) { - return o.dataReader.Read(p) -} - -// WriteTo implements the io.WriterTo interface. It defers the write to the embedded object reader -// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are -// implemented by the respective reader or writer. -func (o *Object) WriteTo(w io.Writer) (int64, error) { - // `io.Copy()` will make use of `ReadFrom()` in case the writer implements it. - return io.Copy(w, o.dataReader) -} diff --git a/internal/git/catfile/object_content_reader_test.go b/internal/git/catfile/object_content_reader_test.go deleted file mode 100644 index 92e68c9b8..000000000 --- a/internal/git/catfile/object_content_reader_test.go +++ /dev/null @@ -1,460 +0,0 @@ -package catfile - -import ( - "errors" - "fmt" - "io" - "os" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v15/internal/git" - "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v15/internal/helper/text" - "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" -) - -func TestObjectContentReader_reader(t *testing.T) { - ctx := testhelper.Context(t) - - cfg := testcfg.Build(t) - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - commitID := gittest.WriteCommit(t, cfg, repoPath, - gittest.WithBranch("main"), - gittest.WithMessage("commit message"), - gittest.WithTreeEntries(gittest.TreeEntry{Path: "README", Mode: "100644", Content: "something"}), - ) - gittest.WriteTag(t, cfg, repoPath, "v1.1.1", commitID.Revision(), gittest.WriteTagConfig{ - Message: "annotated tag", - }) - - commitContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", commitID.String()) - - t.Run("read existing object by ref", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - object, err := reader.Object(ctx, "refs/heads/main") - require.NoError(t, err) - - data, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, commitContents, data) - }) - - t.Run("read existing object by object ID", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - object, err := reader.Object(ctx, commitID.Revision()) - require.NoError(t, err) - - data, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, data, commitContents) - }) - - t.Run("read missing ref", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - _, err = reader.Object(ctx, "refs/heads/does-not-exist") - require.EqualError(t, err, "object not found") - - // Verify that we're still able to read a commit after the previous read has failed. - object, err := reader.Object(ctx, commitID.Revision()) - require.NoError(t, err) - - data, err := io.ReadAll(object) - require.NoError(t, err) - - require.Equal(t, commitContents, data) - }) - - t.Run("read fails when not consuming previous object", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - _, err = reader.Object(ctx, commitID.Revision()) - require.NoError(t, err) - - // We haven't yet consumed the previous object, so this must now fail. - _, err = reader.Object(ctx, commitID.Revision()) - require.EqualError(t, err, "current object has not been fully read") - }) - - t.Run("read fails when partially consuming previous object", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - object, err := reader.Object(ctx, commitID.Revision()) - require.NoError(t, err) - - _, err = io.CopyN(io.Discard, object, 100) - require.NoError(t, err) - - // We haven't yet consumed the previous object, so this must now fail. - _, err = reader.Object(ctx, commitID.Revision()) - require.EqualError(t, err, "current object has not been fully read") - }) - - t.Run("read increments Prometheus counter", func(t *testing.T) { - counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"}) - - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), counter) - require.NoError(t, err) - - for objectType, revision := range map[string]git.Revision{ - "commit": "refs/heads/main", - "tree": "refs/heads/main^{tree}", - "blob": "refs/heads/main:README", - "tag": "refs/tags/v1.1.1", - } { - require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues(objectType))) - - object, err := reader.Object(ctx, revision) - require.NoError(t, err) - - require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues(objectType))) - - _, err = io.Copy(io.Discard, object) - require.NoError(t, err) - } - }) -} - -func TestObjectContentReader_queue(t *testing.T) { - ctx := testhelper.Context(t) - - cfg := testcfg.Build(t) - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - foobarBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar")) - barfooBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo")) - - t.Run("read single object", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) - - object, err := queue.ReadObject() - require.NoError(t, err) - - contents, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, "foobar", string(contents)) - }) - - t.Run("read multiple objects", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - for blobID, blobContents := range map[git.ObjectID]string{ - foobarBlob: "foobar", - barfooBlob: "barfoo", - } { - require.NoError(t, queue.RequestObject(blobID.Revision())) - require.NoError(t, queue.Flush()) - - object, err := queue.ReadObject() - require.NoError(t, err) - - contents, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, blobContents, string(contents)) - } - }) - - t.Run("request multiple objects", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.RequestObject(barfooBlob.Revision())) - require.NoError(t, queue.Flush()) - - for _, expectedContents := range []string{"foobar", "barfoo"} { - object, err := queue.ReadObject() - require.NoError(t, err) - - contents, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, expectedContents, string(contents)) - } - }) - - t.Run("read without request", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - _, err = queue.ReadObject() - require.Equal(t, errors.New("no outstanding request"), err) - }) - - t.Run("flush with single request", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - // We flush once before and once after requesting the object such that we can be - // sure that it doesn't impact which objects we can read. - require.NoError(t, queue.Flush()) - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) - - object, err := queue.ReadObject() - require.NoError(t, err) - - contents, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, "foobar", string(contents)) - }) - - t.Run("flush with multiple requests", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - for i := 0; i < 10; i++ { - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - } - require.NoError(t, queue.Flush()) - - for i := 0; i < 10; i++ { - object, err := queue.ReadObject() - require.NoError(t, err) - - contents, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, "foobar", string(contents)) - } - }) - - t.Run("flush without request", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.NoError(t, queue.Flush()) - - _, err = queue.ReadObject() - require.Equal(t, errors.New("no outstanding request"), err) - }) - - t.Run("request invalid object", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.NoError(t, queue.RequestObject("does-not-exist")) - require.NoError(t, queue.Flush()) - - _, err = queue.ReadObject() - require.Equal(t, NotFoundError{errors.New("object not found")}, err) - }) - - t.Run("can continue reading after NotFoundError", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.NoError(t, queue.RequestObject("does-not-exist")) - require.NoError(t, queue.Flush()) - - _, err = queue.ReadObject() - require.Equal(t, NotFoundError{errors.New("object not found")}, err) - - // Requesting another object after the previous one has failed should continue to - // work alright. - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) - object, err := queue.ReadObject() - require.NoError(t, err) - - contents, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, "foobar", string(contents)) - }) - - t.Run("requesting multiple queues fails", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - _, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - _, _, err = reader.objectQueue(ctx, "trace") - require.Equal(t, errors.New("object queue already in use"), err) - - // After calling cleanup we should be able to create an object queue again. - cleanup() - - _, cleanup, err = reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - }) - - t.Run("requesting object dirties reader", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.False(t, reader.isDirty()) - require.False(t, queue.isDirty()) - - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) - - require.True(t, reader.isDirty()) - require.True(t, queue.isDirty()) - - object, err := queue.ReadObject() - require.NoError(t, err) - - // The object has not been consumed yet, so the reader must still be dirty. - require.True(t, reader.isDirty()) - require.True(t, queue.isDirty()) - - _, err = io.ReadAll(object) - require.NoError(t, err) - - require.False(t, reader.isDirty()) - require.False(t, queue.isDirty()) - }) - - t.Run("closing queue blocks request", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - queue.close() - - require.True(t, reader.isClosed()) - require.True(t, queue.isClosed()) - - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(foobarBlob.Revision())) - }) - - t.Run("closing queue blocks read", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - // Request the object before we close the queue. - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) - - queue.close() - - require.True(t, reader.isClosed()) - require.True(t, queue.isClosed()) - - _, err = queue.ReadObject() - require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err) - }) - - t.Run("closing queue blocks consuming", func(t *testing.T) { - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.Flush()) - - // Read the object header before closing. - object, err := queue.ReadObject() - require.NoError(t, err) - - queue.close() - - require.True(t, reader.isClosed()) - require.True(t, queue.isClosed()) - - _, err = io.ReadAll(object) - require.Equal(t, os.ErrClosed, err) - }) -} - -func TestObjectContentReader_replaceRefs(t *testing.T) { - ctx := testhelper.Context(t) - - cfg := testcfg.Build(t) - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - originalOID := gittest.WriteBlob(t, cfg, repoPath, []byte("original")) - replacedOID := gittest.WriteBlob(t, cfg, repoPath, []byte("replaced")) - - gittest.WriteRef(t, cfg, repoPath, git.ReferenceName("refs/replace/"+originalOID.String()), replacedOID) - - // Reading the object via our testhelper should result in the object having been replaced. - require.Equal(t, "replaced", text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", originalOID.String()))) - - reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - object, err := reader.Object(ctx, originalOID.Revision()) - require.NoError(t, err) - - contents, err := io.ReadAll(object) - require.NoError(t, err) - - // But using our "normal" Git command execution code path, we still want to see the original - // content of the blob. - require.Equal(t, "original", string(contents)) -} diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go index 2d5e21441..595f6d52e 100644 --- a/internal/git/catfile/object_info_reader.go +++ b/internal/git/catfile/object_info_reader.go @@ -64,7 +64,7 @@ restart: // object that cannot exist. This causes Git to write an error and immediately flush // stdout. The only downside is that we need to filter this error here, but that's // acceptable while git-cat-file(1) doesn't yet have any way to natively flush. - if strings.HasPrefix(infoLine, flushCommandHack) { + if strings.HasPrefix(infoLine, flushCommand) { goto restart } @@ -100,11 +100,26 @@ type ObjectInfoReader interface { // Info requests information about the revision pointed to by the given revision. Info(context.Context, git.Revision) (*ObjectInfo, error) - // ObjectQueue returns an ObjectQueue that can be used to batch multiple object info + // InfoQueue returns an ObjectInfoQueue that can be used to batch multiple object info // requests. Using the queue is more efficient than using `Info()` when requesting a bunch - // of objects. The returned function must be executed after use of the ObjectQueue has + // of objects. The returned function must be executed after use of the ObjectInfoQueue has // finished. - ObjectQueue(context.Context) (ObjectQueue, func(), error) + InfoQueue(context.Context) (ObjectInfoQueue, func(), error) +} + +// ObjectInfoQueue allows for requesting and reading object info independently of each other. The +// number of RequestInfo and ReadInfo calls must match. ReadObject must be executed after the +// object has been requested already. The order of objects returned by ReadInfo is the same as the +// order in which object info has been requested. Users of this interface must call `Flush()` after +// all requests have been queued up such that all requested objects will be readable. +type ObjectInfoQueue interface { + // RequestRevision requests the given revision from git-cat-file(1). + RequestRevision(git.Revision) error + // ReadInfo reads object info which has previously been requested. + ReadInfo() (*ObjectInfo, error) + // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which + // have been requested up to this point. + Flush() error } // objectInfoReader is a reader for Git object information. This reader is implemented via a @@ -173,7 +188,7 @@ func (o *objectInfoReader) isDirty() bool { func (o *objectInfoReader) infoQueue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) { if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) { - return nil, nil, fmt.Errorf("object queue already in use") + return nil, nil, fmt.Errorf("object info queue already in use") } trace := startTrace(ctx, o.counter, tracedMethod) @@ -192,7 +207,7 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob } defer cleanup() - if err := queue.RequestInfo(revision); err != nil { + if err := queue.RequestRevision(revision); err != nil { return nil, err } @@ -208,7 +223,7 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob return objectInfo, nil } -func (o *objectInfoReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), error) { +func (o *objectInfoReader) InfoQueue(ctx context.Context) (ObjectInfoQueue, func(), error) { queue, cleanup, err := o.infoQueue(ctx, "catfile.InfoQueue") if err != nil { return nil, nil, err diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go index 06878e763..761b8a81e 100644 --- a/internal/git/catfile/object_info_reader_test.go +++ b/internal/git/catfile/object_info_reader_test.go @@ -243,7 +243,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestInfo(blobOID.Revision())) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() @@ -263,7 +263,7 @@ func TestObjectInfoReader_queue(t *testing.T) { blobOID: blobInfo, commitOID: commitInfo, } { - require.NoError(t, queue.RequestInfo(oid.Revision())) + require.NoError(t, queue.RequestRevision(oid.Revision())) require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() @@ -280,8 +280,8 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestInfo(blobOID.Revision())) - require.NoError(t, queue.RequestInfo(commitOID.Revision())) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) + require.NoError(t, queue.RequestRevision(commitOID.Revision())) require.NoError(t, queue.Flush()) for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} { @@ -314,7 +314,7 @@ func TestObjectInfoReader_queue(t *testing.T) { // We flush once before and once after requesting the object such that we can be // sure that it doesn't impact which objects we can read. require.NoError(t, queue.Flush()) - require.NoError(t, queue.RequestInfo(blobOID.Revision())) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() @@ -331,7 +331,7 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() for i := 0; i < 10; i++ { - require.NoError(t, queue.RequestInfo(blobOID.Revision())) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) } require.NoError(t, queue.Flush()) @@ -364,7 +364,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestInfo("does-not-exist")) + require.NoError(t, queue.RequestRevision("does-not-exist")) require.NoError(t, queue.Flush()) _, err = queue.ReadInfo() @@ -379,7 +379,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestInfo("does-not-exist")) + require.NoError(t, queue.RequestRevision("does-not-exist")) require.NoError(t, queue.Flush()) _, err = queue.ReadInfo() @@ -387,7 +387,7 @@ func TestObjectInfoReader_queue(t *testing.T) { // Requesting another object info after the previous one has failed should continue // to work alright. - require.NoError(t, queue.RequestInfo(blobOID.Revision())) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) require.NoError(t, queue.Flush()) info, err := queue.ReadInfo() require.NoError(t, err) @@ -403,7 +403,7 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() _, _, err = reader.infoQueue(ctx, "trace") - require.Equal(t, errors.New("object queue already in use"), err) + require.Equal(t, errors.New("object info queue already in use"), err) // After calling cleanup we should be able to create an object queue again. cleanup() @@ -424,7 +424,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.False(t, reader.isDirty()) require.False(t, queue.isDirty()) - require.NoError(t, queue.RequestInfo(blobOID.Revision())) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) require.NoError(t, queue.Flush()) require.True(t, reader.isDirty()) @@ -450,7 +450,7 @@ func TestObjectInfoReader_queue(t *testing.T) { require.True(t, reader.isClosed()) require.True(t, queue.isClosed()) - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(blobOID.Revision())) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision(blobOID.Revision())) }) t.Run("closing queue blocks read", func(t *testing.T) { @@ -462,7 +462,7 @@ func TestObjectInfoReader_queue(t *testing.T) { defer cleanup() // Request the object before we close the queue. - require.NoError(t, queue.RequestInfo(blobOID.Revision())) + require.NoError(t, queue.RequestRevision(blobOID.Revision())) require.NoError(t, queue.Flush()) queue.close() diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index 2b934dd33..243e70865 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "fmt" + "io" "sync/atomic" "github.com/prometheus/client_golang/prometheus" @@ -11,13 +12,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git" ) -// ObjectReader returns information about an object referenced by a given revision. +// ObjectReader is a reader for Git objects. type ObjectReader interface { cacheable - // Info requests information about the revision pointed to by the given revision. - Info(context.Context, git.Revision) (*ObjectInfo, error) - // Reader returns a new Object for the given revision. The Object must be fully consumed // before another object is requested. Object(context.Context, git.Revision) (*Object, error) @@ -25,34 +23,28 @@ type ObjectReader interface { // ObjectQueue returns an ObjectQueue that can be used to batch multiple object requests. // Using the queue is more efficient than using `Object()` when requesting a bunch of // objects. The returned function must be executed after use of the ObjectQueue has - // finished. Object Content and information can be requested from the queue but their - // respective ordering must be maintained. + // finished. ObjectQueue(context.Context) (ObjectQueue, func(), error) } // ObjectQueue allows for requesting and reading objects independently of each other. The number of -// RequestObject+RequestInfo and ReadObject+RequestInfo calls must match and their ordering must be -// maintained. ReadObject/ReadInfo must be executed after the object has been requested already. -// The order of objects returned by ReadObject/ReadInfo is the same as the order in +// RequestObject and ReadObject calls must match. ReadObject must be executed after the object has +// been requested already. The order of objects returned by ReadObject is the same as the order in // which objects have been requested. Users of this interface must call `Flush()` after all requests // have been queued up such that all requested objects will be readable. type ObjectQueue interface { - // RequestObject requests the given revision from git-cat-file(1). - RequestObject(git.Revision) error + // RequestRevision requests the given revision from git-cat-file(1). + RequestRevision(git.Revision) error // ReadObject reads an object which has previously been requested. ReadObject() (*Object, error) - // RequestInfo requests the given revision from git-cat-file(1). - RequestInfo(git.Revision) error - // ReadInfo reads object info which has previously been requested. - ReadInfo() (*ObjectInfo, error) // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which // have been requested up to this point. Flush() error } // objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file -// --batch-command` process such that we do not have to spawn a new process for each object we -// are about to read. +// --batch` process such that we do not have to spawn a new process for each object we are about to +// read. type objectReader struct { cmd *command.Command @@ -71,7 +63,7 @@ func newObjectReader( git.SubCmd{ Name: "cat-file", Flags: []git.Option{ - git.Flag{Name: "--batch-command"}, + git.Flag{Name: "--batch"}, git.Flag{Name: "--buffer"}, }, }, @@ -90,10 +82,10 @@ func newObjectReader( cmd: batchCmd, counter: counter, queue: requestQueue{ - objectHash: objectHash, - stdout: bufio.NewReader(batchCmd), - stdin: bufio.NewWriter(batchCmd), - isBatchCommand: true, + objectHash: objectHash, + isObjectQueue: true, + stdout: bufio.NewReader(batchCmd), + stdin: bufio.NewWriter(batchCmd), }, } @@ -134,7 +126,7 @@ func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Obje } defer finish() - if err := queue.RequestObject(revision); err != nil { + if err := queue.RequestRevision(revision); err != nil { return nil, err } @@ -158,25 +150,23 @@ func (o *objectReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), er return queue, finish, nil } -func (o *objectReader) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) { - queue, cleanup, err := o.objectQueue(ctx, "catfile.Info") - if err != nil { - return nil, err - } - defer cleanup() - - if err := queue.RequestInfo(revision); err != nil { - return nil, err - } +// Object represents data returned by `git cat-file --batch` +type Object struct { + // ObjectInfo represents main information about object + ObjectInfo - if err := queue.Flush(); err != nil { - return nil, err - } + // dataReader is reader which has all the object data. + dataReader io.Reader +} - objectInfo, err := queue.ReadInfo() - if err != nil { - return nil, err - } +func (o *Object) Read(p []byte) (int, error) { + return o.dataReader.Read(p) +} - return objectInfo, nil +// WriteTo implements the io.WriterTo interface. It defers the write to the embedded object reader +// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are +// implemented by the respective reader or writer. +func (o *Object) WriteTo(w io.Writer) (int64, error) { + // `io.Copy()` will make use of `ReadFrom()` in case the writer implements it. + return io.Copy(w, o.dataReader) } diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go index 619147e2f..32326f07f 100644 --- a/internal/git/catfile/object_reader_test.go +++ b/internal/git/catfile/object_reader_test.go @@ -18,8 +18,6 @@ import ( ) func TestObjectReader_reader(t *testing.T) { - t.Parallel() - ctx := testhelper.Context(t) cfg := testcfg.Build(t) @@ -36,127 +34,47 @@ func TestObjectReader_reader(t *testing.T) { Message: "annotated tag", }) - oiByRevision := make(map[string]*ObjectInfo) - contentByRevision := make(map[string][]byte) - for _, revision := range []string{ - "refs/heads/main", - "refs/heads/main^{tree}", - "refs/heads/main:README", - "refs/tags/v1.1.1", - } { - revParseOutput := gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", revision) - objectID, err := gittest.DefaultObjectHash.FromHex(text.ChompBytes(revParseOutput)) - require.NoError(t, err) - - objectType := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-t", revision)) - objectContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", objectType, revision) - - oiByRevision[revision] = &ObjectInfo{ - Oid: objectID, - Type: objectType, - Size: int64(len(objectContents)), - } - contentByRevision[revision] = objectContents - } - - for _, tc := range []struct { - desc string - revision git.Revision - expectedErr error - expectedInfo *ObjectInfo - expectedContent []byte - }{ - { - desc: "commit by ref", - revision: "refs/heads/main", - expectedInfo: oiByRevision["refs/heads/main"], - expectedContent: contentByRevision["refs/heads/main"], - }, - { - desc: "commit by ID", - revision: oiByRevision["refs/heads/main"].Oid.Revision(), - expectedInfo: oiByRevision["refs/heads/main"], - expectedContent: contentByRevision["refs/heads/main"], - }, - { - desc: "tree", - revision: oiByRevision["refs/heads/main^{tree}"].Oid.Revision(), - expectedInfo: oiByRevision["refs/heads/main^{tree}"], - expectedContent: contentByRevision["refs/heads/main^{tree}"], - }, - { - desc: "blob", - revision: oiByRevision["refs/heads/main:README"].Oid.Revision(), - expectedInfo: oiByRevision["refs/heads/main:README"], - expectedContent: contentByRevision["refs/heads/main:README"], - }, - { - desc: "tag", - revision: oiByRevision["refs/tags/v1.1.1"].Oid.Revision(), - expectedInfo: oiByRevision["refs/tags/v1.1.1"], - expectedContent: contentByRevision["refs/tags/v1.1.1"], - }, - { - desc: "nonexistent ref", - revision: "refs/heads/does-not-exist", - expectedErr: NotFoundError{fmt.Errorf("object not found")}, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"}) - - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), counter) - require.NoError(t, err) + commitContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", commitID.String()) - require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues("info"))) - - // Check for object info - info, err := reader.Info(ctx, tc.revision) - require.Equal(t, tc.expectedErr, err) - require.Equal(t, tc.expectedInfo, info) - - // Check for object contents - object, err := reader.Object(ctx, tc.revision) - require.Equal(t, tc.expectedErr, err) - if err == nil { - data, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, tc.expectedContent, data) - } - - expectedRequests := 0 - if tc.expectedErr == nil { - expectedRequests = 1 - } - require.Equal(t, float64(expectedRequests), testutil.ToFloat64(counter.WithLabelValues("info"))) - - // Verify that we do another request no matter whether the previous call - // succeeded or failed. - _, err = reader.Info(ctx, "refs/heads/main") - require.NoError(t, err) + t.Run("read existing object by ref", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) - require.Equal(t, float64(expectedRequests+1), testutil.ToFloat64(counter.WithLabelValues("info"))) - }) - } -} + object, err := reader.Object(ctx, "refs/heads/main") + require.NoError(t, err) -func TestObjectReader_object(t *testing.T) { - t.Parallel() + data, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, commitContents, data) + }) - ctx := testhelper.Context(t) + t.Run("read existing object by object ID", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) - cfg := testcfg.Build(t) - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, + object, err := reader.Object(ctx, commitID.Revision()) + require.NoError(t, err) + + data, err := io.ReadAll(object) + require.NoError(t, err) + require.Equal(t, data, commitContents) }) - commitID := gittest.WriteCommit(t, cfg, repoPath, - gittest.WithBranch("main"), - gittest.WithMessage("commit message"), - gittest.WithTreeEntries(gittest.TreeEntry{Path: "README", Mode: "100644", Content: "something"}), - ) - gittest.WriteTag(t, cfg, repoPath, "v1.1.1", commitID.Revision(), gittest.WriteTagConfig{ - Message: "annotated tag", + t.Run("read missing ref", func(t *testing.T) { + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + _, err = reader.Object(ctx, "refs/heads/does-not-exist") + require.EqualError(t, err, "object not found") + + // Verify that we're still able to read a commit after the previous read has failed. + object, err := reader.Object(ctx, commitID.Revision()) + require.NoError(t, err) + + data, err := io.ReadAll(object) + require.NoError(t, err) + + require.Equal(t, commitContents, data) }) t.Run("read fails when not consuming previous object", func(t *testing.T) { @@ -212,8 +130,6 @@ func TestObjectReader_object(t *testing.T) { } func TestObjectReader_queue(t *testing.T) { - t.Parallel() - ctx := testhelper.Context(t) cfg := testcfg.Build(t) @@ -232,7 +148,7 @@ func TestObjectReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) require.NoError(t, queue.Flush()) object, err := queue.ReadObject() @@ -255,7 +171,7 @@ func TestObjectReader_queue(t *testing.T) { foobarBlob: "foobar", barfooBlob: "barfoo", } { - require.NoError(t, queue.RequestObject(blobID.Revision())) + require.NoError(t, queue.RequestRevision(blobID.Revision())) require.NoError(t, queue.Flush()) object, err := queue.ReadObject() @@ -275,8 +191,8 @@ func TestObjectReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.RequestObject(barfooBlob.Revision())) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(barfooBlob.Revision())) require.NoError(t, queue.Flush()) for _, expectedContents := range []string{"foobar", "barfoo"} { @@ -289,53 +205,6 @@ func TestObjectReader_queue(t *testing.T) { } }) - t.Run("request multiple info", func(t *testing.T) { - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.NoError(t, queue.RequestInfo(foobarBlob.Revision())) - require.NoError(t, queue.RequestInfo(barfooBlob.Revision())) - require.NoError(t, queue.Flush()) - - for _, blob := range []git.ObjectID{foobarBlob, barfooBlob} { - info, err := queue.ReadInfo() - require.NoError(t, err) - require.Equal(t, &ObjectInfo{Oid: git.ObjectID(blob.Revision()), Type: "blob", Size: 6}, info) - } - }) - - t.Run("request info and object together", func(t *testing.T) { - reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) - require.NoError(t, err) - - queue, cleanup, err := reader.objectQueue(ctx, "trace") - require.NoError(t, err) - defer cleanup() - - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) - require.NoError(t, queue.RequestInfo(barfooBlob.Revision())) - require.NoError(t, queue.Flush()) - - object, err := queue.ReadObject() - require.NoError(t, err) - - contents, err := io.ReadAll(object) - require.NoError(t, err) - require.Equal(t, "foobar", string(contents)) - - info, err := queue.ReadInfo() - require.NoError(t, err) - require.Equal(t, &ObjectInfo{ - Oid: git.ObjectID(barfooBlob.Revision()), - Type: "blob", - Size: int64(len("barfoo")), - }, info) - }) - t.Run("read without request", func(t *testing.T) { reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) require.NoError(t, err) @@ -359,7 +228,7 @@ func TestObjectReader_queue(t *testing.T) { // We flush once before and once after requesting the object such that we can be // sure that it doesn't impact which objects we can read. require.NoError(t, queue.Flush()) - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) require.NoError(t, queue.Flush()) object, err := queue.ReadObject() @@ -379,7 +248,7 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() for i := 0; i < 10; i++ { - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) } require.NoError(t, queue.Flush()) @@ -415,7 +284,7 @@ func TestObjectReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject("does-not-exist")) + require.NoError(t, queue.RequestRevision("does-not-exist")) require.NoError(t, queue.Flush()) _, err = queue.ReadObject() @@ -430,7 +299,7 @@ func TestObjectReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject("does-not-exist")) + require.NoError(t, queue.RequestRevision("does-not-exist")) require.NoError(t, queue.Flush()) _, err = queue.ReadObject() @@ -438,7 +307,7 @@ func TestObjectReader_queue(t *testing.T) { // Requesting another object after the previous one has failed should continue to // work alright. - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) require.NoError(t, queue.Flush()) object, err := queue.ReadObject() require.NoError(t, err) @@ -478,7 +347,7 @@ func TestObjectReader_queue(t *testing.T) { require.False(t, reader.isDirty()) require.False(t, queue.isDirty()) - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) require.NoError(t, queue.Flush()) require.True(t, reader.isDirty()) @@ -511,7 +380,7 @@ func TestObjectReader_queue(t *testing.T) { require.True(t, reader.isClosed()) require.True(t, queue.isClosed()) - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(foobarBlob.Revision())) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision(foobarBlob.Revision())) }) t.Run("closing queue blocks read", func(t *testing.T) { @@ -523,7 +392,7 @@ func TestObjectReader_queue(t *testing.T) { defer cleanup() // Request the object before we close the queue. - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) require.NoError(t, queue.Flush()) queue.close() @@ -543,7 +412,7 @@ func TestObjectReader_queue(t *testing.T) { require.NoError(t, err) defer cleanup() - require.NoError(t, queue.RequestObject(foobarBlob.Revision())) + require.NoError(t, queue.RequestRevision(foobarBlob.Revision())) require.NoError(t, queue.Flush()) // Read the object header before closing. @@ -559,3 +428,33 @@ func TestObjectReader_queue(t *testing.T) { require.Equal(t, os.ErrClosed, err) }) } + +func TestObjectReader_replaceRefs(t *testing.T) { + ctx := testhelper.Context(t) + + cfg := testcfg.Build(t) + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + originalOID := gittest.WriteBlob(t, cfg, repoPath, []byte("original")) + replacedOID := gittest.WriteBlob(t, cfg, repoPath, []byte("replaced")) + + gittest.WriteRef(t, cfg, repoPath, git.ReferenceName("refs/replace/"+originalOID.String()), replacedOID) + + // Reading the object via our testhelper should result in the object having been replaced. + require.Equal(t, "replaced", text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", originalOID.String()))) + + reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil) + require.NoError(t, err) + + object, err := reader.Object(ctx, originalOID.Revision()) + require.NoError(t, err) + + contents, err := io.ReadAll(object) + require.NoError(t, err) + + // But using our "normal" Git command execution code path, we still want to see the original + // content of the blob. + require.Equal(t, "original", string(contents)) +} diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go index 08e64df00..232d731b1 100644 --- a/internal/git/catfile/request_queue.go +++ b/internal/git/catfile/request_queue.go @@ -11,22 +11,13 @@ import ( ) const ( - // contentsCommand is the command expected by the `--batch-command` mode of git-cat-file(1) - // for reading an objects contents. - contentsCommand = "contents" - // infoCommand is the command expected by the `--batch-command` mode of git-cat-file(1) - // for reading an objects info. - infoCommand = "info" - // flushCommand is the command expected by the `--batch-command` mode of git-cat-file(1) - // for flushing out to stdout. - flushCommand = "flush" - // flushCommandHack is the command we send to git-cat-file(1) to cause it to flush its stdout. + // flushCommand is the command we send to git-cat-file(1) to cause it to flush its stdout. // Note that this is a hack: git-cat-file(1) doesn't really support flushing, but it will // flush whenever it encounters an object it doesn't know. The flush command we use is thus // chosen such that it cannot ever refer to a valid object: refs may not contain whitespace, // so this command cannot refer to a ref. Adding "FLUSH" is just for the sake of making it // easier to spot what's going on in case we ever mistakenly see this output in the wild. - flushCommandHack = "\tFLUSH\t" + flushCommand = "\tFLUSH\t" ) type requestQueue struct { @@ -57,10 +48,6 @@ type requestQueue struct { // trace is the current tracing span. trace *trace - - // isBatchCommand indicates whether `--batch-command` is used. We use this to determine if - // commands need to be passed to git-cat-file(1). - isBatchCommand bool } // isDirty returns true either if there are outstanding requests for objects or if the current @@ -85,31 +72,14 @@ func (q *requestQueue) close() { atomic.StoreInt32(&q.closed, 1) } -// RequestObject requests the contents for the given revision. A subsequent call has -// to be made to ReadObject to read the contents. -func (q *requestQueue) RequestObject(revision git.Revision) error { - return q.requestRevision(contentsCommand, revision) -} - -// RequestObject requests the info for the given revision. A subsequent call has to -// be made to ReadInfo read the info. -func (q *requestQueue) RequestInfo(revision git.Revision) error { - return q.requestRevision(infoCommand, revision) -} - -func (q *requestQueue) requestRevision(cmd string, revision git.Revision) error { +func (q *requestQueue) RequestRevision(revision git.Revision) error { if q.isClosed() { return fmt.Errorf("cannot request revision: %w", os.ErrClosed) } atomic.AddInt64(&q.outstandingRequests, 1) - input := revision.String() - if q.isBatchCommand { - input = cmd + " " + input - } - - if _, err := q.stdin.WriteString(input); err != nil { + if _, err := q.stdin.WriteString(revision.String()); err != nil { atomic.AddInt64(&q.outstandingRequests, -1) return fmt.Errorf("writing object request: %w", err) } @@ -127,12 +97,7 @@ func (q *requestQueue) Flush() error { return fmt.Errorf("cannot flush: %w", os.ErrClosed) } - cmd := flushCommandHack - if q.isBatchCommand { - cmd = flushCommand - } - - if _, err := q.stdin.WriteString(cmd); err != nil { + if _, err := q.stdin.WriteString(flushCommand); err != nil { return fmt.Errorf("writing flush command: %w", err) } @@ -152,7 +117,7 @@ type readerFunc func([]byte) (int, error) func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) } func (q *requestQueue) ReadObject() (*Object, error) { - if !q.isObjectQueue && !q.isBatchCommand { + if !q.isObjectQueue { panic("object queue used to read object info") } @@ -218,7 +183,7 @@ func (q *requestQueue) ReadObject() (*Object, error) { } func (q *requestQueue) ReadInfo() (*ObjectInfo, error) { - if q.isObjectQueue && !q.isBatchCommand { + if q.isObjectQueue { panic("object queue used to read object info") } diff --git a/internal/git/catfile/request_queue_test.go b/internal/git/catfile/request_queue_test.go index 638ecc2f7..8780e1065 100644 --- a/internal/git/catfile/request_queue_test.go +++ b/internal/git/catfile/request_queue_test.go @@ -22,6 +22,14 @@ func TestRequestQueue_ReadObject(t *testing.T) { oid := git.ObjectID(strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen())) + t.Run("ReadInfo on ReadObject queue", func(t *testing.T) { + _, queue := newInterceptedQueue(t, ctx, "#!/bin/sh\nread\n") + + require.PanicsWithValue(t, "object queue used to read object info", func() { + _, _ = queue.ReadInfo() + }) + }) + t.Run("read without request", func(t *testing.T) { _, queue := newInterceptedQueue(t, ctx, "#!/bin/sh\nread\n") _, err := queue.ReadObject() @@ -31,7 +39,7 @@ func TestRequestQueue_ReadObject(t *testing.T) { t.Run("read on closed reader", func(t *testing.T) { reader, queue := newInterceptedQueue(t, ctx, "#!/bin/sh\nread\n") - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) require.True(t, queue.isDirty()) reader.close() @@ -47,8 +55,8 @@ func TestRequestQueue_ReadObject(t *testing.T) { `, oid)) // We queue two revisions... - require.NoError(t, queue.RequestObject("foo")) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) + require.NoError(t, queue.RequestRevision("foo")) // .. and only unqueue one object. This object isn't read though, ... _, err := queue.ReadObject() @@ -66,7 +74,7 @@ func TestRequestQueue_ReadObject(t *testing.T) { echo "something something" `) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) _, err := queue.ReadObject() require.Equal(t, fmt.Errorf("invalid info line: %q", "something something"), err) @@ -80,7 +88,7 @@ func TestRequestQueue_ReadObject(t *testing.T) { exit 1 `) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) _, err := queue.ReadObject() require.Equal(t, fmt.Errorf("read info line: %w", io.EOF), err) @@ -94,7 +102,7 @@ func TestRequestQueue_ReadObject(t *testing.T) { echo "%s missing" `, oid)) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) _, err := queue.ReadObject() require.Equal(t, NotFoundError{error: fmt.Errorf("object not found")}, err) @@ -110,7 +118,7 @@ func TestRequestQueue_ReadObject(t *testing.T) { echo "1234567890" `, oid)) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) require.True(t, queue.isDirty()) object, err := queue.ReadObject() @@ -138,8 +146,8 @@ func TestRequestQueue_ReadObject(t *testing.T) { echo "0987654321" `, oid, secondOID)) - require.NoError(t, queue.RequestObject("foo")) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) + require.NoError(t, queue.RequestRevision("foo")) require.True(t, queue.isDirty()) for _, expectedObject := range []struct { @@ -183,7 +191,7 @@ func TestRequestQueue_ReadObject(t *testing.T) { printf "123" `, oid)) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) require.True(t, queue.isDirty()) object, err := queue.ReadObject() @@ -211,7 +219,7 @@ func TestRequestQueue_ReadObject(t *testing.T) { }) } -func TestRequestQueue_RequestObject(t *testing.T) { +func TestRequestQueue_RequestRevision(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) @@ -231,7 +239,7 @@ func TestRequestQueue_RequestObject(t *testing.T) { _, queue := newInterceptedQueue(t, ctx, "#!/bin/sh") queue.close() - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject("foo")) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision("foo")) }) t.Run("requesting revision on closed process", func(t *testing.T) { @@ -239,7 +247,7 @@ func TestRequestQueue_RequestObject(t *testing.T) { process.close() - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject("foo")) + require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision("foo")) }) t.Run("single request", func(t *testing.T) { @@ -249,10 +257,10 @@ func TestRequestQueue_RequestObject(t *testing.T) { echo "${revision}" `, oid)) - require.NoError(t, queue.RequestObject("foo")) + require.NoError(t, queue.RequestRevision("foo")) require.NoError(t, queue.Flush()) - requireRevision(t, queue, "contents foo") + requireRevision(t, queue, "foo") }) t.Run("multiple request", func(t *testing.T) { @@ -264,16 +272,16 @@ func TestRequestQueue_RequestObject(t *testing.T) { done `, oid)) - require.NoError(t, queue.RequestObject("foo")) - require.NoError(t, queue.RequestObject("bar")) - require.NoError(t, queue.RequestObject("baz")) - require.NoError(t, queue.RequestObject("qux")) + require.NoError(t, queue.RequestRevision("foo")) + require.NoError(t, queue.RequestRevision("bar")) + require.NoError(t, queue.RequestRevision("baz")) + require.NoError(t, queue.RequestRevision("qux")) require.NoError(t, queue.Flush()) - requireRevision(t, queue, "contents foo") - requireRevision(t, queue, "contents bar") - requireRevision(t, queue, "contents baz") - requireRevision(t, queue, "contents qux") + requireRevision(t, queue, "foo") + requireRevision(t, queue, "bar") + requireRevision(t, queue, "baz") + requireRevision(t, queue, "qux") }) t.Run("multiple request with intermediate flushing", func(t *testing.T) { @@ -281,7 +289,7 @@ func TestRequestQueue_RequestObject(t *testing.T) { while read revision do read flush - if test "$flush" != "flush" + if test "$flush" != "FLUSH" then echo "expected a flush" exit 1 @@ -298,100 +306,9 @@ func TestRequestQueue_RequestObject(t *testing.T) { "foo", "qux", } { - require.NoError(t, queue.RequestObject(revision)) - require.NoError(t, queue.Flush()) - requireRevision(t, queue, "contents "+revision) - } - }) -} - -func TestRequestQueue_RequestInfo(t *testing.T) { - t.Parallel() - - ctx := testhelper.Context(t) - - oid := git.ObjectID(strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen())) - expectedInfo := &ObjectInfo{oid, "blob", 955} - - requireRevision := func(t *testing.T, queue *requestQueue) { - info, err := queue.ReadInfo() - require.NoError(t, err) - - require.NoError(t, err) - require.Equal(t, info, expectedInfo) - } - - t.Run("requesting revision on closed queue", func(t *testing.T) { - _, queue := newInterceptedQueue(t, ctx, "#!/bin/sh") - queue.close() - - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo("foo")) - }) - - t.Run("requesting revision on closed process", func(t *testing.T) { - process, queue := newInterceptedQueue(t, ctx, "#!/bin/sh") - - process.close() - - require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo("foo")) - }) - - t.Run("single request", func(t *testing.T) { - _, queue := newInterceptedQueue(t, ctx, fmt.Sprintf(`#!/bin/sh - read revision - echo "%s blob 955" - `, oid)) - - require.NoError(t, queue.RequestInfo("foo")) - require.NoError(t, queue.Flush()) - - requireRevision(t, queue) - }) - - t.Run("multiple request", func(t *testing.T) { - _, queue := newInterceptedQueue(t, ctx, fmt.Sprintf(`#!/bin/sh - while read revision - do - echo "%s blob 955" - done - `, oid)) - - require.NoError(t, queue.RequestInfo("foo")) - require.NoError(t, queue.RequestInfo("bar")) - require.NoError(t, queue.RequestInfo("baz")) - require.NoError(t, queue.RequestInfo("qux")) - require.NoError(t, queue.Flush()) - - requireRevision(t, queue) - requireRevision(t, queue) - requireRevision(t, queue) - requireRevision(t, queue) - }) - - t.Run("multiple request with intermediate flushing", func(t *testing.T) { - _, queue := newInterceptedQueue(t, ctx, fmt.Sprintf(`#!/bin/sh - while read revision - do - read flush - if test "$flush" != "flush" - then - echo "expected a flush" - exit 1 - fi - - echo "%s blob 955" - done - `, oid)) - - for _, revision := range []git.Revision{ - "foo", - "bar", - "foo", - "qux", - } { - require.NoError(t, queue.RequestInfo(revision)) + require.NoError(t, queue.RequestRevision(revision)) require.NoError(t, queue.Flush()) - requireRevision(t, queue) + requireRevision(t, queue, revision) } }) } diff --git a/internal/git/catfile/tag.go b/internal/git/catfile/tag.go index 2636ed1ca..67f38e8fa 100644 --- a/internal/git/catfile/tag.go +++ b/internal/git/catfile/tag.go @@ -14,7 +14,7 @@ import ( // GetTag looks up a commit by tagID using an existing catfile.Batch instance. Note: we pass // in the tagName because the tag name from refs/tags may be different than the name found in the // actual tag object. We want to use the tagName found in refs/tags -func GetTag(ctx context.Context, objectReader ObjectContentReader, tagID git.Revision, tagName string) (*gitalypb.Tag, error) { +func GetTag(ctx context.Context, objectReader ObjectReader, tagID git.Revision, tagName string) (*gitalypb.Tag, error) { object, err := objectReader.Object(ctx, tagID) if err != nil { return nil, err @@ -51,7 +51,7 @@ func ExtractTagSignature(content []byte) ([]byte, []byte) { return nil, content } -func buildAnnotatedTag(ctx context.Context, objectReader ObjectContentReader, object git.Object, name []byte) (*gitalypb.Tag, error) { +func buildAnnotatedTag(ctx context.Context, objectReader ObjectReader, object git.Object, name []byte) (*gitalypb.Tag, error) { tag, tagged, err := newParser().parseTag(object, name) if err != nil { return nil, err @@ -77,7 +77,7 @@ func buildAnnotatedTag(ctx context.Context, objectReader ObjectContentReader, ob // dereferenceTag recursively dereferences annotated tags until it finds a non-tag object. If it is // a commit, then it will parse and return this commit. Otherwise, if the tagged object is not a // commit, it will simply discard the object and not return an error. -func dereferenceTag(ctx context.Context, objectReader ObjectContentReader, oid git.Revision) (*gitalypb.GitCommit, error) { +func dereferenceTag(ctx context.Context, objectReader ObjectReader, oid git.Revision) (*gitalypb.GitCommit, error) { object, err := objectReader.Object(ctx, oid+"^{}") if err != nil { return nil, fmt.Errorf("peeling tag: %w", err) diff --git a/internal/git/catfile/tag_test.go b/internal/git/catfile/tag_test.go index fa9da5763..efc54e20c 100644 --- a/internal/git/catfile/tag_test.go +++ b/internal/git/catfile/tag_test.go @@ -1,7 +1,6 @@ package catfile import ( - "context" "fmt" "strings" "testing" @@ -10,18 +9,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" - "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" ) func TestGetTag(t *testing.T) { - t.Parallel() + ctx := testhelper.Context(t) - testhelper.NewFeatureSets(featureflag.CatfileBatchCommand).Run(t, testGetTag) -} - -func testGetTag(t *testing.T, ctx context.Context) { cfg, objectReader, _, repoPath := setupObjectReader(t, ctx) commitID := gittest.WriteCommit(t, cfg, repoPath) diff --git a/internal/git/catfile/testhelper_test.go b/internal/git/catfile/testhelper_test.go index 50a6a39e5..430d24088 100644 --- a/internal/git/catfile/testhelper_test.go +++ b/internal/git/catfile/testhelper_test.go @@ -55,7 +55,7 @@ func (e *repoExecutor) ObjectHash(ctx context.Context) (git.ObjectHash, error) { return gittest.DefaultObjectHash, nil } -func setupObjectReader(t *testing.T, ctx context.Context) (config.Cfg, ObjectContentReader, *gitalypb.Repository, string) { +func setupObjectReader(t *testing.T, ctx context.Context) (config.Cfg, ObjectReader, *gitalypb.Repository, string) { t.Helper() cfg := testcfg.Build(t) diff --git a/internal/git/catfile/tree_entries.go b/internal/git/catfile/tree_entries.go index 91bc4b663..7e27edb81 100644 --- a/internal/git/catfile/tree_entries.go +++ b/internal/git/catfile/tree_entries.go @@ -19,13 +19,13 @@ type revisionPath struct{ revision, path string } // TreeEntryFinder is a struct for searching through a tree with caching. type TreeEntryFinder struct { - objectReader ObjectContentReader + objectReader ObjectReader objectInfoReader ObjectInfoReader treeCache map[revisionPath][]*gitalypb.TreeEntry } // NewTreeEntryFinder initializes a TreeEntryFinder with an empty tree cache. -func NewTreeEntryFinder(objectReader ObjectContentReader, objectInfoReader ObjectInfoReader) *TreeEntryFinder { +func NewTreeEntryFinder(objectReader ObjectReader, objectInfoReader ObjectInfoReader) *TreeEntryFinder { return &TreeEntryFinder{ objectReader: objectReader, objectInfoReader: objectInfoReader, @@ -107,7 +107,7 @@ func extractEntryInfoFromTreeData(treeData io.Reader, commitOid, rootOid, rootPa // TreeEntries returns the entries of a tree in given revision and path. func TreeEntries( ctx context.Context, - objectReader ObjectContentReader, + objectReader ObjectReader, objectInfoReader ObjectInfoReader, revision, path string, ) (_ []*gitalypb.TreeEntry, returnedErr error) { diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go index 55eaf7432..07e29525b 100644 --- a/internal/git/gitpipe/catfile_info.go +++ b/internal/git/gitpipe/catfile_info.go @@ -71,7 +71,7 @@ func CatfileInfo( opt(&cfg) } - queue, queueCleanup, err := objectInfoReader.ObjectQueue(ctx) + queue, queueCleanup, err := objectInfoReader.InfoQueue(ctx) if err != nil { return nil, err } @@ -88,7 +88,7 @@ func CatfileInfo( var i int64 for it.Next() { - if err := queue.RequestInfo(it.ObjectID().Revision()); err != nil { + if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err}) return } diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go index 6cfdcd46e..79d7de32d 100644 --- a/internal/git/gitpipe/catfile_info_test.go +++ b/internal/git/gitpipe/catfile_info_test.go @@ -268,7 +268,7 @@ func TestCatfileInfo(t *testing.T) { // Reusing the queue is not allowed, so we should get an error here. _, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input)) - require.Equal(t, fmt.Errorf("object queue already in use"), err) + require.Equal(t, fmt.Errorf("object info queue already in use"), err) // We now consume all the input of the iterator. require.True(t, it.Next()) diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go index 65c18013a..5fe031579 100644 --- a/internal/git/gitpipe/catfile_object.go +++ b/internal/git/gitpipe/catfile_object.go @@ -36,7 +36,7 @@ type catfileObjectRequest struct { // be fully consumed by the caller. func CatfileObject( ctx context.Context, - objectReader catfile.ObjectContentReader, + objectReader catfile.ObjectReader, it ObjectIterator, ) (CatfileObjectIterator, error) { queue, queueCleanup, err := objectReader.ObjectQueue(ctx) @@ -72,7 +72,7 @@ func CatfileObject( var i int64 for it.Next() { - if err := queue.RequestObject(it.ObjectID().Revision()); err != nil { + if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil { sendRequest(catfileObjectRequest{err: err}) return } diff --git a/internal/git/log/last_commit.go b/internal/git/log/last_commit.go index eacc030cb..de2d484d0 100644 --- a/internal/git/log/last_commit.go +++ b/internal/git/log/last_commit.go @@ -16,7 +16,7 @@ import ( func LastCommitForPath( ctx context.Context, gitCmdFactory git.CommandFactory, - objectReader catfile.ObjectContentReader, + objectReader catfile.ObjectReader, repo repository.GitRepo, revision git.Revision, path string, diff --git a/internal/git/log/parser.go b/internal/git/log/parser.go index 61692c701..173bcc37b 100644 --- a/internal/git/log/parser.go +++ b/internal/git/log/parser.go @@ -19,7 +19,7 @@ type Parser struct { scanner *bufio.Scanner currentCommit *gitalypb.GitCommit err error - objectReader catfile.ObjectContentReader + objectReader catfile.ObjectReader } // NewParser returns a new Parser diff --git a/internal/gitaly/service/blob/get_blobs.go b/internal/gitaly/service/blob/get_blobs.go index 634a942d9..51a173a8f 100644 --- a/internal/gitaly/service/blob/get_blobs.go +++ b/internal/gitaly/service/blob/get_blobs.go @@ -23,7 +23,7 @@ var treeEntryToObjectType = map[gitalypb.TreeEntry_EntryType]gitalypb.ObjectType func sendGetBlobsResponse( req *gitalypb.GetBlobsRequest, stream gitalypb.BlobService_GetBlobsServer, - objectReader catfile.ObjectContentReader, + objectReader catfile.ObjectReader, objectInfoReader catfile.ObjectInfoReader, ) error { ctx := stream.Context() @@ -99,7 +99,7 @@ func sendGetBlobsResponse( func sendBlobTreeEntry( response *gitalypb.GetBlobsResponse, stream gitalypb.BlobService_GetBlobsServer, - objectReader catfile.ObjectContentReader, + objectReader catfile.ObjectReader, limit int64, ) (returnedErr error) { ctx := stream.Context() diff --git a/internal/gitaly/service/commit/filter_shas_with_signatures.go b/internal/gitaly/service/commit/filter_shas_with_signatures.go index 2576f1757..2b63711f6 100644 --- a/internal/gitaly/service/commit/filter_shas_with_signatures.go +++ b/internal/gitaly/service/commit/filter_shas_with_signatures.go @@ -63,7 +63,7 @@ func (s *server) filterShasWithSignatures(bidi gitalypb.CommitService_FilterShas } } -func filterCommitShasWithSignatures(ctx context.Context, objectReader catfile.ObjectContentReader, shas [][]byte) ([][]byte, error) { +func filterCommitShasWithSignatures(ctx context.Context, objectReader catfile.ObjectReader, shas [][]byte) ([][]byte, error) { var foundShas [][]byte for _, sha := range shas { commit, err := catfile.GetCommit(ctx, objectReader, git.Revision(sha)) diff --git a/internal/gitaly/service/commit/find_commits.go b/internal/gitaly/service/commit/find_commits.go index 221ed4336..c22298866 100644 --- a/internal/gitaly/service/commit/find_commits.go +++ b/internal/gitaly/service/commit/find_commits.go @@ -108,11 +108,11 @@ func calculateOffsetManually(req *gitalypb.FindCommitsRequest) bool { // GetCommits wraps a git log command that can be iterated on to get individual commit objects type GetCommits struct { scanner *bufio.Scanner - objectReader catfile.ObjectContentReader + objectReader catfile.ObjectReader } // NewGetCommits returns a new GetCommits object -func NewGetCommits(cmd *command.Command, objectReader catfile.ObjectContentReader, shortStat bool) *GetCommits { +func NewGetCommits(cmd *command.Command, objectReader catfile.ObjectReader, shortStat bool) *GetCommits { getCommits := &GetCommits{ scanner: bufio.NewScanner(cmd), objectReader: objectReader, diff --git a/internal/gitaly/service/commit/tree_entries.go b/internal/gitaly/service/commit/tree_entries.go index c5f70e440..0d1f88c74 100644 --- a/internal/gitaly/service/commit/tree_entries.go +++ b/internal/gitaly/service/commit/tree_entries.go @@ -44,7 +44,7 @@ func validateGetTreeEntriesRequest(in *gitalypb.GetTreeEntriesRequest) error { func populateFlatPath( ctx context.Context, - objectReader catfile.ObjectContentReader, + objectReader catfile.ObjectReader, objectInfoReader catfile.ObjectInfoReader, entries []*gitalypb.TreeEntry, ) error { @@ -86,7 +86,7 @@ func (s *server) sendTreeEntries( var entries []*gitalypb.TreeEntry var ( - objectReader catfile.ObjectContentReader + objectReader catfile.ObjectReader objectInfoReader catfile.ObjectInfoReader ) diff --git a/internal/gitaly/service/commit/tree_entry.go b/internal/gitaly/service/commit/tree_entry.go index 60071226a..3878c2965 100644 --- a/internal/gitaly/service/commit/tree_entry.go +++ b/internal/gitaly/service/commit/tree_entry.go @@ -15,7 +15,7 @@ import ( func sendTreeEntry( stream gitalypb.CommitService_TreeEntryServer, - objectReader catfile.ObjectContentReader, + objectReader catfile.ObjectReader, objectInfoReader catfile.ObjectInfoReader, revision, path string, limit, maxSize int64, diff --git a/internal/gitaly/service/ref/find_tag.go b/internal/gitaly/service/ref/find_tag.go index 45daba0c4..982f49920 100644 --- a/internal/gitaly/service/ref/find_tag.go +++ b/internal/gitaly/service/ref/find_tag.go @@ -30,7 +30,7 @@ func (s *server) FindTag(ctx context.Context, in *gitalypb.FindTagRequest) (*git } // parseTagLine parses a line of text with the output format %(objectname) %(objecttype) %(refname:lstrip=2) -func parseTagLine(ctx context.Context, objectReader catfile.ObjectContentReader, tagLine string) (*gitalypb.Tag, error) { +func parseTagLine(ctx context.Context, objectReader catfile.ObjectReader, tagLine string) (*gitalypb.Tag, error) { fields := strings.SplitN(tagLine, " ", 3) if len(fields) != 3 { return nil, fmt.Errorf("invalid output from for-each-ref command: %v", tagLine) diff --git a/internal/gitaly/service/ref/util.go b/internal/gitaly/service/ref/util.go index 225d70850..ebeb48a68 100644 --- a/internal/gitaly/service/ref/util.go +++ b/internal/gitaly/service/ref/util.go @@ -58,7 +58,7 @@ func buildLocalBranch(name []byte, target *gitalypb.GitCommit) *gitalypb.FindLoc return response } -func buildAllBranchesBranch(ctx context.Context, objectReader catfile.ObjectContentReader, elements [][]byte) (*gitalypb.FindAllBranchesResponse_Branch, error) { +func buildAllBranchesBranch(ctx context.Context, objectReader catfile.ObjectReader, elements [][]byte) (*gitalypb.FindAllBranchesResponse_Branch, error) { target, err := catfile.GetCommit(ctx, objectReader, git.Revision(elements[1])) if err != nil { return nil, err @@ -70,7 +70,7 @@ func buildAllBranchesBranch(ctx context.Context, objectReader catfile.ObjectCont }, nil } -func buildBranch(ctx context.Context, objectReader catfile.ObjectContentReader, elements [][]byte) (*gitalypb.Branch, error) { +func buildBranch(ctx context.Context, objectReader catfile.ObjectReader, elements [][]byte) (*gitalypb.Branch, error) { target, err := catfile.GetCommit(ctx, objectReader, git.Revision(elements[1])) if err != nil { return nil, err @@ -82,7 +82,7 @@ func buildBranch(ctx context.Context, objectReader catfile.ObjectContentReader, }, nil } -func newFindLocalBranchesWriter(stream gitalypb.RefService_FindLocalBranchesServer, objectReader catfile.ObjectContentReader) lines.Sender { +func newFindLocalBranchesWriter(stream gitalypb.RefService_FindLocalBranchesServer, objectReader catfile.ObjectReader) lines.Sender { return func(refs [][]byte) error { ctx := stream.Context() var response *gitalypb.FindLocalBranchesResponse @@ -109,7 +109,7 @@ func newFindLocalBranchesWriter(stream gitalypb.RefService_FindLocalBranchesServ } } -func newFindAllBranchesWriter(stream gitalypb.RefService_FindAllBranchesServer, objectReader catfile.ObjectContentReader) lines.Sender { +func newFindAllBranchesWriter(stream gitalypb.RefService_FindAllBranchesServer, objectReader catfile.ObjectReader) lines.Sender { return func(refs [][]byte) error { var branches []*gitalypb.FindAllBranchesResponse_Branch ctx := stream.Context() @@ -129,7 +129,7 @@ func newFindAllBranchesWriter(stream gitalypb.RefService_FindAllBranchesServer, } } -func newFindAllRemoteBranchesWriter(stream gitalypb.RefService_FindAllRemoteBranchesServer, objectReader catfile.ObjectContentReader) lines.Sender { +func newFindAllRemoteBranchesWriter(stream gitalypb.RefService_FindAllRemoteBranchesServer, objectReader catfile.ObjectReader) lines.Sender { return func(refs [][]byte) error { var branches []*gitalypb.Branch ctx := stream.Context() diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go index a06b32e1a..4c1e436bf 100644 --- a/internal/gitaly/service/repository/apply_gitattributes.go +++ b/internal/gitaly/service/repository/apply_gitattributes.go @@ -23,7 +23,7 @@ import ( const attributesFileMode os.FileMode = 0o644 -func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, objectReader catfile.ObjectContentReader, repoPath string, revision []byte) (returnedErr error) { +func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, objectReader catfile.ObjectReader, repoPath string, revision []byte) (returnedErr error) { infoPath := filepath.Join(repoPath, "info") attributesPath := filepath.Join(infoPath, "attributes") diff --git a/internal/metadata/featureflag/ff_catfile_batch_command.go b/internal/metadata/featureflag/ff_catfile_batch_command.go deleted file mode 100644 index e3022f9cd..000000000 --- a/internal/metadata/featureflag/ff_catfile_batch_command.go +++ /dev/null @@ -1,9 +0,0 @@ -package featureflag - -// CatfileBatchCommand enables the `--batch-command` mode for git-cat-file(1). -var CatfileBatchCommand = NewFeatureFlag( - "catfile_batch_command", - "v15.6.0", - "https://gitlab.com/gitlab-org/gitaly/-/issues/4573", - false, -) diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 9cbf45d9b..1bee6fa47 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -202,9 +202,6 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.NodeErrorCancelsVoter, rnd.Int()%2 == 0) ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV238, rnd.Int()%2 == 0) - // CatfileBatchCommand affects many tests since most of them rely on catfile for content/info - // information about objects. - ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.CatfileBatchCommand, rnd.Int()%2 == 0) for _, opt := range opts { ctx = opt(ctx) |