Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-06-14 14:07:12 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-06-14 14:26:11 +0300
commit9fb72bd58083ca327af9ed56c30a7485510cc0cf (patch)
tree653053f0da06e6e1b0adb453c23edd3b305b8e13
parentce82f7994d5c1ea0272770ef69a41c8f5ed0af35 (diff)
blob: Introduce new pipeline step to enumerate all objects
We're about to convert `ListAllLFSPointers()` to use our new pipeline code. What this RPC does is to enumerate all existing blobs via `git cat-file --batch-all-objects` regardless of whether they are referenced by anything or not. Unfortunately, our catfile cache doesn't allow for such usage, and thus we can't reuse or adapt any of the existing pipeline steps. This commit thus introduces a new pipeline step which uses above command to return all objects part of the repository.
-rw-r--r--internal/gitaly/service/blob/pipeline.go70
-rw-r--r--internal/gitaly/service/blob/pipeline_test.go33
2 files changed, 103 insertions, 0 deletions
diff --git a/internal/gitaly/service/blob/pipeline.go b/internal/gitaly/service/blob/pipeline.go
index 0c14221bf..3be39886f 100644
--- a/internal/gitaly/service/blob/pipeline.go
+++ b/internal/gitaly/service/blob/pipeline.go
@@ -4,7 +4,9 @@ import (
"bufio"
"bytes"
"context"
+ "errors"
"fmt"
+ "io"
"io/ioutil"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
@@ -206,6 +208,74 @@ func catfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan <
return resultChan
}
+// catfileInfoAllObjects enumerates all Git objects part of the repository's object directory and
+// extracts their object info via `git cat-file --batch-check`. The returned channel will contain
+// all processed results. Any error encountered during execution of this pipeline step will cause
+// the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this
+// pipeline step, the resulting catfileInfoResults will never have an object name.
+func catfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan catfileInfoResult {
+ resultChan := make(chan catfileInfoResult)
+
+ go func() {
+ defer close(resultChan)
+
+ sendResult := func(result catfileInfoResult) bool {
+ select {
+ case resultChan <- result:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+ }
+
+ cmd, err := repo.Exec(ctx, git.SubCmd{
+ Name: "cat-file",
+ Flags: []git.Option{
+ git.Flag{Name: "--batch-all-objects"},
+ git.Flag{Name: "--batch-check"},
+ git.Flag{Name: "--buffer"},
+ git.Flag{Name: "--unordered"},
+ },
+ })
+ if err != nil {
+ sendResult(catfileInfoResult{
+ err: fmt.Errorf("spawning cat-file failed: %w", err),
+ })
+ return
+ }
+
+ reader := bufio.NewReader(cmd)
+ for {
+ objectInfo, err := catfile.ParseObjectInfo(reader)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ break
+ }
+
+ sendResult(catfileInfoResult{
+ err: fmt.Errorf("parsing object info: %w", err),
+ })
+ return
+ }
+
+ if isDone := sendResult(catfileInfoResult{
+ objectInfo: objectInfo,
+ }); isDone {
+ return
+ }
+ }
+
+ if err := cmd.Wait(); err != nil {
+ sendResult(catfileInfoResult{
+ err: fmt.Errorf("cat-file failed: %w", err),
+ })
+ return
+ }
+ }()
+
+ return resultChan
+}
+
// catfileInfoFilter filters the catfileInfoResults from the provided channel with the filter
// function: if the filter returns `false` for a given item, then it will be dropped from the
// pipeline. Errors cannot be filtered and will always be passed through.
diff --git a/internal/gitaly/service/blob/pipeline_test.go b/internal/gitaly/service/blob/pipeline_test.go
index 19b0a00bd..fc3ad6617 100644
--- a/internal/gitaly/service/blob/pipeline_test.go
+++ b/internal/gitaly/service/blob/pipeline_test.go
@@ -358,6 +358,39 @@ func TestCatfileInfo(t *testing.T) {
}
}
+func TestCatfileAllObjects(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ repoProto, repoPath, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0])
+ defer cleanup()
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ blob1 := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+ blob2 := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo"))
+ tree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+ {Path: "foobar", Mode: "100644", OID: blob1},
+ })
+ commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents())
+
+ resultChan := catfileInfoAllObjects(ctx, repo)
+
+ var results []catfileInfoResult
+ for result := range resultChan {
+ require.NoError(t, result.err)
+ results = append(results, result)
+ }
+
+ require.ElementsMatch(t, []catfileInfoResult{
+ {objectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}},
+ {objectInfo: &catfile.ObjectInfo{Oid: blob2, Type: "blob", Size: 6}},
+ {objectInfo: &catfile.ObjectInfo{Oid: tree, Type: "tree", Size: 34}},
+ {objectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}},
+ }, results)
+}
+
func TestCatfileInfoFilter(t *testing.T) {
for _, tc := range []struct {
desc string