Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Patrick Steinhardt <psteinhardt@gitlab.com> 2022-11-16 20:58:16 +0300
committer Patrick Steinhardt <psteinhardt@gitlab.com> 2022-11-16 20:58:16 +0300
commit 0a8c4c2fd1180046291efb23a068bce778d06244 (patch)
tree e580dbe813f1783e858da4f7f0f0372d3cc927c9
parent e8342ee1b0aa110e932c3bf5f1e2a2e675f06d7b (diff)
Revert "Merge branch '4420-utilize-git-cat-file-batch-command-mode' into 'master'"
This reverts merge request !4972
-rw-r--r-- internal/git/catfile/cache.go 45
-rw-r--r-- internal/git/catfile/cache_test.go 17
-rw-r--r-- internal/git/catfile/commit.go 6
-rw-r--r-- internal/git/catfile/object_content_reader.go 157
-rw-r--r-- internal/git/catfile/object_content_reader_test.go 460
-rw-r--r-- internal/git/catfile/object_info_reader.go 29
-rw-r--r-- internal/git/catfile/object_info_reader_test.go 26
-rw-r--r-- internal/git/catfile/object_reader.go 72
-rw-r--r-- internal/git/catfile/object_reader_test.go 255
-rw-r--r-- internal/git/catfile/request_queue.go 49
-rw-r--r-- internal/git/catfile/request_queue_test.go 151
-rw-r--r-- internal/git/catfile/tag.go 6
-rw-r--r-- internal/git/catfile/tag_test.go 8
-rw-r--r-- internal/git/catfile/testhelper_test.go 2
-rw-r--r-- internal/git/catfile/tree_entries.go 6
-rw-r--r-- internal/git/gitpipe/catfile_info.go 4
-rw-r--r-- internal/git/gitpipe/catfile_info_test.go 2
-rw-r--r-- internal/git/gitpipe/catfile_object.go 4
-rw-r--r-- internal/git/log/last_commit.go 2
-rw-r--r-- internal/git/log/parser.go 2
-rw-r--r-- internal/gitaly/service/blob/get_blobs.go 4
-rw-r--r-- internal/gitaly/service/commit/filter_shas_with_signatures.go 2
-rw-r--r-- internal/gitaly/service/commit/find_commits.go 4
-rw-r--r-- internal/gitaly/service/commit/tree_entries.go 4
-rw-r--r-- internal/gitaly/service/commit/tree_entry.go 2
-rw-r--r-- internal/gitaly/service/ref/find_tag.go 2
-rw-r--r-- internal/gitaly/service/ref/util.go 10
-rw-r--r-- internal/gitaly/service/repository/apply_gitattributes.go 2
-rw-r--r-- internal/metadata/featureflag/ff_catfile_batch_command.go 9
-rw-r--r-- internal/testhelper/testhelper.go 3
30 files changed, 232 insertions, 1113 deletions
diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go
index cde7957f9..3fb00a588 100644
--- a/internal/git/catfile/cache.go
+++ b/internal/git/catfile/cache.go
@@ -14,7 +14,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
"gitlab.com/gitlab-org/gitaly/v15/internal/metadata"
- "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/labkit/correlation"
)
@@ -35,7 +34,7 @@ const (
type Cache interface {
// ObjectReader either creates a new object reader or returns a cached one for the given
// repository.
- ObjectReader(context.Context, git.RepositoryExecutor) (ObjectContentReader, func(), error)
+ ObjectReader(context.Context, git.RepositoryExecutor) (ObjectReader, func(), error)
// ObjectInfoReader either creates a new object info reader or returns a cached one for the
// given repository.
ObjectInfoReader(context.Context, git.RepositoryExecutor) (ObjectInfoReader, func(), error)
@@ -171,27 +170,17 @@ func (c *ProcessCache) Stop() {
}
// ObjectReader creates a new ObjectReader process for the given repository.
-func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectContentReader, func(), error) {
- var cached cacheable
- var err error
- var cancel func()
-
- if featureflag.CatfileBatchCommand.IsEnabled(ctx) {
- cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
- return newObjectReader(ctx, repo, c.catfileLookupCounter)
- }, "catfile.ObjectReader")
- } else {
- cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
- return newObjectContentReader(ctx, repo, c.catfileLookupCounter)
- }, "catfile.ObjectContentReader")
- }
+func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectReader, func(), error) {
+ cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
+ return newObjectReader(ctx, repo, c.catfileLookupCounter)
+ }, "catfile.ObjectReader")
if err != nil {
return nil, nil, err
}
- objectReader, ok := cached.(ObjectContentReader)
+ objectReader, ok := cacheable.(ObjectReader)
if !ok {
- return nil, nil, fmt.Errorf("expected object reader, got %T", cached)
+ return nil, nil, fmt.Errorf("expected object reader, got %T", cacheable)
}
return objectReader, cancel, nil
@@ -199,26 +188,16 @@ func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExec
// ObjectInfoReader creates a new ObjectInfoReader process for the given repository.
func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.RepositoryExecutor) (ObjectInfoReader, func(), error) {
- var cached cacheable
- var err error
- var cancel func()
-
- if featureflag.CatfileBatchCommand.IsEnabled(ctx) {
- cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
- return newObjectReader(ctx, repo, c.catfileLookupCounter)
- }, "catfile.ObjectReader")
- } else {
- cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
- return newObjectInfoReader(ctx, repo, c.catfileLookupCounter)
- }, "catfile.ObjectInfoReader")
- }
+ cacheable, cancel, err := c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
+ return newObjectInfoReader(ctx, repo, c.catfileLookupCounter)
+ }, "catfile.ObjectInfoReader")
if err != nil {
return nil, nil, err
}
- objectInfoReader, ok := cached.(ObjectInfoReader)
+ objectInfoReader, ok := cacheable.(ObjectInfoReader)
if !ok {
- return nil, nil, fmt.Errorf("expected object info reader, got %T", cached)
+ return nil, nil, fmt.Errorf("expected object info reader, got %T", cacheable)
}
return objectInfoReader, cancel, nil
diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go
index 1a20f37c5..b6c4f544d 100644
--- a/internal/git/catfile/cache_test.go
+++ b/internal/git/catfile/cache_test.go
@@ -13,7 +13,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/labkit/correlation"
@@ -204,12 +203,7 @@ func TestCache_autoExpiry(t *testing.T) {
}
func TestCache_ObjectReader(t *testing.T) {
- t.Parallel()
-
- testhelper.NewFeatureSets(featureflag.CatfileBatchCommand).Run(t, testCacheObjectReader)
-}
-
-func testCacheObjectReader(t *testing.T, ctx context.Context) {
+ ctx := testhelper.Context(t)
cfg := testcfg.Build(t)
repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
@@ -309,12 +303,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) {
}
func TestCache_ObjectInfoReader(t *testing.T) {
- t.Parallel()
-
- testhelper.NewFeatureSets(featureflag.CatfileBatchCommand).Run(t, testCacheObjectInfoReader)
-}
-
-func testCacheObjectInfoReader(t *testing.T, ctx context.Context) {
+ ctx := testhelper.Context(t)
cfg := testcfg.Build(t)
repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
@@ -402,7 +391,7 @@ func mustCreateCacheable(t *testing.T, cfg config.Cfg, repo repository.GitRepo)
ctx, cancel := context.WithCancel(testhelper.Context(t))
- batch, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repo), nil)
+ batch, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repo), nil)
require.NoError(t, err)
return batch, cancel
diff --git a/internal/git/catfile/commit.go b/internal/git/catfile/commit.go
index 36aa8ca0b..93799e3dc 100644
--- a/internal/git/catfile/commit.go
+++ b/internal/git/catfile/commit.go
@@ -14,7 +14,7 @@ import (
)
// GetCommit looks up a commit by revision using an existing Batch instance.
-func GetCommit(ctx context.Context, objectReader ObjectContentReader, revision git.Revision) (*gitalypb.GitCommit, error) {
+func GetCommit(ctx context.Context, objectReader ObjectReader, revision git.Revision) (*gitalypb.GitCommit, error) {
object, err := objectReader.Object(ctx, revision+"^{commit}")
if err != nil {
return nil, err
@@ -29,7 +29,7 @@ func GetCommitWithTrailers(
ctx context.Context,
gitCmdFactory git.CommandFactory,
repo repository.GitRepo,
- objectReader ObjectContentReader,
+ objectReader ObjectReader,
revision git.Revision,
) (*gitalypb.GitCommit, error) {
commit, err := GetCommit(ctx, objectReader, revision)
@@ -67,7 +67,7 @@ func GetCommitWithTrailers(
}
// GetCommitMessage looks up a commit message and returns it in its entirety.
-func GetCommitMessage(ctx context.Context, objectReader ObjectContentReader, repo repository.GitRepo, revision git.Revision) ([]byte, error) {
+func GetCommitMessage(ctx context.Context, objectReader ObjectReader, repo repository.GitRepo, revision git.Revision) ([]byte, error) {
obj, err := objectReader.Object(ctx, revision+"^{commit}")
if err != nil {
return nil, err
diff --git a/internal/git/catfile/object_content_reader.go b/internal/git/catfile/object_content_reader.go
deleted file mode 100644
index 919e3eff0..000000000
--- a/internal/git/catfile/object_content_reader.go
+++ /dev/null
@@ -1,157 +0,0 @@
-package catfile
-
-import (
- "bufio"
- "context"
- "fmt"
- "io"
- "sync/atomic"
-
- "github.com/prometheus/client_golang/prometheus"
- "gitlab.com/gitlab-org/gitaly/v15/internal/command"
- "gitlab.com/gitlab-org/gitaly/v15/internal/git"
-)
-
-// ObjectContentReader is a reader for Git objects.
-type ObjectContentReader interface {
- cacheable
-
- // Reader returns a new Object for the given revision. The Object must be fully consumed
- // before another object is requested.
- Object(context.Context, git.Revision) (*Object, error)
-
- // ObjectQueue returns an ObjectQueue that can be used to batch multiple object requests.
- // Using the queue is more efficient than using `Object()` when requesting a bunch of
- // objects. The returned function must be executed after use of the ObjectQueue has
- // finished.
- ObjectQueue(context.Context) (ObjectQueue, func(), error)
-}
-
-// objectContentReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
-// --batch` process such that we do not have to spawn a new process for each object we are about to
-// read.
-type objectContentReader struct {
- cmd *command.Command
-
- counter *prometheus.CounterVec
-
- queue requestQueue
- queueInUse int32
-}
-
-func newObjectContentReader(
- ctx context.Context,
- repo git.RepositoryExecutor,
- counter *prometheus.CounterVec,
-) (*objectContentReader, error) {
- batchCmd, err := repo.Exec(ctx,
- git.SubCmd{
- Name: "cat-file",
- Flags: []git.Option{
- git.Flag{Name: "--batch"},
- git.Flag{Name: "--buffer"},
- },
- },
- git.WithSetupStdin(),
- )
- if err != nil {
- return nil, err
- }
-
- objectHash, err := repo.ObjectHash(ctx)
- if err != nil {
- return nil, fmt.Errorf("detecting object hash: %w", err)
- }
-
- objectReader := &objectContentReader{
- cmd: batchCmd,
- counter: counter,
- queue: requestQueue{
- objectHash: objectHash,
- isObjectQueue: true,
- stdout: bufio.NewReader(batchCmd),
- stdin: bufio.NewWriter(batchCmd),
- },
- }
-
- return objectReader, nil
-}
-
-func (o *objectContentReader) close() {
- o.queue.close()
- _ = o.cmd.Wait()
-}
-
-func (o *objectContentReader) isClosed() bool {
- return o.queue.isClosed()
-}
-
-func (o *objectContentReader) isDirty() bool {
- return o.queue.isDirty()
-}
-
-func (o *objectContentReader) objectQueue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) {
- if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) {
- return nil, nil, fmt.Errorf("object queue already in use")
- }
-
- trace := startTrace(ctx, o.counter, tracedMethod)
- o.queue.trace = trace
-
- return &o.queue, func() {
- atomic.StoreInt32(&o.queueInUse, 0)
- trace.finish()
- }, nil
-}
-
-func (o *objectContentReader) Object(ctx context.Context, revision git.Revision) (*Object, error) {
- queue, finish, err := o.objectQueue(ctx, "catfile.Object")
- if err != nil {
- return nil, err
- }
- defer finish()
-
- if err := queue.RequestObject(revision); err != nil {
- return nil, err
- }
-
- if err := queue.Flush(); err != nil {
- return nil, err
- }
-
- object, err := queue.ReadObject()
- if err != nil {
- return nil, err
- }
-
- return object, nil
-}
-
-func (o *objectContentReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), error) {
- queue, finish, err := o.objectQueue(ctx, "catfile.ObjectQueue")
- if err != nil {
- return nil, nil, err
- }
- return queue, finish, nil
-}
-
-// Object represents data returned by `git cat-file --batch`
-type Object struct {
- // ObjectInfo represents main information about object
- ObjectInfo
-
- // dataReader is reader which has all the object data.
- dataReader io.Reader
-}
-
-func (o *Object) Read(p []byte) (int, error) {
- return o.dataReader.Read(p)
-}
-
-// WriteTo implements the io.WriterTo interface. It defers the write to the embedded object reader
-// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are
-// implemented by the respective reader or writer.
-func (o *Object) WriteTo(w io.Writer) (int64, error) {
- // `io.Copy()` will make use of `ReadFrom()` in case the writer implements it.
- return io.Copy(w, o.dataReader)
-}
diff --git a/internal/git/catfile/object_content_reader_test.go b/internal/git/catfile/object_content_reader_test.go
deleted file mode 100644
index 92e68c9b8..000000000
--- a/internal/git/catfile/object_content_reader_test.go
+++ /dev/null
@@ -1,460 +0,0 @@
-package catfile
-
-import (
- "errors"
- "fmt"
- "io"
- "os"
- "testing"
-
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/testutil"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v15/internal/git"
- "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
- "gitlab.com/gitlab-org/gitaly/v15/internal/helper/text"
- "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
-)
-
-func TestObjectContentReader_reader(t *testing.T) {
- ctx := testhelper.Context(t)
-
- cfg := testcfg.Build(t)
- repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
- SkipCreationViaService: true,
- })
-
- commitID := gittest.WriteCommit(t, cfg, repoPath,
- gittest.WithBranch("main"),
- gittest.WithMessage("commit message"),
- gittest.WithTreeEntries(gittest.TreeEntry{Path: "README", Mode: "100644", Content: "something"}),
- )
- gittest.WriteTag(t, cfg, repoPath, "v1.1.1", commitID.Revision(), gittest.WriteTagConfig{
- Message: "annotated tag",
- })
-
- commitContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", commitID.String())
-
- t.Run("read existing object by ref", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- object, err := reader.Object(ctx, "refs/heads/main")
- require.NoError(t, err)
-
- data, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, commitContents, data)
- })
-
- t.Run("read existing object by object ID", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- object, err := reader.Object(ctx, commitID.Revision())
- require.NoError(t, err)
-
- data, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, data, commitContents)
- })
-
- t.Run("read missing ref", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- _, err = reader.Object(ctx, "refs/heads/does-not-exist")
- require.EqualError(t, err, "object not found")
-
- // Verify that we're still able to read a commit after the previous read has failed.
- object, err := reader.Object(ctx, commitID.Revision())
- require.NoError(t, err)
-
- data, err := io.ReadAll(object)
- require.NoError(t, err)
-
- require.Equal(t, commitContents, data)
- })
-
- t.Run("read fails when not consuming previous object", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- _, err = reader.Object(ctx, commitID.Revision())
- require.NoError(t, err)
-
- // We haven't yet consumed the previous object, so this must now fail.
- _, err = reader.Object(ctx, commitID.Revision())
- require.EqualError(t, err, "current object has not been fully read")
- })
-
- t.Run("read fails when partially consuming previous object", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- object, err := reader.Object(ctx, commitID.Revision())
- require.NoError(t, err)
-
- _, err = io.CopyN(io.Discard, object, 100)
- require.NoError(t, err)
-
- // We haven't yet consumed the previous object, so this must now fail.
- _, err = reader.Object(ctx, commitID.Revision())
- require.EqualError(t, err, "current object has not been fully read")
- })
-
- t.Run("read increments Prometheus counter", func(t *testing.T) {
- counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"})
-
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), counter)
- require.NoError(t, err)
-
- for objectType, revision := range map[string]git.Revision{
- "commit": "refs/heads/main",
- "tree": "refs/heads/main^{tree}",
- "blob": "refs/heads/main:README",
- "tag": "refs/tags/v1.1.1",
- } {
- require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues(objectType)))
-
- object, err := reader.Object(ctx, revision)
- require.NoError(t, err)
-
- require.Equal(t, float64(1), testutil.ToFloat64(counter.WithLabelValues(objectType)))
-
- _, err = io.Copy(io.Discard, object)
- require.NoError(t, err)
- }
- })
-}
-
-func TestObjectContentReader_queue(t *testing.T) {
- ctx := testhelper.Context(t)
-
- cfg := testcfg.Build(t)
- repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
- SkipCreationViaService: true,
- })
-
- foobarBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
- barfooBlob := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo"))
-
- t.Run("read single object", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
-
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- contents, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, "foobar", string(contents))
- })
-
- t.Run("read multiple objects", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- for blobID, blobContents := range map[git.ObjectID]string{
- foobarBlob: "foobar",
- barfooBlob: "barfoo",
- } {
- require.NoError(t, queue.RequestObject(blobID.Revision()))
- require.NoError(t, queue.Flush())
-
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- contents, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, blobContents, string(contents))
- }
- })
-
- t.Run("request multiple objects", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.RequestObject(barfooBlob.Revision()))
- require.NoError(t, queue.Flush())
-
- for _, expectedContents := range []string{"foobar", "barfoo"} {
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- contents, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, expectedContents, string(contents))
- }
- })
-
- t.Run("read without request", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- _, err = queue.ReadObject()
- require.Equal(t, errors.New("no outstanding request"), err)
- })
-
- t.Run("flush with single request", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- // We flush once before and once after requesting the object such that we can be
- // sure that it doesn't impact which objects we can read.
- require.NoError(t, queue.Flush())
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
-
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- contents, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, "foobar", string(contents))
- })
-
- t.Run("flush with multiple requests", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- for i := 0; i < 10; i++ {
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- }
- require.NoError(t, queue.Flush())
-
- for i := 0; i < 10; i++ {
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- contents, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, "foobar", string(contents))
- }
- })
-
- t.Run("flush without request", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.NoError(t, queue.Flush())
-
- _, err = queue.ReadObject()
- require.Equal(t, errors.New("no outstanding request"), err)
- })
-
- t.Run("request invalid object", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.NoError(t, queue.RequestObject("does-not-exist"))
- require.NoError(t, queue.Flush())
-
- _, err = queue.ReadObject()
- require.Equal(t, NotFoundError{errors.New("object not found")}, err)
- })
-
- t.Run("can continue reading after NotFoundError", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.NoError(t, queue.RequestObject("does-not-exist"))
- require.NoError(t, queue.Flush())
-
- _, err = queue.ReadObject()
- require.Equal(t, NotFoundError{errors.New("object not found")}, err)
-
- // Requesting another object after the previous one has failed should continue to
- // work alright.
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- contents, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, "foobar", string(contents))
- })
-
- t.Run("requesting multiple queues fails", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- _, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- _, _, err = reader.objectQueue(ctx, "trace")
- require.Equal(t, errors.New("object queue already in use"), err)
-
- // After calling cleanup we should be able to create an object queue again.
- cleanup()
-
- _, cleanup, err = reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
- })
-
- t.Run("requesting object dirties reader", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.False(t, reader.isDirty())
- require.False(t, queue.isDirty())
-
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
-
- require.True(t, reader.isDirty())
- require.True(t, queue.isDirty())
-
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- // The object has not been consumed yet, so the reader must still be dirty.
- require.True(t, reader.isDirty())
- require.True(t, queue.isDirty())
-
- _, err = io.ReadAll(object)
- require.NoError(t, err)
-
- require.False(t, reader.isDirty())
- require.False(t, queue.isDirty())
- })
-
- t.Run("closing queue blocks request", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- queue.close()
-
- require.True(t, reader.isClosed())
- require.True(t, queue.isClosed())
-
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(foobarBlob.Revision()))
- })
-
- t.Run("closing queue blocks read", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- // Request the object before we close the queue.
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
-
- queue.close()
-
- require.True(t, reader.isClosed())
- require.True(t, queue.isClosed())
-
- _, err = queue.ReadObject()
- require.Equal(t, fmt.Errorf("cannot read object info: %w", os.ErrClosed), err)
- })
-
- t.Run("closing queue blocks consuming", func(t *testing.T) {
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.Flush())
-
- // Read the object header before closing.
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- queue.close()
-
- require.True(t, reader.isClosed())
- require.True(t, queue.isClosed())
-
- _, err = io.ReadAll(object)
- require.Equal(t, os.ErrClosed, err)
- })
-}
-
-func TestObjectContentReader_replaceRefs(t *testing.T) {
- ctx := testhelper.Context(t)
-
- cfg := testcfg.Build(t)
- repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
- SkipCreationViaService: true,
- })
-
- originalOID := gittest.WriteBlob(t, cfg, repoPath, []byte("original"))
- replacedOID := gittest.WriteBlob(t, cfg, repoPath, []byte("replaced"))
-
- gittest.WriteRef(t, cfg, repoPath, git.ReferenceName("refs/replace/"+originalOID.String()), replacedOID)
-
- // Reading the object via our testhelper should result in the object having been replaced.
- require.Equal(t, "replaced", text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", originalOID.String())))
-
- reader, err := newObjectContentReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- object, err := reader.Object(ctx, originalOID.Revision())
- require.NoError(t, err)
-
- contents, err := io.ReadAll(object)
- require.NoError(t, err)
-
- // But using our "normal" Git command execution code path, we still want to see the original
- // content of the blob.
- require.Equal(t, "original", string(contents))
-}
diff --git a/internal/git/catfile/object_info_reader.go b/internal/git/catfile/object_info_reader.go
index 2d5e21441..595f6d52e 100644
--- a/internal/git/catfile/object_info_reader.go
+++ b/internal/git/catfile/object_info_reader.go
@@ -64,7 +64,7 @@ restart:
// object that cannot exist. This causes Git to write an error and immediately flush
// stdout. The only downside is that we need to filter this error here, but that's
// acceptable while git-cat-file(1) doesn't yet have any way to natively flush.
- if strings.HasPrefix(infoLine, flushCommandHack) {
+ if strings.HasPrefix(infoLine, flushCommand) {
goto restart
}
@@ -100,11 +100,26 @@ type ObjectInfoReader interface {
// Info requests information about the revision pointed to by the given revision.
Info(context.Context, git.Revision) (*ObjectInfo, error)
- // ObjectQueue returns an ObjectQueue that can be used to batch multiple object info
+ // InfoQueue returns an ObjectInfoQueue that can be used to batch multiple object info
// requests. Using the queue is more efficient than using `Info()` when requesting a bunch
- // of objects. The returned function must be executed after use of the ObjectQueue has
+ // of objects. The returned function must be executed after use of the ObjectInfoQueue has
// finished.
- ObjectQueue(context.Context) (ObjectQueue, func(), error)
+ InfoQueue(context.Context) (ObjectInfoQueue, func(), error)
+}
+
+// ObjectInfoQueue allows for requesting and reading object info independently of each other. The
+// number of RequestInfo and ReadInfo calls must match. ReadObject must be executed after the
+// object has been requested already. The order of objects returned by ReadInfo is the same as the
+// order in which object info has been requested. Users of this interface must call `Flush()` after
+// all requests have been queued up such that all requested objects will be readable.
+type ObjectInfoQueue interface {
+ // RequestRevision requests the given revision from git-cat-file(1).
+ RequestRevision(git.Revision) error
+ // ReadInfo reads object info which has previously been requested.
+ ReadInfo() (*ObjectInfo, error)
+ // Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
+ // have been requested up to this point.
+ Flush() error
}
// objectInfoReader is a reader for Git object information. This reader is implemented via a
@@ -173,7 +188,7 @@ func (o *objectInfoReader) isDirty() bool {
func (o *objectInfoReader) infoQueue(ctx context.Context, tracedMethod string) (*requestQueue, func(), error) {
if !atomic.CompareAndSwapInt32(&o.queueInUse, 0, 1) {
- return nil, nil, fmt.Errorf("object queue already in use")
+ return nil, nil, fmt.Errorf("object info queue already in use")
}
trace := startTrace(ctx, o.counter, tracedMethod)
@@ -192,7 +207,7 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob
}
defer cleanup()
- if err := queue.RequestInfo(revision); err != nil {
+ if err := queue.RequestRevision(revision); err != nil {
return nil, err
}
@@ -208,7 +223,7 @@ func (o *objectInfoReader) Info(ctx context.Context, revision git.Revision) (*Ob
return objectInfo, nil
}
-func (o *objectInfoReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), error) {
+func (o *objectInfoReader) InfoQueue(ctx context.Context) (ObjectInfoQueue, func(), error) {
queue, cleanup, err := o.infoQueue(ctx, "catfile.InfoQueue")
if err != nil {
return nil, nil, err
diff --git a/internal/git/catfile/object_info_reader_test.go b/internal/git/catfile/object_info_reader_test.go
index 06878e763..761b8a81e 100644
--- a/internal/git/catfile/object_info_reader_test.go
+++ b/internal/git/catfile/object_info_reader_test.go
@@ -243,7 +243,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
@@ -263,7 +263,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
blobOID: blobInfo,
commitOID: commitInfo,
} {
- require.NoError(t, queue.RequestInfo(oid.Revision()))
+ require.NoError(t, queue.RequestRevision(oid.Revision()))
require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
@@ -280,8 +280,8 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
- require.NoError(t, queue.RequestInfo(commitOID.Revision()))
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
+ require.NoError(t, queue.RequestRevision(commitOID.Revision()))
require.NoError(t, queue.Flush())
for _, expectedInfo := range []ObjectInfo{blobInfo, commitInfo} {
@@ -314,7 +314,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
// We flush once before and once after requesting the object such that we can be
// sure that it doesn't impact which objects we can read.
require.NoError(t, queue.Flush())
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
@@ -331,7 +331,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
for i := 0; i < 10; i++ {
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
}
require.NoError(t, queue.Flush())
@@ -364,7 +364,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestInfo("does-not-exist"))
+ require.NoError(t, queue.RequestRevision("does-not-exist"))
require.NoError(t, queue.Flush())
_, err = queue.ReadInfo()
@@ -379,7 +379,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestInfo("does-not-exist"))
+ require.NoError(t, queue.RequestRevision("does-not-exist"))
require.NoError(t, queue.Flush())
_, err = queue.ReadInfo()
@@ -387,7 +387,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
// Requesting another object info after the previous one has failed should continue
// to work alright.
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
require.NoError(t, queue.Flush())
info, err := queue.ReadInfo()
require.NoError(t, err)
@@ -403,7 +403,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
_, _, err = reader.infoQueue(ctx, "trace")
- require.Equal(t, errors.New("object queue already in use"), err)
+ require.Equal(t, errors.New("object info queue already in use"), err)
// After calling cleanup we should be able to create an object queue again.
cleanup()
@@ -424,7 +424,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.False(t, reader.isDirty())
require.False(t, queue.isDirty())
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
require.NoError(t, queue.Flush())
require.True(t, reader.isDirty())
@@ -450,7 +450,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
require.True(t, reader.isClosed())
require.True(t, queue.isClosed())
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo(blobOID.Revision()))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision(blobOID.Revision()))
})
t.Run("closing queue blocks read", func(t *testing.T) {
@@ -462,7 +462,7 @@ func TestObjectInfoReader_queue(t *testing.T) {
defer cleanup()
// Request the object before we close the queue.
- require.NoError(t, queue.RequestInfo(blobOID.Revision()))
+ require.NoError(t, queue.RequestRevision(blobOID.Revision()))
require.NoError(t, queue.Flush())
queue.close()
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index 2b934dd33..243e70865 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -4,6 +4,7 @@ import (
"bufio"
"context"
"fmt"
+ "io"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
@@ -11,13 +12,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
)
-// ObjectReader returns information about an object referenced by a given revision.
+// ObjectReader is a reader for Git objects.
type ObjectReader interface {
cacheable
- // Info requests information about the revision pointed to by the given revision.
- Info(context.Context, git.Revision) (*ObjectInfo, error)
-
// Object returns a new Object for the given revision. The Object must be fully consumed
// before another object is requested.
Object(context.Context, git.Revision) (*Object, error)
@@ -25,34 +23,28 @@ type ObjectReader interface {
// ObjectQueue returns an ObjectQueue that can be used to batch multiple object requests.
// Using the queue is more efficient than using `Object()` when requesting a bunch of
// objects. The returned function must be executed after use of the ObjectQueue has
- // finished. Object Content and information can be requested from the queue but their
- // respective ordering must be maintained.
+ // finished.
ObjectQueue(context.Context) (ObjectQueue, func(), error)
}
// ObjectQueue allows for requesting and reading objects independently of each other. The number of
-// RequestObject+RequestInfo and ReadObject+RequestInfo calls must match and their ordering must be
-// maintained. ReadObject/ReadInfo must be executed after the object has been requested already.
-// The order of objects returned by ReadObject/ReadInfo is the same as the order in
+// RequestRevision and ReadObject calls must match. ReadObject must be executed after the object has
+// been requested already. The order of objects returned by ReadObject is the same as the order in
// which objects have been requested. Users of this interface must call `Flush()` after all requests
// have been queued up such that all requested objects will be readable.
type ObjectQueue interface {
- // RequestObject requests the given revision from git-cat-file(1).
- RequestObject(git.Revision) error
+ // RequestRevision requests the given revision from git-cat-file(1).
+ RequestRevision(git.Revision) error
// ReadObject reads an object which has previously been requested.
ReadObject() (*Object, error)
- // RequestInfo requests the given revision from git-cat-file(1).
- RequestInfo(git.Revision) error
- // ReadInfo reads object info which has previously been requested.
- ReadInfo() (*ObjectInfo, error)
// Flush flushes all queued requests and asks git-cat-file(1) to print all objects which
// have been requested up to this point.
Flush() error
}
// objectReader is a reader for Git objects. Reading is implemented via a long-lived `git cat-file
-// --batch-command` process such that we do not have to spawn a new process for each object we
-// are about to read.
+// --batch` process such that we do not have to spawn a new process for each object we are about to
+// read.
type objectReader struct {
cmd *command.Command
@@ -71,7 +63,7 @@ func newObjectReader(
git.SubCmd{
Name: "cat-file",
Flags: []git.Option{
- git.Flag{Name: "--batch-command"},
+ git.Flag{Name: "--batch"},
git.Flag{Name: "--buffer"},
},
},
@@ -90,10 +82,10 @@ func newObjectReader(
cmd: batchCmd,
counter: counter,
queue: requestQueue{
- objectHash: objectHash,
- stdout: bufio.NewReader(batchCmd),
- stdin: bufio.NewWriter(batchCmd),
- isBatchCommand: true,
+ objectHash: objectHash,
+ isObjectQueue: true,
+ stdout: bufio.NewReader(batchCmd),
+ stdin: bufio.NewWriter(batchCmd),
},
}
@@ -134,7 +126,7 @@ func (o *objectReader) Object(ctx context.Context, revision git.Revision) (*Obje
}
defer finish()
- if err := queue.RequestObject(revision); err != nil {
+ if err := queue.RequestRevision(revision); err != nil {
return nil, err
}
@@ -158,25 +150,23 @@ func (o *objectReader) ObjectQueue(ctx context.Context) (ObjectQueue, func(), er
return queue, finish, nil
}
-func (o *objectReader) Info(ctx context.Context, revision git.Revision) (*ObjectInfo, error) {
- queue, cleanup, err := o.objectQueue(ctx, "catfile.Info")
- if err != nil {
- return nil, err
- }
- defer cleanup()
-
- if err := queue.RequestInfo(revision); err != nil {
- return nil, err
- }
+// Object represents data returned by `git cat-file --batch`
+type Object struct {
+ // ObjectInfo represents the main information about the object.
+ ObjectInfo
- if err := queue.Flush(); err != nil {
- return nil, err
- }
+ // dataReader is the reader which provides all the object data.
+ dataReader io.Reader
+}
- objectInfo, err := queue.ReadInfo()
- if err != nil {
- return nil, err
- }
+func (o *Object) Read(p []byte) (int, error) {
+ return o.dataReader.Read(p)
+}
- return objectInfo, nil
+// WriteTo implements the io.WriterTo interface. It defers the write to the embedded object reader
+// via `io.Copy()`, which in turn will use `WriteTo()` or `ReadFrom()` in case these interfaces are
+// implemented by the respective reader or writer.
+func (o *Object) WriteTo(w io.Writer) (int64, error) {
+ // `io.Copy()` will make use of `ReadFrom()` in case the writer implements it.
+ return io.Copy(w, o.dataReader)
}
diff --git a/internal/git/catfile/object_reader_test.go b/internal/git/catfile/object_reader_test.go
index 619147e2f..32326f07f 100644
--- a/internal/git/catfile/object_reader_test.go
+++ b/internal/git/catfile/object_reader_test.go
@@ -18,8 +18,6 @@ import (
)
func TestObjectReader_reader(t *testing.T) {
- t.Parallel()
-
ctx := testhelper.Context(t)
cfg := testcfg.Build(t)
@@ -36,127 +34,47 @@ func TestObjectReader_reader(t *testing.T) {
Message: "annotated tag",
})
- oiByRevision := make(map[string]*ObjectInfo)
- contentByRevision := make(map[string][]byte)
- for _, revision := range []string{
- "refs/heads/main",
- "refs/heads/main^{tree}",
- "refs/heads/main:README",
- "refs/tags/v1.1.1",
- } {
- revParseOutput := gittest.Exec(t, cfg, "-C", repoPath, "rev-parse", revision)
- objectID, err := gittest.DefaultObjectHash.FromHex(text.ChompBytes(revParseOutput))
- require.NoError(t, err)
-
- objectType := text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-t", revision))
- objectContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", objectType, revision)
-
- oiByRevision[revision] = &ObjectInfo{
- Oid: objectID,
- Type: objectType,
- Size: int64(len(objectContents)),
- }
- contentByRevision[revision] = objectContents
- }
-
- for _, tc := range []struct {
- desc string
- revision git.Revision
- expectedErr error
- expectedInfo *ObjectInfo
- expectedContent []byte
- }{
- {
- desc: "commit by ref",
- revision: "refs/heads/main",
- expectedInfo: oiByRevision["refs/heads/main"],
- expectedContent: contentByRevision["refs/heads/main"],
- },
- {
- desc: "commit by ID",
- revision: oiByRevision["refs/heads/main"].Oid.Revision(),
- expectedInfo: oiByRevision["refs/heads/main"],
- expectedContent: contentByRevision["refs/heads/main"],
- },
- {
- desc: "tree",
- revision: oiByRevision["refs/heads/main^{tree}"].Oid.Revision(),
- expectedInfo: oiByRevision["refs/heads/main^{tree}"],
- expectedContent: contentByRevision["refs/heads/main^{tree}"],
- },
- {
- desc: "blob",
- revision: oiByRevision["refs/heads/main:README"].Oid.Revision(),
- expectedInfo: oiByRevision["refs/heads/main:README"],
- expectedContent: contentByRevision["refs/heads/main:README"],
- },
- {
- desc: "tag",
- revision: oiByRevision["refs/tags/v1.1.1"].Oid.Revision(),
- expectedInfo: oiByRevision["refs/tags/v1.1.1"],
- expectedContent: contentByRevision["refs/tags/v1.1.1"],
- },
- {
- desc: "nonexistent ref",
- revision: "refs/heads/does-not-exist",
- expectedErr: NotFoundError{fmt.Errorf("object not found")},
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- counter := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"type"})
-
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), counter)
- require.NoError(t, err)
+ commitContents := gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", commitID.String())
- require.Equal(t, float64(0), testutil.ToFloat64(counter.WithLabelValues("info")))
-
- // Check for object info
- info, err := reader.Info(ctx, tc.revision)
- require.Equal(t, tc.expectedErr, err)
- require.Equal(t, tc.expectedInfo, info)
-
- // Check for object contents
- object, err := reader.Object(ctx, tc.revision)
- require.Equal(t, tc.expectedErr, err)
- if err == nil {
- data, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, tc.expectedContent, data)
- }
-
- expectedRequests := 0
- if tc.expectedErr == nil {
- expectedRequests = 1
- }
- require.Equal(t, float64(expectedRequests), testutil.ToFloat64(counter.WithLabelValues("info")))
-
- // Verify that we do another request no matter whether the previous call
- // succeeded or failed.
- _, err = reader.Info(ctx, "refs/heads/main")
- require.NoError(t, err)
+ t.Run("read existing object by ref", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
- require.Equal(t, float64(expectedRequests+1), testutil.ToFloat64(counter.WithLabelValues("info")))
- })
- }
-}
+ object, err := reader.Object(ctx, "refs/heads/main")
+ require.NoError(t, err)
-func TestObjectReader_object(t *testing.T) {
- t.Parallel()
+ data, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, commitContents, data)
+ })
- ctx := testhelper.Context(t)
+ t.Run("read existing object by object ID", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
- cfg := testcfg.Build(t)
- repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
- SkipCreationViaService: true,
+ object, err := reader.Object(ctx, commitID.Revision())
+ require.NoError(t, err)
+
+ data, err := io.ReadAll(object)
+ require.NoError(t, err)
+ require.Equal(t, data, commitContents)
})
- commitID := gittest.WriteCommit(t, cfg, repoPath,
- gittest.WithBranch("main"),
- gittest.WithMessage("commit message"),
- gittest.WithTreeEntries(gittest.TreeEntry{Path: "README", Mode: "100644", Content: "something"}),
- )
- gittest.WriteTag(t, cfg, repoPath, "v1.1.1", commitID.Revision(), gittest.WriteTagConfig{
- Message: "annotated tag",
+ t.Run("read missing ref", func(t *testing.T) {
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ _, err = reader.Object(ctx, "refs/heads/does-not-exist")
+ require.EqualError(t, err, "object not found")
+
+ // Verify that we're still able to read a commit after the previous read has failed.
+ object, err := reader.Object(ctx, commitID.Revision())
+ require.NoError(t, err)
+
+ data, err := io.ReadAll(object)
+ require.NoError(t, err)
+
+ require.Equal(t, commitContents, data)
})
t.Run("read fails when not consuming previous object", func(t *testing.T) {
@@ -212,8 +130,6 @@ func TestObjectReader_object(t *testing.T) {
}
func TestObjectReader_queue(t *testing.T) {
- t.Parallel()
-
ctx := testhelper.Context(t)
cfg := testcfg.Build(t)
@@ -232,7 +148,7 @@ func TestObjectReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
@@ -255,7 +171,7 @@ func TestObjectReader_queue(t *testing.T) {
foobarBlob: "foobar",
barfooBlob: "barfoo",
} {
- require.NoError(t, queue.RequestObject(blobID.Revision()))
+ require.NoError(t, queue.RequestRevision(blobID.Revision()))
require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
@@ -275,8 +191,8 @@ func TestObjectReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.RequestObject(barfooBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(barfooBlob.Revision()))
require.NoError(t, queue.Flush())
for _, expectedContents := range []string{"foobar", "barfoo"} {
@@ -289,53 +205,6 @@ func TestObjectReader_queue(t *testing.T) {
}
})
- t.Run("request multiple info", func(t *testing.T) {
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.NoError(t, queue.RequestInfo(foobarBlob.Revision()))
- require.NoError(t, queue.RequestInfo(barfooBlob.Revision()))
- require.NoError(t, queue.Flush())
-
- for _, blob := range []git.ObjectID{foobarBlob, barfooBlob} {
- info, err := queue.ReadInfo()
- require.NoError(t, err)
- require.Equal(t, &ObjectInfo{Oid: git.ObjectID(blob.Revision()), Type: "blob", Size: 6}, info)
- }
- })
-
- t.Run("request info and object together", func(t *testing.T) {
- reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
- require.NoError(t, err)
-
- queue, cleanup, err := reader.objectQueue(ctx, "trace")
- require.NoError(t, err)
- defer cleanup()
-
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
- require.NoError(t, queue.RequestInfo(barfooBlob.Revision()))
- require.NoError(t, queue.Flush())
-
- object, err := queue.ReadObject()
- require.NoError(t, err)
-
- contents, err := io.ReadAll(object)
- require.NoError(t, err)
- require.Equal(t, "foobar", string(contents))
-
- info, err := queue.ReadInfo()
- require.NoError(t, err)
- require.Equal(t, &ObjectInfo{
- Oid: git.ObjectID(barfooBlob.Revision()),
- Type: "blob",
- Size: int64(len("barfoo")),
- }, info)
- })
-
t.Run("read without request", func(t *testing.T) {
reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
require.NoError(t, err)
@@ -359,7 +228,7 @@ func TestObjectReader_queue(t *testing.T) {
// We flush once before and once after requesting the object such that we can be
// sure that it doesn't impact which objects we can read.
require.NoError(t, queue.Flush())
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
@@ -379,7 +248,7 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
for i := 0; i < 10; i++ {
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
}
require.NoError(t, queue.Flush())
@@ -415,7 +284,7 @@ func TestObjectReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject("does-not-exist"))
+ require.NoError(t, queue.RequestRevision("does-not-exist"))
require.NoError(t, queue.Flush())
_, err = queue.ReadObject()
@@ -430,7 +299,7 @@ func TestObjectReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject("does-not-exist"))
+ require.NoError(t, queue.RequestRevision("does-not-exist"))
require.NoError(t, queue.Flush())
_, err = queue.ReadObject()
@@ -438,7 +307,7 @@ func TestObjectReader_queue(t *testing.T) {
// Requesting another object after the previous one has failed should continue to
// work alright.
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
require.NoError(t, queue.Flush())
object, err := queue.ReadObject()
require.NoError(t, err)
@@ -478,7 +347,7 @@ func TestObjectReader_queue(t *testing.T) {
require.False(t, reader.isDirty())
require.False(t, queue.isDirty())
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
require.NoError(t, queue.Flush())
require.True(t, reader.isDirty())
@@ -511,7 +380,7 @@ func TestObjectReader_queue(t *testing.T) {
require.True(t, reader.isClosed())
require.True(t, queue.isClosed())
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject(foobarBlob.Revision()))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision(foobarBlob.Revision()))
})
t.Run("closing queue blocks read", func(t *testing.T) {
@@ -523,7 +392,7 @@ func TestObjectReader_queue(t *testing.T) {
defer cleanup()
// Request the object before we close the queue.
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
require.NoError(t, queue.Flush())
queue.close()
@@ -543,7 +412,7 @@ func TestObjectReader_queue(t *testing.T) {
require.NoError(t, err)
defer cleanup()
- require.NoError(t, queue.RequestObject(foobarBlob.Revision()))
+ require.NoError(t, queue.RequestRevision(foobarBlob.Revision()))
require.NoError(t, queue.Flush())
// Read the object header before closing.
@@ -559,3 +428,33 @@ func TestObjectReader_queue(t *testing.T) {
require.Equal(t, os.ErrClosed, err)
})
}
+
+func TestObjectReader_replaceRefs(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ cfg := testcfg.Build(t)
+ repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ })
+
+ originalOID := gittest.WriteBlob(t, cfg, repoPath, []byte("original"))
+ replacedOID := gittest.WriteBlob(t, cfg, repoPath, []byte("replaced"))
+
+ gittest.WriteRef(t, cfg, repoPath, git.ReferenceName("refs/replace/"+originalOID.String()), replacedOID)
+
+ // Reading the object via our testhelper should result in the object having been replaced.
+ require.Equal(t, "replaced", text.ChompBytes(gittest.Exec(t, cfg, "-C", repoPath, "cat-file", "-p", originalOID.String())))
+
+ reader, err := newObjectReader(ctx, newRepoExecutor(t, cfg, repoProto), nil)
+ require.NoError(t, err)
+
+ object, err := reader.Object(ctx, originalOID.Revision())
+ require.NoError(t, err)
+
+ contents, err := io.ReadAll(object)
+ require.NoError(t, err)
+
+ // But using our "normal" Git command execution code path, we still want to see the original
+ // content of the blob.
+ require.Equal(t, "original", string(contents))
+}
diff --git a/internal/git/catfile/request_queue.go b/internal/git/catfile/request_queue.go
index 08e64df00..232d731b1 100644
--- a/internal/git/catfile/request_queue.go
+++ b/internal/git/catfile/request_queue.go
@@ -11,22 +11,13 @@ import (
)
const (
- // contentsCommand is the command expected by the `--batch-command` mode of git-cat-file(1)
- // for reading an objects contents.
- contentsCommand = "contents"
- // infoCommand is the command expected by the `--batch-command` mode of git-cat-file(1)
- // for reading an objects info.
- infoCommand = "info"
- // flushCommand is the command expected by the `--batch-command` mode of git-cat-file(1)
- // for flushing out to stdout.
- flushCommand = "flush"
- // flushCommandHack is the command we send to git-cat-file(1) to cause it to flush its stdout.
+ // flushCommand is the command we send to git-cat-file(1) to cause it to flush its stdout.
// Note that this is a hack: git-cat-file(1) doesn't really support flushing, but it will
// flush whenever it encounters an object it doesn't know. The flush command we use is thus
// chosen such that it cannot ever refer to a valid object: refs may not contain whitespace,
// so this command cannot refer to a ref. Adding "FLUSH" is just for the sake of making it
// easier to spot what's going on in case we ever mistakenly see this output in the wild.
- flushCommandHack = "\tFLUSH\t"
+ flushCommand = "\tFLUSH\t"
)
type requestQueue struct {
@@ -57,10 +48,6 @@ type requestQueue struct {
// trace is the current tracing span.
trace *trace
-
- // isBatchCommand indicates whether `--batch-command` is used. We use this to determine if
- // commands need to be passed to git-cat-file(1).
- isBatchCommand bool
}
// isDirty returns true either if there are outstanding requests for objects or if the current
@@ -85,31 +72,14 @@ func (q *requestQueue) close() {
atomic.StoreInt32(&q.closed, 1)
}
-// RequestObject requests the contents for the given revision. A subsequent call has
-// to be made to ReadObject to read the contents.
-func (q *requestQueue) RequestObject(revision git.Revision) error {
- return q.requestRevision(contentsCommand, revision)
-}
-
-// RequestObject requests the info for the given revision. A subsequent call has to
-// be made to ReadInfo read the info.
-func (q *requestQueue) RequestInfo(revision git.Revision) error {
- return q.requestRevision(infoCommand, revision)
-}
-
-func (q *requestQueue) requestRevision(cmd string, revision git.Revision) error {
+func (q *requestQueue) RequestRevision(revision git.Revision) error {
if q.isClosed() {
return fmt.Errorf("cannot request revision: %w", os.ErrClosed)
}
atomic.AddInt64(&q.outstandingRequests, 1)
- input := revision.String()
- if q.isBatchCommand {
- input = cmd + " " + input
- }
-
- if _, err := q.stdin.WriteString(input); err != nil {
+ if _, err := q.stdin.WriteString(revision.String()); err != nil {
atomic.AddInt64(&q.outstandingRequests, -1)
return fmt.Errorf("writing object request: %w", err)
}
@@ -127,12 +97,7 @@ func (q *requestQueue) Flush() error {
return fmt.Errorf("cannot flush: %w", os.ErrClosed)
}
- cmd := flushCommandHack
- if q.isBatchCommand {
- cmd = flushCommand
- }
-
- if _, err := q.stdin.WriteString(cmd); err != nil {
+ if _, err := q.stdin.WriteString(flushCommand); err != nil {
return fmt.Errorf("writing flush command: %w", err)
}
@@ -152,7 +117,7 @@ type readerFunc func([]byte) (int, error)
func (fn readerFunc) Read(buf []byte) (int, error) { return fn(buf) }
func (q *requestQueue) ReadObject() (*Object, error) {
- if !q.isObjectQueue && !q.isBatchCommand {
+ if !q.isObjectQueue {
panic("object queue used to read object info")
}
@@ -218,7 +183,7 @@ func (q *requestQueue) ReadObject() (*Object, error) {
}
func (q *requestQueue) ReadInfo() (*ObjectInfo, error) {
- if q.isObjectQueue && !q.isBatchCommand {
+ if q.isObjectQueue {
panic("object queue used to read object info")
}
diff --git a/internal/git/catfile/request_queue_test.go b/internal/git/catfile/request_queue_test.go
index 638ecc2f7..8780e1065 100644
--- a/internal/git/catfile/request_queue_test.go
+++ b/internal/git/catfile/request_queue_test.go
@@ -22,6 +22,14 @@ func TestRequestQueue_ReadObject(t *testing.T) {
oid := git.ObjectID(strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen()))
+ t.Run("ReadInfo on ReadObject queue", func(t *testing.T) {
+ _, queue := newInterceptedQueue(t, ctx, "#!/bin/sh\nread\n")
+
+ require.PanicsWithValue(t, "object queue used to read object info", func() {
+ _, _ = queue.ReadInfo()
+ })
+ })
+
t.Run("read without request", func(t *testing.T) {
_, queue := newInterceptedQueue(t, ctx, "#!/bin/sh\nread\n")
_, err := queue.ReadObject()
@@ -31,7 +39,7 @@ func TestRequestQueue_ReadObject(t *testing.T) {
t.Run("read on closed reader", func(t *testing.T) {
reader, queue := newInterceptedQueue(t, ctx, "#!/bin/sh\nread\n")
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
require.True(t, queue.isDirty())
reader.close()
@@ -47,8 +55,8 @@ func TestRequestQueue_ReadObject(t *testing.T) {
`, oid))
// We queue two revisions...
- require.NoError(t, queue.RequestObject("foo"))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
// .. and only unqueue one object. This object isn't read though, ...
_, err := queue.ReadObject()
@@ -66,7 +74,7 @@ func TestRequestQueue_ReadObject(t *testing.T) {
echo "something something"
`)
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
_, err := queue.ReadObject()
require.Equal(t, fmt.Errorf("invalid info line: %q", "something something"), err)
@@ -80,7 +88,7 @@ func TestRequestQueue_ReadObject(t *testing.T) {
exit 1
`)
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
_, err := queue.ReadObject()
require.Equal(t, fmt.Errorf("read info line: %w", io.EOF), err)
@@ -94,7 +102,7 @@ func TestRequestQueue_ReadObject(t *testing.T) {
echo "%s missing"
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
_, err := queue.ReadObject()
require.Equal(t, NotFoundError{error: fmt.Errorf("object not found")}, err)
@@ -110,7 +118,7 @@ func TestRequestQueue_ReadObject(t *testing.T) {
echo "1234567890"
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
require.True(t, queue.isDirty())
object, err := queue.ReadObject()
@@ -138,8 +146,8 @@ func TestRequestQueue_ReadObject(t *testing.T) {
echo "0987654321"
`, oid, secondOID))
- require.NoError(t, queue.RequestObject("foo"))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
require.True(t, queue.isDirty())
for _, expectedObject := range []struct {
@@ -183,7 +191,7 @@ func TestRequestQueue_ReadObject(t *testing.T) {
printf "123"
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
require.True(t, queue.isDirty())
object, err := queue.ReadObject()
@@ -211,7 +219,7 @@ func TestRequestQueue_ReadObject(t *testing.T) {
})
}
-func TestRequestQueue_RequestObject(t *testing.T) {
+func TestRequestQueue_RequestRevision(t *testing.T) {
t.Parallel()
ctx := testhelper.Context(t)
@@ -231,7 +239,7 @@ func TestRequestQueue_RequestObject(t *testing.T) {
_, queue := newInterceptedQueue(t, ctx, "#!/bin/sh")
queue.close()
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject("foo"))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision("foo"))
})
t.Run("requesting revision on closed process", func(t *testing.T) {
@@ -239,7 +247,7 @@ func TestRequestQueue_RequestObject(t *testing.T) {
process.close()
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestObject("foo"))
+ require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestRevision("foo"))
})
t.Run("single request", func(t *testing.T) {
@@ -249,10 +257,10 @@ func TestRequestQueue_RequestObject(t *testing.T) {
echo "${revision}"
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
+ require.NoError(t, queue.RequestRevision("foo"))
require.NoError(t, queue.Flush())
- requireRevision(t, queue, "contents foo")
+ requireRevision(t, queue, "foo")
})
t.Run("multiple request", func(t *testing.T) {
@@ -264,16 +272,16 @@ func TestRequestQueue_RequestObject(t *testing.T) {
done
`, oid))
- require.NoError(t, queue.RequestObject("foo"))
- require.NoError(t, queue.RequestObject("bar"))
- require.NoError(t, queue.RequestObject("baz"))
- require.NoError(t, queue.RequestObject("qux"))
+ require.NoError(t, queue.RequestRevision("foo"))
+ require.NoError(t, queue.RequestRevision("bar"))
+ require.NoError(t, queue.RequestRevision("baz"))
+ require.NoError(t, queue.RequestRevision("qux"))
require.NoError(t, queue.Flush())
- requireRevision(t, queue, "contents foo")
- requireRevision(t, queue, "contents bar")
- requireRevision(t, queue, "contents baz")
- requireRevision(t, queue, "contents qux")
+ requireRevision(t, queue, "foo")
+ requireRevision(t, queue, "bar")
+ requireRevision(t, queue, "baz")
+ requireRevision(t, queue, "qux")
})
t.Run("multiple request with intermediate flushing", func(t *testing.T) {
@@ -281,7 +289,7 @@ func TestRequestQueue_RequestObject(t *testing.T) {
while read revision
do
read flush
- if test "$flush" != "flush"
+ if test "$flush" != "FLUSH"
then
echo "expected a flush"
exit 1
@@ -298,100 +306,9 @@ func TestRequestQueue_RequestObject(t *testing.T) {
"foo",
"qux",
} {
- require.NoError(t, queue.RequestObject(revision))
- require.NoError(t, queue.Flush())
- requireRevision(t, queue, "contents "+revision)
- }
- })
-}
-
-func TestRequestQueue_RequestInfo(t *testing.T) {
- t.Parallel()
-
- ctx := testhelper.Context(t)
-
- oid := git.ObjectID(strings.Repeat("1", gittest.DefaultObjectHash.EncodedLen()))
- expectedInfo := &ObjectInfo{oid, "blob", 955}
-
- requireRevision := func(t *testing.T, queue *requestQueue) {
- info, err := queue.ReadInfo()
- require.NoError(t, err)
-
- require.NoError(t, err)
- require.Equal(t, info, expectedInfo)
- }
-
- t.Run("requesting revision on closed queue", func(t *testing.T) {
- _, queue := newInterceptedQueue(t, ctx, "#!/bin/sh")
- queue.close()
-
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo("foo"))
- })
-
- t.Run("requesting revision on closed process", func(t *testing.T) {
- process, queue := newInterceptedQueue(t, ctx, "#!/bin/sh")
-
- process.close()
-
- require.Equal(t, fmt.Errorf("cannot request revision: %w", os.ErrClosed), queue.RequestInfo("foo"))
- })
-
- t.Run("single request", func(t *testing.T) {
- _, queue := newInterceptedQueue(t, ctx, fmt.Sprintf(`#!/bin/sh
- read revision
- echo "%s blob 955"
- `, oid))
-
- require.NoError(t, queue.RequestInfo("foo"))
- require.NoError(t, queue.Flush())
-
- requireRevision(t, queue)
- })
-
- t.Run("multiple request", func(t *testing.T) {
- _, queue := newInterceptedQueue(t, ctx, fmt.Sprintf(`#!/bin/sh
- while read revision
- do
- echo "%s blob 955"
- done
- `, oid))
-
- require.NoError(t, queue.RequestInfo("foo"))
- require.NoError(t, queue.RequestInfo("bar"))
- require.NoError(t, queue.RequestInfo("baz"))
- require.NoError(t, queue.RequestInfo("qux"))
- require.NoError(t, queue.Flush())
-
- requireRevision(t, queue)
- requireRevision(t, queue)
- requireRevision(t, queue)
- requireRevision(t, queue)
- })
-
- t.Run("multiple request with intermediate flushing", func(t *testing.T) {
- _, queue := newInterceptedQueue(t, ctx, fmt.Sprintf(`#!/bin/sh
- while read revision
- do
- read flush
- if test "$flush" != "flush"
- then
- echo "expected a flush"
- exit 1
- fi
-
- echo "%s blob 955"
- done
- `, oid))
-
- for _, revision := range []git.Revision{
- "foo",
- "bar",
- "foo",
- "qux",
- } {
- require.NoError(t, queue.RequestInfo(revision))
+ require.NoError(t, queue.RequestRevision(revision))
require.NoError(t, queue.Flush())
- requireRevision(t, queue)
+ requireRevision(t, queue, revision)
}
})
}
diff --git a/internal/git/catfile/tag.go b/internal/git/catfile/tag.go
index 2636ed1ca..67f38e8fa 100644
--- a/internal/git/catfile/tag.go
+++ b/internal/git/catfile/tag.go
@@ -14,7 +14,7 @@ import (
// GetTag looks up a commit by tagID using an existing catfile.Batch instance. Note: we pass
// in the tagName because the tag name from refs/tags may be different than the name found in the
// actual tag object. We want to use the tagName found in refs/tags
-func GetTag(ctx context.Context, objectReader ObjectContentReader, tagID git.Revision, tagName string) (*gitalypb.Tag, error) {
+func GetTag(ctx context.Context, objectReader ObjectReader, tagID git.Revision, tagName string) (*gitalypb.Tag, error) {
object, err := objectReader.Object(ctx, tagID)
if err != nil {
return nil, err
@@ -51,7 +51,7 @@ func ExtractTagSignature(content []byte) ([]byte, []byte) {
return nil, content
}
-func buildAnnotatedTag(ctx context.Context, objectReader ObjectContentReader, object git.Object, name []byte) (*gitalypb.Tag, error) {
+func buildAnnotatedTag(ctx context.Context, objectReader ObjectReader, object git.Object, name []byte) (*gitalypb.Tag, error) {
tag, tagged, err := newParser().parseTag(object, name)
if err != nil {
return nil, err
@@ -77,7 +77,7 @@ func buildAnnotatedTag(ctx context.Context, objectReader ObjectContentReader, ob
// dereferenceTag recursively dereferences annotated tags until it finds a non-tag object. If it is
// a commit, then it will parse and return this commit. Otherwise, if the tagged object is not a
// commit, it will simply discard the object and not return an error.
-func dereferenceTag(ctx context.Context, objectReader ObjectContentReader, oid git.Revision) (*gitalypb.GitCommit, error) {
+func dereferenceTag(ctx context.Context, objectReader ObjectReader, oid git.Revision) (*gitalypb.GitCommit, error) {
object, err := objectReader.Object(ctx, oid+"^{}")
if err != nil {
return nil, fmt.Errorf("peeling tag: %w", err)
diff --git a/internal/git/catfile/tag_test.go b/internal/git/catfile/tag_test.go
index fa9da5763..efc54e20c 100644
--- a/internal/git/catfile/tag_test.go
+++ b/internal/git/catfile/tag_test.go
@@ -1,7 +1,6 @@
package catfile
import (
- "context"
"fmt"
"strings"
"testing"
@@ -10,18 +9,13 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
)
func TestGetTag(t *testing.T) {
- t.Parallel()
+ ctx := testhelper.Context(t)
- testhelper.NewFeatureSets(featureflag.CatfileBatchCommand).Run(t, testGetTag)
-}
-
-func testGetTag(t *testing.T, ctx context.Context) {
cfg, objectReader, _, repoPath := setupObjectReader(t, ctx)
commitID := gittest.WriteCommit(t, cfg, repoPath)
diff --git a/internal/git/catfile/testhelper_test.go b/internal/git/catfile/testhelper_test.go
index 50a6a39e5..430d24088 100644
--- a/internal/git/catfile/testhelper_test.go
+++ b/internal/git/catfile/testhelper_test.go
@@ -55,7 +55,7 @@ func (e *repoExecutor) ObjectHash(ctx context.Context) (git.ObjectHash, error) {
return gittest.DefaultObjectHash, nil
}
-func setupObjectReader(t *testing.T, ctx context.Context) (config.Cfg, ObjectContentReader, *gitalypb.Repository, string) {
+func setupObjectReader(t *testing.T, ctx context.Context) (config.Cfg, ObjectReader, *gitalypb.Repository, string) {
t.Helper()
cfg := testcfg.Build(t)
diff --git a/internal/git/catfile/tree_entries.go b/internal/git/catfile/tree_entries.go
index 91bc4b663..7e27edb81 100644
--- a/internal/git/catfile/tree_entries.go
+++ b/internal/git/catfile/tree_entries.go
@@ -19,13 +19,13 @@ type revisionPath struct{ revision, path string }
// TreeEntryFinder is a struct for searching through a tree with caching.
type TreeEntryFinder struct {
- objectReader ObjectContentReader
+ objectReader ObjectReader
objectInfoReader ObjectInfoReader
treeCache map[revisionPath][]*gitalypb.TreeEntry
}
// NewTreeEntryFinder initializes a TreeEntryFinder with an empty tree cache.
-func NewTreeEntryFinder(objectReader ObjectContentReader, objectInfoReader ObjectInfoReader) *TreeEntryFinder {
+func NewTreeEntryFinder(objectReader ObjectReader, objectInfoReader ObjectInfoReader) *TreeEntryFinder {
return &TreeEntryFinder{
objectReader: objectReader,
objectInfoReader: objectInfoReader,
@@ -107,7 +107,7 @@ func extractEntryInfoFromTreeData(treeData io.Reader, commitOid, rootOid, rootPa
// TreeEntries returns the entries of a tree in given revision and path.
func TreeEntries(
ctx context.Context,
- objectReader ObjectContentReader,
+ objectReader ObjectReader,
objectInfoReader ObjectInfoReader,
revision, path string,
) (_ []*gitalypb.TreeEntry, returnedErr error) {
diff --git a/internal/git/gitpipe/catfile_info.go b/internal/git/gitpipe/catfile_info.go
index 55eaf7432..07e29525b 100644
--- a/internal/git/gitpipe/catfile_info.go
+++ b/internal/git/gitpipe/catfile_info.go
@@ -71,7 +71,7 @@ func CatfileInfo(
opt(&cfg)
}
- queue, queueCleanup, err := objectInfoReader.ObjectQueue(ctx)
+ queue, queueCleanup, err := objectInfoReader.InfoQueue(ctx)
if err != nil {
return nil, err
}
@@ -88,7 +88,7 @@ func CatfileInfo(
var i int64
for it.Next() {
- if err := queue.RequestInfo(it.ObjectID().Revision()); err != nil {
+ if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
sendCatfileInfoRequest(ctx, requestChan, catfileInfoRequest{err: err})
return
}
diff --git a/internal/git/gitpipe/catfile_info_test.go b/internal/git/gitpipe/catfile_info_test.go
index 6cfdcd46e..79d7de32d 100644
--- a/internal/git/gitpipe/catfile_info_test.go
+++ b/internal/git/gitpipe/catfile_info_test.go
@@ -268,7 +268,7 @@ func TestCatfileInfo(t *testing.T) {
// Reusing the queue is not allowed, so we should get an error here.
_, err = CatfileInfo(ctx, objectInfoReader, NewRevisionIterator(ctx, input))
- require.Equal(t, fmt.Errorf("object queue already in use"), err)
+ require.Equal(t, fmt.Errorf("object info queue already in use"), err)
// We now consume all the input of the iterator.
require.True(t, it.Next())
diff --git a/internal/git/gitpipe/catfile_object.go b/internal/git/gitpipe/catfile_object.go
index 65c18013a..5fe031579 100644
--- a/internal/git/gitpipe/catfile_object.go
+++ b/internal/git/gitpipe/catfile_object.go
@@ -36,7 +36,7 @@ type catfileObjectRequest struct {
// be fully consumed by the caller.
func CatfileObject(
ctx context.Context,
- objectReader catfile.ObjectContentReader,
+ objectReader catfile.ObjectReader,
it ObjectIterator,
) (CatfileObjectIterator, error) {
queue, queueCleanup, err := objectReader.ObjectQueue(ctx)
@@ -72,7 +72,7 @@ func CatfileObject(
var i int64
for it.Next() {
- if err := queue.RequestObject(it.ObjectID().Revision()); err != nil {
+ if err := queue.RequestRevision(it.ObjectID().Revision()); err != nil {
sendRequest(catfileObjectRequest{err: err})
return
}
diff --git a/internal/git/log/last_commit.go b/internal/git/log/last_commit.go
index eacc030cb..de2d484d0 100644
--- a/internal/git/log/last_commit.go
+++ b/internal/git/log/last_commit.go
@@ -16,7 +16,7 @@ import (
func LastCommitForPath(
ctx context.Context,
gitCmdFactory git.CommandFactory,
- objectReader catfile.ObjectContentReader,
+ objectReader catfile.ObjectReader,
repo repository.GitRepo,
revision git.Revision,
path string,
diff --git a/internal/git/log/parser.go b/internal/git/log/parser.go
index 61692c701..173bcc37b 100644
--- a/internal/git/log/parser.go
+++ b/internal/git/log/parser.go
@@ -19,7 +19,7 @@ type Parser struct {
scanner *bufio.Scanner
currentCommit *gitalypb.GitCommit
err error
- objectReader catfile.ObjectContentReader
+ objectReader catfile.ObjectReader
}
// NewParser returns a new Parser
diff --git a/internal/gitaly/service/blob/get_blobs.go b/internal/gitaly/service/blob/get_blobs.go
index 634a942d9..51a173a8f 100644
--- a/internal/gitaly/service/blob/get_blobs.go
+++ b/internal/gitaly/service/blob/get_blobs.go
@@ -23,7 +23,7 @@ var treeEntryToObjectType = map[gitalypb.TreeEntry_EntryType]gitalypb.ObjectType
func sendGetBlobsResponse(
req *gitalypb.GetBlobsRequest,
stream gitalypb.BlobService_GetBlobsServer,
- objectReader catfile.ObjectContentReader,
+ objectReader catfile.ObjectReader,
objectInfoReader catfile.ObjectInfoReader,
) error {
ctx := stream.Context()
@@ -99,7 +99,7 @@ func sendGetBlobsResponse(
func sendBlobTreeEntry(
response *gitalypb.GetBlobsResponse,
stream gitalypb.BlobService_GetBlobsServer,
- objectReader catfile.ObjectContentReader,
+ objectReader catfile.ObjectReader,
limit int64,
) (returnedErr error) {
ctx := stream.Context()
diff --git a/internal/gitaly/service/commit/filter_shas_with_signatures.go b/internal/gitaly/service/commit/filter_shas_with_signatures.go
index 2576f1757..2b63711f6 100644
--- a/internal/gitaly/service/commit/filter_shas_with_signatures.go
+++ b/internal/gitaly/service/commit/filter_shas_with_signatures.go
@@ -63,7 +63,7 @@ func (s *server) filterShasWithSignatures(bidi gitalypb.CommitService_FilterShas
}
}
-func filterCommitShasWithSignatures(ctx context.Context, objectReader catfile.ObjectContentReader, shas [][]byte) ([][]byte, error) {
+func filterCommitShasWithSignatures(ctx context.Context, objectReader catfile.ObjectReader, shas [][]byte) ([][]byte, error) {
var foundShas [][]byte
for _, sha := range shas {
commit, err := catfile.GetCommit(ctx, objectReader, git.Revision(sha))
diff --git a/internal/gitaly/service/commit/find_commits.go b/internal/gitaly/service/commit/find_commits.go
index 221ed4336..c22298866 100644
--- a/internal/gitaly/service/commit/find_commits.go
+++ b/internal/gitaly/service/commit/find_commits.go
@@ -108,11 +108,11 @@ func calculateOffsetManually(req *gitalypb.FindCommitsRequest) bool {
// GetCommits wraps a git log command that can be iterated on to get individual commit objects
type GetCommits struct {
scanner *bufio.Scanner
- objectReader catfile.ObjectContentReader
+ objectReader catfile.ObjectReader
}
// NewGetCommits returns a new GetCommits object
-func NewGetCommits(cmd *command.Command, objectReader catfile.ObjectContentReader, shortStat bool) *GetCommits {
+func NewGetCommits(cmd *command.Command, objectReader catfile.ObjectReader, shortStat bool) *GetCommits {
getCommits := &GetCommits{
scanner: bufio.NewScanner(cmd),
objectReader: objectReader,
diff --git a/internal/gitaly/service/commit/tree_entries.go b/internal/gitaly/service/commit/tree_entries.go
index c5f70e440..0d1f88c74 100644
--- a/internal/gitaly/service/commit/tree_entries.go
+++ b/internal/gitaly/service/commit/tree_entries.go
@@ -44,7 +44,7 @@ func validateGetTreeEntriesRequest(in *gitalypb.GetTreeEntriesRequest) error {
func populateFlatPath(
ctx context.Context,
- objectReader catfile.ObjectContentReader,
+ objectReader catfile.ObjectReader,
objectInfoReader catfile.ObjectInfoReader,
entries []*gitalypb.TreeEntry,
) error {
@@ -86,7 +86,7 @@ func (s *server) sendTreeEntries(
var entries []*gitalypb.TreeEntry
var (
- objectReader catfile.ObjectContentReader
+ objectReader catfile.ObjectReader
objectInfoReader catfile.ObjectInfoReader
)
diff --git a/internal/gitaly/service/commit/tree_entry.go b/internal/gitaly/service/commit/tree_entry.go
index 60071226a..3878c2965 100644
--- a/internal/gitaly/service/commit/tree_entry.go
+++ b/internal/gitaly/service/commit/tree_entry.go
@@ -15,7 +15,7 @@ import (
func sendTreeEntry(
stream gitalypb.CommitService_TreeEntryServer,
- objectReader catfile.ObjectContentReader,
+ objectReader catfile.ObjectReader,
objectInfoReader catfile.ObjectInfoReader,
revision, path string,
limit, maxSize int64,
diff --git a/internal/gitaly/service/ref/find_tag.go b/internal/gitaly/service/ref/find_tag.go
index 45daba0c4..982f49920 100644
--- a/internal/gitaly/service/ref/find_tag.go
+++ b/internal/gitaly/service/ref/find_tag.go
@@ -30,7 +30,7 @@ func (s *server) FindTag(ctx context.Context, in *gitalypb.FindTagRequest) (*git
}
// parseTagLine parses a line of text with the output format %(objectname) %(objecttype) %(refname:lstrip=2)
-func parseTagLine(ctx context.Context, objectReader catfile.ObjectContentReader, tagLine string) (*gitalypb.Tag, error) {
+func parseTagLine(ctx context.Context, objectReader catfile.ObjectReader, tagLine string) (*gitalypb.Tag, error) {
fields := strings.SplitN(tagLine, " ", 3)
if len(fields) != 3 {
return nil, fmt.Errorf("invalid output from for-each-ref command: %v", tagLine)
diff --git a/internal/gitaly/service/ref/util.go b/internal/gitaly/service/ref/util.go
index 225d70850..ebeb48a68 100644
--- a/internal/gitaly/service/ref/util.go
+++ b/internal/gitaly/service/ref/util.go
@@ -58,7 +58,7 @@ func buildLocalBranch(name []byte, target *gitalypb.GitCommit) *gitalypb.FindLoc
return response
}
-func buildAllBranchesBranch(ctx context.Context, objectReader catfile.ObjectContentReader, elements [][]byte) (*gitalypb.FindAllBranchesResponse_Branch, error) {
+func buildAllBranchesBranch(ctx context.Context, objectReader catfile.ObjectReader, elements [][]byte) (*gitalypb.FindAllBranchesResponse_Branch, error) {
target, err := catfile.GetCommit(ctx, objectReader, git.Revision(elements[1]))
if err != nil {
return nil, err
@@ -70,7 +70,7 @@ func buildAllBranchesBranch(ctx context.Context, objectReader catfile.ObjectCont
}, nil
}
-func buildBranch(ctx context.Context, objectReader catfile.ObjectContentReader, elements [][]byte) (*gitalypb.Branch, error) {
+func buildBranch(ctx context.Context, objectReader catfile.ObjectReader, elements [][]byte) (*gitalypb.Branch, error) {
target, err := catfile.GetCommit(ctx, objectReader, git.Revision(elements[1]))
if err != nil {
return nil, err
@@ -82,7 +82,7 @@ func buildBranch(ctx context.Context, objectReader catfile.ObjectContentReader,
}, nil
}
-func newFindLocalBranchesWriter(stream gitalypb.RefService_FindLocalBranchesServer, objectReader catfile.ObjectContentReader) lines.Sender {
+func newFindLocalBranchesWriter(stream gitalypb.RefService_FindLocalBranchesServer, objectReader catfile.ObjectReader) lines.Sender {
return func(refs [][]byte) error {
ctx := stream.Context()
var response *gitalypb.FindLocalBranchesResponse
@@ -109,7 +109,7 @@ func newFindLocalBranchesWriter(stream gitalypb.RefService_FindLocalBranchesServ
}
}
-func newFindAllBranchesWriter(stream gitalypb.RefService_FindAllBranchesServer, objectReader catfile.ObjectContentReader) lines.Sender {
+func newFindAllBranchesWriter(stream gitalypb.RefService_FindAllBranchesServer, objectReader catfile.ObjectReader) lines.Sender {
return func(refs [][]byte) error {
var branches []*gitalypb.FindAllBranchesResponse_Branch
ctx := stream.Context()
@@ -129,7 +129,7 @@ func newFindAllBranchesWriter(stream gitalypb.RefService_FindAllBranchesServer,
}
}
-func newFindAllRemoteBranchesWriter(stream gitalypb.RefService_FindAllRemoteBranchesServer, objectReader catfile.ObjectContentReader) lines.Sender {
+func newFindAllRemoteBranchesWriter(stream gitalypb.RefService_FindAllRemoteBranchesServer, objectReader catfile.ObjectReader) lines.Sender {
return func(refs [][]byte) error {
var branches []*gitalypb.Branch
ctx := stream.Context()
diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go
index a06b32e1a..4c1e436bf 100644
--- a/internal/gitaly/service/repository/apply_gitattributes.go
+++ b/internal/gitaly/service/repository/apply_gitattributes.go
@@ -23,7 +23,7 @@ import (
const attributesFileMode os.FileMode = 0o644
-func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, objectReader catfile.ObjectContentReader, repoPath string, revision []byte) (returnedErr error) {
+func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, objectReader catfile.ObjectReader, repoPath string, revision []byte) (returnedErr error) {
infoPath := filepath.Join(repoPath, "info")
attributesPath := filepath.Join(infoPath, "attributes")
diff --git a/internal/metadata/featureflag/ff_catfile_batch_command.go b/internal/metadata/featureflag/ff_catfile_batch_command.go
deleted file mode 100644
index e3022f9cd..000000000
--- a/internal/metadata/featureflag/ff_catfile_batch_command.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package featureflag
-
-// CatfileBatchCommand enables the `--batch-command` mode for git-cat-file(1).
-var CatfileBatchCommand = NewFeatureFlag(
- "catfile_batch_command",
- "v15.6.0",
- "https://gitlab.com/gitlab-org/gitaly/-/issues/4573",
- false,
-)
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 9cbf45d9b..1bee6fa47 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -202,9 +202,6 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context {
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.NodeErrorCancelsVoter, rnd.Int()%2 == 0)
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV238, rnd.Int()%2 == 0)
- // CatfileBatchCommand affects many tests since most of them rely on catfile for content/info
- // information about objects.
- ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.CatfileBatchCommand, rnd.Int()%2 == 0)
for _, opt := range opts {
ctx = opt(ctx)