diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-14 14:07:12 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-06-14 14:26:11 +0300 |
commit | 9fb72bd58083ca327af9ed56c30a7485510cc0cf (patch) | |
tree | 653053f0da06e6e1b0adb453c23edd3b305b8e13 | |
parent | ce82f7994d5c1ea0272770ef69a41c8f5ed0af35 (diff) |
blob: Introduce new pipeline step to enumerate all objects
We're about to convert `ListAllLFSPointers()` to use our new pipeline
code. What this RPC does is to enumerate all existing blobs via `git
cat-file --batch-all-objects` regardless of whether they are referenced
by anything or not.
Unfortunately, our catfile cache doesn't allow for such usage, and thus
we can't reuse or adapt any of the existing pipeline steps. This commit
thus introduces a new pipeline step which uses above command to return
all objects part of the repository.
-rw-r--r-- | internal/gitaly/service/blob/pipeline.go | 70 | ||||
-rw-r--r-- | internal/gitaly/service/blob/pipeline_test.go | 33 |
2 files changed, 103 insertions, 0 deletions
diff --git a/internal/gitaly/service/blob/pipeline.go b/internal/gitaly/service/blob/pipeline.go index 0c14221bf..3be39886f 100644 --- a/internal/gitaly/service/blob/pipeline.go +++ b/internal/gitaly/service/blob/pipeline.go @@ -4,7 +4,9 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" + "io" "io/ioutil" "gitlab.com/gitlab-org/gitaly/v14/internal/git" @@ -206,6 +208,74 @@ func catfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan < return resultChan } +// catfileInfoAllObjects enumerates all Git objects part of the repository's object directory and +// extracts their object info via `git cat-file --batch-check`. The returned channel will contain +// all processed results. Any error encountered during execution of this pipeline step will cause +// the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this +// pipeline step, the resulting catfileInfoResults will never have an object name. +func catfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan catfileInfoResult { + resultChan := make(chan catfileInfoResult) + + go func() { + defer close(resultChan) + + sendResult := func(result catfileInfoResult) bool { + select { + case resultChan <- result: + return false + case <-ctx.Done(): + return true + } + } + + cmd, err := repo.Exec(ctx, git.SubCmd{ + Name: "cat-file", + Flags: []git.Option{ + git.Flag{Name: "--batch-all-objects"}, + git.Flag{Name: "--batch-check"}, + git.Flag{Name: "--buffer"}, + git.Flag{Name: "--unordered"}, + }, + }) + if err != nil { + sendResult(catfileInfoResult{ + err: fmt.Errorf("spawning cat-file failed: %w", err), + }) + return + } + + reader := bufio.NewReader(cmd) + for { + objectInfo, err := catfile.ParseObjectInfo(reader) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + sendResult(catfileInfoResult{ + err: fmt.Errorf("parsing object info: %w", err), + }) + return + } + + if isDone := sendResult(catfileInfoResult{ + objectInfo: objectInfo, + }); isDone { + return + } + } + + if err := cmd.Wait(); err != nil { + sendResult(catfileInfoResult{ + err: fmt.Errorf("cat-file failed: %w", err), + }) + return + } + }() + + return resultChan +} + // catfileInfoFilter filters the catfileInfoResults from the provided channel with the filter // function: if the filter returns `false` for a given item, then it will be dropped from the // pipeline. Errors cannot be filtered and will always be passed through. diff --git a/internal/gitaly/service/blob/pipeline_test.go b/internal/gitaly/service/blob/pipeline_test.go index 19b0a00bd..fc3ad6617 100644 --- a/internal/gitaly/service/blob/pipeline_test.go +++ b/internal/gitaly/service/blob/pipeline_test.go @@ -358,6 +358,39 @@ func TestCatfileInfo(t *testing.T) { } } +func TestCatfileAllObjects(t *testing.T) { + cfg := testcfg.Build(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + repoProto, repoPath, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0]) + defer cleanup() + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + blob1 := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar")) + blob2 := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo")) + tree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foobar", Mode: "100644", OID: blob1}, + }) + commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents()) + + resultChan := catfileInfoAllObjects(ctx, repo) + + var results []catfileInfoResult + for result := range resultChan { + require.NoError(t, result.err) + results = append(results, result) + } + + require.ElementsMatch(t, []catfileInfoResult{ + {objectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}}, + {objectInfo: &catfile.ObjectInfo{Oid: blob2, Type: "blob", Size: 6}}, + {objectInfo: &catfile.ObjectInfo{Oid: tree, Type: "tree", Size: 34}}, + {objectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}}, + }, results) +} + func TestCatfileInfoFilter(t *testing.T) { for _, tc := range []struct { desc string |