diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2021-06-16 16:15:30 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2021-06-16 16:15:30 +0300 |
commit | 77d6f6e6bee63c41438ec5c186c10fa17b91fd7c (patch) | |
tree | 4614a16d52773d955ac0aeb66f94441257192c43 | |
parent | b82b2da71ab8ad5e61e4f162b99e64a7ab78ca7f (diff) | |
parent | 69e2252e25af53f7aeee1313b2308340df4b293a (diff) |
Merge branch 'pks-lfs-pointers-full-pipeline' into 'master'
Convert remaining LFS RPCs to use new pipeline code
See merge request gitlab-org/gitaly!3588
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers.go | 142 | ||||
-rw-r--r-- | internal/gitaly/service/blob/lfs_pointers_test.go | 33 | ||||
-rw-r--r-- | internal/gitaly/service/blob/pipeline.go | 70 | ||||
-rw-r--r-- | internal/gitaly/service/blob/pipeline_test.go | 33 |
4 files changed, 226 insertions, 52 deletions
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go index 466eb4d24..a16523428 100644 --- a/internal/gitaly/service/blob/lfs_pointers.go +++ b/internal/gitaly/service/blob/lfs_pointers.go @@ -83,28 +83,8 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git return git.IsLFSPointer(r.objectData) }) - var i int32 - for lfsPointer := range catfileObjectChan { - if lfsPointer.err != nil { - return helper.ErrInternal(lfsPointer.err) - } - - if err := chunker.Send(&gitalypb.LFSPointer{ - Data: lfsPointer.objectData, - Size: lfsPointer.objectInfo.Size, - Oid: lfsPointer.objectInfo.Oid.String(), - }); err != nil { - return helper.ErrInternal(fmt.Errorf("sending LFS pointer chunk: %w", err)) - } - - i++ - if in.Limit > 0 && i >= in.Limit { - break - } - } - - if err := chunker.Flush(); err != nil { - return helper.ErrInternal(err) + if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil { + return err } } @@ -121,18 +101,6 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre } repo := s.localrepo(in.GetRepository()) - cmd, err := repo.Exec(ctx, git.SubCmd{ - Name: "cat-file", - Flags: []git.Option{ - git.Flag{Name: "--batch-all-objects"}, - git.Flag{Name: "--batch-check=%(objecttype) %(objectsize) %(objectname)"}, - git.Flag{Name: "--buffer"}, - git.Flag{Name: "--unordered"}, - }, - }) - if err != nil { - return status.Errorf(codes.Internal, "could not run batch-check: %v", err) - } chunker := chunk.New(&lfsPointerSender{ send: func(pointers []*gitalypb.LFSPointer) error { @@ -142,13 +110,46 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre }, }) - filteredReader := transform.NewReader(cmd, blobFilter{ - maxSize: lfsPointerMaxSize, - }) + if featureflag.IsDisabled(ctx, featureflag.LFSPointersPipeline) { + cmd, err := repo.Exec(ctx, git.SubCmd{ + Name: "cat-file", + Flags: []git.Option{ + git.Flag{Name: "--batch-all-objects"}, + git.Flag{Name: "--batch-check=%(objecttype) %(objectsize) %(objectname)"}, + git.Flag{Name: "--buffer"}, + git.Flag{Name: "--unordered"}, + }, + }) + if err != nil { + return status.Errorf(codes.Internal, "could not run batch-check: %v", err) + } + + filteredReader := transform.NewReader(cmd, blobFilter{ + maxSize: lfsPointerMaxSize, + }) - if err := readLFSPointers(ctx, repo, chunker, filteredReader, int(in.Limit)); err != nil { - if !errors.Is(err, errLimitReached) { - return status.Errorf(codes.Internal, "could not read LFS pointers: %v", err) + if err := readLFSPointers(ctx, repo, chunker, filteredReader, int(in.Limit)); err != nil { + if !errors.Is(err, errLimitReached) { + return status.Errorf(codes.Internal, "could not read LFS pointers: %v", err) + } + } + } else { + catfileProcess, err := s.catfileCache.BatchProcess(ctx, repo) + if err != nil { + return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err)) + } + + catfileInfoChan := catfileInfoAllObjects(ctx, repo) + catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, func(r catfileInfoResult) bool { + return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize + }) + catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan) + catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool { + return git.IsLFSPointer(r.objectData) + }) + + if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil { + return err } } @@ -166,7 +167,6 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita } repo := s.localrepo(req.GetRepository()) - objectIDs := strings.Join(req.BlobIds, "\n") chunker := chunk.New(&lfsPointerSender{ send: func(pointers []*gitalypb.LFSPointer) error { @@ -176,8 +176,36 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita }, }) - if err := readLFSPointers(ctx, repo, chunker, strings.NewReader(objectIDs), 0); err != nil { - if !errors.Is(err, errLimitReached) { + if featureflag.IsDisabled(ctx, featureflag.LFSPointersPipeline) { + objectIDs := strings.Join(req.BlobIds, "\n") + + if err := readLFSPointers(ctx, repo, chunker, strings.NewReader(objectIDs), 0); err != nil { + if !errors.Is(err, errLimitReached) { + return err + } + } + } else { + catfileProcess, err := s.catfileCache.BatchProcess(ctx, repo) + if err != nil { + return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err)) + } + + objectChan := make(chan revlistResult, len(req.GetBlobIds())) + for _, blobID := range req.GetBlobIds() { + objectChan <- revlistResult{oid: git.ObjectID(blobID)} + } + close(objectChan) + + catfileInfoChan := catfileInfo(ctx, catfileProcess, objectChan) + catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, func(r catfileInfoResult) bool { + return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize + }) + catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan) + catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool { + return git.IsLFSPointer(r.objectData) + }) + + if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil { return err } } @@ -348,3 +376,31 @@ func (t *lfsPointerSender) Append(m proto.Message) { func (t *lfsPointerSender) Send() error { return t.send(t.pointers) } + +func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan catfileObjectResult, limit int) error { + var i int + for lfsPointer := range lfsPointers { + if lfsPointer.err != nil { + return helper.ErrInternal(lfsPointer.err) + } + + if err := chunker.Send(&gitalypb.LFSPointer{ + Data: lfsPointer.objectData, + Size: lfsPointer.objectInfo.Size, + Oid: lfsPointer.objectInfo.Oid.String(), + }); err != nil { + return helper.ErrInternal(fmt.Errorf("sending LFS pointer chunk: %w", err)) + } + + i++ + if limit > 0 && i >= limit { + break + } + } + + if err := chunker.Flush(); err != nil { + return helper.ErrInternal(err) + } + + return nil +} diff --git a/internal/gitaly/service/blob/lfs_pointers_test.go b/internal/gitaly/service/blob/lfs_pointers_test.go index 2fefa59da..879b3a183 100644 --- a/internal/gitaly/service/blob/lfs_pointers_test.go +++ b/internal/gitaly/service/blob/lfs_pointers_test.go @@ -187,6 +187,14 @@ func testListLFSPointers(t *testing.T, ctx context.Context) { } func TestListAllLFSPointers(t *testing.T) { + testhelper.NewFeatureSets([]featureflag.FeatureFlag{ + featureflag.LFSPointersPipeline, + }).Run(t, func(t *testing.T, ctx context.Context) { + testListAllLFSPointers(t, ctx) + }) +} + +func testListAllLFSPointers(t *testing.T, ctx context.Context) { receivePointers := func(t *testing.T, stream gitalypb.BlobService_ListAllLFSPointersClient) []*gitalypb.LFSPointer { t.Helper() @@ -202,9 +210,6 @@ func TestListAllLFSPointers(t *testing.T) { return pointers } - ctx, cancel := testhelper.Context() - defer cancel() - lfsPointerContents := `version https://git-lfs.github.com/spec/v1 oid sha256:1111111111111111111111111111111111111111111111111111111111111111 size 12345` @@ -301,10 +306,15 @@ size 12345` } func TestSuccessfulGetLFSPointersRequest(t *testing.T) { - _, repo, _, client := setup(t) + testhelper.NewFeatureSets([]featureflag.FeatureFlag{ + featureflag.LFSPointersPipeline, + }).Run(t, func(t *testing.T, ctx context.Context) { + testSuccessfulGetLFSPointersRequest(t, ctx) + }) +} - ctx, cancel := testhelper.Context() - defer cancel() +func testSuccessfulGetLFSPointersRequest(t *testing.T, ctx context.Context) { + _, repo, _, client := setup(t) lfsPointerIds := []string{ lfsPointer1, @@ -346,10 +356,15 @@ func TestSuccessfulGetLFSPointersRequest(t *testing.T) { } func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) { - _, repo, _, client := setup(t) + testhelper.NewFeatureSets([]featureflag.FeatureFlag{ + featureflag.LFSPointersPipeline, + }).Run(t, func(t *testing.T, ctx context.Context) { + testFailedGetLFSPointersRequestDueToValidations(t, ctx) + }) +} - ctx, cancel := testhelper.Context() - defer cancel() +func testFailedGetLFSPointersRequestDueToValidations(t *testing.T, ctx context.Context) { + _, repo, _, client := setup(t) testCases := []struct { desc string diff --git a/internal/gitaly/service/blob/pipeline.go b/internal/gitaly/service/blob/pipeline.go index 0c14221bf..3be39886f 100644 --- a/internal/gitaly/service/blob/pipeline.go +++ b/internal/gitaly/service/blob/pipeline.go @@ -4,7 +4,9 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" + "io" "io/ioutil" "gitlab.com/gitlab-org/gitaly/v14/internal/git" @@ -206,6 +208,74 @@ func catfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan < return resultChan } +// catfileInfoAllObjects enumerates all Git objects part of the repository's object directory and +// extracts their object info via `git cat-file --batch-check`. The returned channel will contain +// all processed results. Any error encountered during execution of this pipeline step will cause +// the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this +// pipeline step, the resulting catfileInfoResults will never have an object name. +func catfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan catfileInfoResult { + resultChan := make(chan catfileInfoResult) + + go func() { + defer close(resultChan) + + sendResult := func(result catfileInfoResult) bool { + select { + case resultChan <- result: + return false + case <-ctx.Done(): + return true + } + } + + cmd, err := repo.Exec(ctx, git.SubCmd{ + Name: "cat-file", + Flags: []git.Option{ + git.Flag{Name: "--batch-all-objects"}, + git.Flag{Name: "--batch-check"}, + git.Flag{Name: "--buffer"}, + git.Flag{Name: "--unordered"}, + }, + }) + if err != nil { + sendResult(catfileInfoResult{ + err: fmt.Errorf("spawning cat-file failed: %w", err), + }) + return + } + + reader := bufio.NewReader(cmd) + for { + objectInfo, err := catfile.ParseObjectInfo(reader) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + sendResult(catfileInfoResult{ + err: fmt.Errorf("parsing object info: %w", err), + }) + return + } + + if isDone := sendResult(catfileInfoResult{ + objectInfo: objectInfo, + }); isDone { + return + } + } + + if err := cmd.Wait(); err != nil { + sendResult(catfileInfoResult{ + err: fmt.Errorf("cat-file failed: %w", err), + }) + return + } + }() + + return resultChan +} + // catfileInfoFilter filters the catfileInfoResults from the provided channel with the filter // function: if the filter returns `false` for a given item, then it will be dropped from the // pipeline. Errors cannot be filtered and will always be passed through. diff --git a/internal/gitaly/service/blob/pipeline_test.go b/internal/gitaly/service/blob/pipeline_test.go index 19b0a00bd..fc3ad6617 100644 --- a/internal/gitaly/service/blob/pipeline_test.go +++ b/internal/gitaly/service/blob/pipeline_test.go @@ -358,6 +358,39 @@ func TestCatfileInfo(t *testing.T) { } } +func TestCatfileAllObjects(t *testing.T) { + cfg := testcfg.Build(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + repoProto, repoPath, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0]) + defer cleanup() + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + blob1 := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar")) + blob2 := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo")) + tree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foobar", Mode: "100644", OID: blob1}, + }) + commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents()) + + resultChan := catfileInfoAllObjects(ctx, repo) + + var results []catfileInfoResult + for result := range resultChan { + require.NoError(t, result.err) + results = append(results, result) + } + + require.ElementsMatch(t, []catfileInfoResult{ + {objectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}}, + {objectInfo: &catfile.ObjectInfo{Oid: blob2, Type: "blob", Size: 6}}, + {objectInfo: &catfile.ObjectInfo{Oid: tree, Type: "tree", Size: 34}}, + {objectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}}, + }, results) +} + func TestCatfileInfoFilter(t *testing.T) { for _, tc := range []struct { desc string |