Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2021-06-16 16:15:30 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2021-06-16 16:15:30 +0300
commit77d6f6e6bee63c41438ec5c186c10fa17b91fd7c (patch)
tree4614a16d52773d955ac0aeb66f94441257192c43
parentb82b2da71ab8ad5e61e4f162b99e64a7ab78ca7f (diff)
parent69e2252e25af53f7aeee1313b2308340df4b293a (diff)
Merge branch 'pks-lfs-pointers-full-pipeline' into 'master'
Convert remaining LFS RPCs to use new pipeline code See merge request gitlab-org/gitaly!3588
-rw-r--r--internal/gitaly/service/blob/lfs_pointers.go142
-rw-r--r--internal/gitaly/service/blob/lfs_pointers_test.go33
-rw-r--r--internal/gitaly/service/blob/pipeline.go70
-rw-r--r--internal/gitaly/service/blob/pipeline_test.go33
4 files changed, 226 insertions, 52 deletions
diff --git a/internal/gitaly/service/blob/lfs_pointers.go b/internal/gitaly/service/blob/lfs_pointers.go
index 466eb4d24..a16523428 100644
--- a/internal/gitaly/service/blob/lfs_pointers.go
+++ b/internal/gitaly/service/blob/lfs_pointers.go
@@ -83,28 +83,8 @@ func (s *server) ListLFSPointers(in *gitalypb.ListLFSPointersRequest, stream git
return git.IsLFSPointer(r.objectData)
})
- var i int32
- for lfsPointer := range catfileObjectChan {
- if lfsPointer.err != nil {
- return helper.ErrInternal(lfsPointer.err)
- }
-
- if err := chunker.Send(&gitalypb.LFSPointer{
- Data: lfsPointer.objectData,
- Size: lfsPointer.objectInfo.Size,
- Oid: lfsPointer.objectInfo.Oid.String(),
- }); err != nil {
- return helper.ErrInternal(fmt.Errorf("sending LFS pointer chunk: %w", err))
- }
-
- i++
- if in.Limit > 0 && i >= in.Limit {
- break
- }
- }
-
- if err := chunker.Flush(); err != nil {
- return helper.ErrInternal(err)
+ if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
+ return err
}
}
@@ -121,18 +101,6 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
}
repo := s.localrepo(in.GetRepository())
- cmd, err := repo.Exec(ctx, git.SubCmd{
- Name: "cat-file",
- Flags: []git.Option{
- git.Flag{Name: "--batch-all-objects"},
- git.Flag{Name: "--batch-check=%(objecttype) %(objectsize) %(objectname)"},
- git.Flag{Name: "--buffer"},
- git.Flag{Name: "--unordered"},
- },
- })
- if err != nil {
- return status.Errorf(codes.Internal, "could not run batch-check: %v", err)
- }
chunker := chunk.New(&lfsPointerSender{
send: func(pointers []*gitalypb.LFSPointer) error {
@@ -142,13 +110,46 @@ func (s *server) ListAllLFSPointers(in *gitalypb.ListAllLFSPointersRequest, stre
},
})
- filteredReader := transform.NewReader(cmd, blobFilter{
- maxSize: lfsPointerMaxSize,
- })
+ if featureflag.IsDisabled(ctx, featureflag.LFSPointersPipeline) {
+ cmd, err := repo.Exec(ctx, git.SubCmd{
+ Name: "cat-file",
+ Flags: []git.Option{
+ git.Flag{Name: "--batch-all-objects"},
+ git.Flag{Name: "--batch-check=%(objecttype) %(objectsize) %(objectname)"},
+ git.Flag{Name: "--buffer"},
+ git.Flag{Name: "--unordered"},
+ },
+ })
+ if err != nil {
+ return status.Errorf(codes.Internal, "could not run batch-check: %v", err)
+ }
+
+ filteredReader := transform.NewReader(cmd, blobFilter{
+ maxSize: lfsPointerMaxSize,
+ })
- if err := readLFSPointers(ctx, repo, chunker, filteredReader, int(in.Limit)); err != nil {
- if !errors.Is(err, errLimitReached) {
- return status.Errorf(codes.Internal, "could not read LFS pointers: %v", err)
+ if err := readLFSPointers(ctx, repo, chunker, filteredReader, int(in.Limit)); err != nil {
+ if !errors.Is(err, errLimitReached) {
+ return status.Errorf(codes.Internal, "could not read LFS pointers: %v", err)
+ }
+ }
+ } else {
+ catfileProcess, err := s.catfileCache.BatchProcess(ctx, repo)
+ if err != nil {
+ return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err))
+ }
+
+ catfileInfoChan := catfileInfoAllObjects(ctx, repo)
+ catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, func(r catfileInfoResult) bool {
+ return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize
+ })
+ catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool {
+ return git.IsLFSPointer(r.objectData)
+ })
+
+ if err := sendLFSPointers(chunker, catfileObjectChan, int(in.Limit)); err != nil {
+ return err
}
}
@@ -166,7 +167,6 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
}
repo := s.localrepo(req.GetRepository())
- objectIDs := strings.Join(req.BlobIds, "\n")
chunker := chunk.New(&lfsPointerSender{
send: func(pointers []*gitalypb.LFSPointer) error {
@@ -176,8 +176,36 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
},
})
- if err := readLFSPointers(ctx, repo, chunker, strings.NewReader(objectIDs), 0); err != nil {
- if !errors.Is(err, errLimitReached) {
+ if featureflag.IsDisabled(ctx, featureflag.LFSPointersPipeline) {
+ objectIDs := strings.Join(req.BlobIds, "\n")
+
+ if err := readLFSPointers(ctx, repo, chunker, strings.NewReader(objectIDs), 0); err != nil {
+ if !errors.Is(err, errLimitReached) {
+ return err
+ }
+ }
+ } else {
+ catfileProcess, err := s.catfileCache.BatchProcess(ctx, repo)
+ if err != nil {
+ return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err))
+ }
+
+ objectChan := make(chan revlistResult, len(req.GetBlobIds()))
+ for _, blobID := range req.GetBlobIds() {
+ objectChan <- revlistResult{oid: git.ObjectID(blobID)}
+ }
+ close(objectChan)
+
+ catfileInfoChan := catfileInfo(ctx, catfileProcess, objectChan)
+ catfileInfoChan = catfileInfoFilter(ctx, catfileInfoChan, func(r catfileInfoResult) bool {
+ return r.objectInfo.Type == "blob" && r.objectInfo.Size <= lfsPointerMaxSize
+ })
+ catfileObjectChan := catfileObject(ctx, catfileProcess, catfileInfoChan)
+ catfileObjectChan = catfileObjectFilter(ctx, catfileObjectChan, func(r catfileObjectResult) bool {
+ return git.IsLFSPointer(r.objectData)
+ })
+
+ if err := sendLFSPointers(chunker, catfileObjectChan, 0); err != nil {
return err
}
}
@@ -348,3 +376,31 @@ func (t *lfsPointerSender) Append(m proto.Message) {
func (t *lfsPointerSender) Send() error {
return t.send(t.pointers)
}
+
+func sendLFSPointers(chunker *chunk.Chunker, lfsPointers <-chan catfileObjectResult, limit int) error {
+ var i int
+ for lfsPointer := range lfsPointers {
+ if lfsPointer.err != nil {
+ return helper.ErrInternal(lfsPointer.err)
+ }
+
+ if err := chunker.Send(&gitalypb.LFSPointer{
+ Data: lfsPointer.objectData,
+ Size: lfsPointer.objectInfo.Size,
+ Oid: lfsPointer.objectInfo.Oid.String(),
+ }); err != nil {
+ return helper.ErrInternal(fmt.Errorf("sending LFS pointer chunk: %w", err))
+ }
+
+ i++
+ if limit > 0 && i >= limit {
+ break
+ }
+ }
+
+ if err := chunker.Flush(); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ return nil
+}
diff --git a/internal/gitaly/service/blob/lfs_pointers_test.go b/internal/gitaly/service/blob/lfs_pointers_test.go
index 2fefa59da..879b3a183 100644
--- a/internal/gitaly/service/blob/lfs_pointers_test.go
+++ b/internal/gitaly/service/blob/lfs_pointers_test.go
@@ -187,6 +187,14 @@ func testListLFSPointers(t *testing.T, ctx context.Context) {
}
func TestListAllLFSPointers(t *testing.T) {
+ testhelper.NewFeatureSets([]featureflag.FeatureFlag{
+ featureflag.LFSPointersPipeline,
+ }).Run(t, func(t *testing.T, ctx context.Context) {
+ testListAllLFSPointers(t, ctx)
+ })
+}
+
+func testListAllLFSPointers(t *testing.T, ctx context.Context) {
receivePointers := func(t *testing.T, stream gitalypb.BlobService_ListAllLFSPointersClient) []*gitalypb.LFSPointer {
t.Helper()
@@ -202,9 +210,6 @@ func TestListAllLFSPointers(t *testing.T) {
return pointers
}
- ctx, cancel := testhelper.Context()
- defer cancel()
-
lfsPointerContents := `version https://git-lfs.github.com/spec/v1
oid sha256:1111111111111111111111111111111111111111111111111111111111111111
size 12345`
@@ -301,10 +306,15 @@ size 12345`
}
func TestSuccessfulGetLFSPointersRequest(t *testing.T) {
- _, repo, _, client := setup(t)
+ testhelper.NewFeatureSets([]featureflag.FeatureFlag{
+ featureflag.LFSPointersPipeline,
+ }).Run(t, func(t *testing.T, ctx context.Context) {
+ testSuccessfulGetLFSPointersRequest(t, ctx)
+ })
+}
- ctx, cancel := testhelper.Context()
- defer cancel()
+func testSuccessfulGetLFSPointersRequest(t *testing.T, ctx context.Context) {
+ _, repo, _, client := setup(t)
lfsPointerIds := []string{
lfsPointer1,
@@ -346,10 +356,15 @@ func TestSuccessfulGetLFSPointersRequest(t *testing.T) {
}
func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) {
- _, repo, _, client := setup(t)
+ testhelper.NewFeatureSets([]featureflag.FeatureFlag{
+ featureflag.LFSPointersPipeline,
+ }).Run(t, func(t *testing.T, ctx context.Context) {
+ testFailedGetLFSPointersRequestDueToValidations(t, ctx)
+ })
+}
- ctx, cancel := testhelper.Context()
- defer cancel()
+func testFailedGetLFSPointersRequestDueToValidations(t *testing.T, ctx context.Context) {
+ _, repo, _, client := setup(t)
testCases := []struct {
desc string
diff --git a/internal/gitaly/service/blob/pipeline.go b/internal/gitaly/service/blob/pipeline.go
index 0c14221bf..3be39886f 100644
--- a/internal/gitaly/service/blob/pipeline.go
+++ b/internal/gitaly/service/blob/pipeline.go
@@ -4,7 +4,9 @@ import (
"bufio"
"bytes"
"context"
+ "errors"
"fmt"
+ "io"
"io/ioutil"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
@@ -206,6 +208,74 @@ func catfileInfo(ctx context.Context, catfile catfile.Batch, revlistResultChan <
return resultChan
}
+// catfileInfoAllObjects enumerates all Git objects part of the repository's object directory and
+// extracts their object info via `git cat-file --batch-check`. The returned channel will contain
+// all processed results. Any error encountered during execution of this pipeline step will cause
+// the pipeline to fail. Context cancellation will gracefully halt the pipeline. Note that with this
+// pipeline step, the resulting catfileInfoResults will never have an object name.
+func catfileInfoAllObjects(ctx context.Context, repo *localrepo.Repo) <-chan catfileInfoResult {
+ resultChan := make(chan catfileInfoResult)
+
+ go func() {
+ defer close(resultChan)
+
+ sendResult := func(result catfileInfoResult) bool {
+ select {
+ case resultChan <- result:
+ return false
+ case <-ctx.Done():
+ return true
+ }
+ }
+
+ cmd, err := repo.Exec(ctx, git.SubCmd{
+ Name: "cat-file",
+ Flags: []git.Option{
+ git.Flag{Name: "--batch-all-objects"},
+ git.Flag{Name: "--batch-check"},
+ git.Flag{Name: "--buffer"},
+ git.Flag{Name: "--unordered"},
+ },
+ })
+ if err != nil {
+ sendResult(catfileInfoResult{
+ err: fmt.Errorf("spawning cat-file failed: %w", err),
+ })
+ return
+ }
+
+ reader := bufio.NewReader(cmd)
+ for {
+ objectInfo, err := catfile.ParseObjectInfo(reader)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ break
+ }
+
+ sendResult(catfileInfoResult{
+ err: fmt.Errorf("parsing object info: %w", err),
+ })
+ return
+ }
+
+ if isDone := sendResult(catfileInfoResult{
+ objectInfo: objectInfo,
+ }); isDone {
+ return
+ }
+ }
+
+ if err := cmd.Wait(); err != nil {
+ sendResult(catfileInfoResult{
+ err: fmt.Errorf("cat-file failed: %w", err),
+ })
+ return
+ }
+ }()
+
+ return resultChan
+}
+
// catfileInfoFilter filters the catfileInfoResults from the provided channel with the filter
// function: if the filter returns `false` for a given item, then it will be dropped from the
// pipeline. Errors cannot be filtered and will always be passed through.
diff --git a/internal/gitaly/service/blob/pipeline_test.go b/internal/gitaly/service/blob/pipeline_test.go
index 19b0a00bd..fc3ad6617 100644
--- a/internal/gitaly/service/blob/pipeline_test.go
+++ b/internal/gitaly/service/blob/pipeline_test.go
@@ -358,6 +358,39 @@ func TestCatfileInfo(t *testing.T) {
}
}
+func TestCatfileAllObjects(t *testing.T) {
+ cfg := testcfg.Build(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ repoProto, repoPath, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0])
+ defer cleanup()
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ blob1 := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+ blob2 := gittest.WriteBlob(t, cfg, repoPath, []byte("barfoo"))
+ tree := gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+ {Path: "foobar", Mode: "100644", OID: blob1},
+ })
+ commit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents())
+
+ resultChan := catfileInfoAllObjects(ctx, repo)
+
+ var results []catfileInfoResult
+ for result := range resultChan {
+ require.NoError(t, result.err)
+ results = append(results, result)
+ }
+
+ require.ElementsMatch(t, []catfileInfoResult{
+ {objectInfo: &catfile.ObjectInfo{Oid: blob1, Type: "blob", Size: 6}},
+ {objectInfo: &catfile.ObjectInfo{Oid: blob2, Type: "blob", Size: 6}},
+ {objectInfo: &catfile.ObjectInfo{Oid: tree, Type: "tree", Size: 34}},
+ {objectInfo: &catfile.ObjectInfo{Oid: commit, Type: "commit", Size: 177}},
+ }, results)
+}
+
func TestCatfileInfoFilter(t *testing.T) {
for _, tc := range []struct {
desc string