diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-23 16:38:32 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-26 13:35:41 +0300 |
commit | 598fb165eff266513b9788ebcdc6aa900a65f083 (patch) | |
tree | eaad0b3ffb931fe530d246743a587a2ebce3525c | |
parent | f6ded298a67db9e024fa16942af708a5f48362b1 (diff) |
blob: Implement new ListAllBlobs RPC
Implement a new ListAllBlobs RPC, which is the equivalent of
ListAllLFSPointers and ListAllCommits but for blobs: given a repository,
it will return all blobs no matter whether they're reachable or not.
This will eventually be used to efficiently enumerate all new blobs in
Rails' access checks.
Changelog: feature
-rw-r--r-- | internal/gitaly/service/blob/blobs.go | 65 | ||||
-rw-r--r-- | internal/gitaly/service/blob/blobs_test.go | 143 |
2 files changed, 208 insertions, 0 deletions
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go index 4f5c89300..9acb4cd8c 100644 --- a/internal/gitaly/service/blob/blobs.go +++ b/internal/gitaly/service/blob/blobs.go @@ -203,3 +203,68 @@ func (t *blobSender) Append(m proto.Message) { func (t *blobSender) Send() error { return t.send(t.blobs) } + +// ListAllBlobs finds all blobs which exist in the repository, including those which are not +// reachable via graph walks. +func (s *server) ListAllBlobs(req *gitalypb.ListAllBlobsRequest, stream gitalypb.BlobService_ListAllBlobsServer) error { + ctx := stream.Context() + + if req.GetRepository() == nil { + return helper.ErrInvalidArgumentf("empty repository") + } + + repo := s.localrepo(req.GetRepository()) + + chunker := chunk.New(&allBlobsSender{ + send: func(blobs []*gitalypb.ListAllBlobsResponse_Blob) error { + return stream.Send(&gitalypb.ListAllBlobsResponse{ + Blobs: blobs, + }) + }, + }) + + catfileProcess, err := s.catfileCache.BatchProcess(ctx, repo) + if err != nil { + return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err)) + } + + catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo) + catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool { + return r.ObjectInfo.Type == "blob" + }) + + if err := processBlobs(ctx, catfileProcess, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(), + func(oid string, size int64, contents []byte) error { + return chunker.Send(&gitalypb.ListAllBlobsResponse_Blob{ + Oid: oid, + Size: size, + Data: contents, + }) + }, + ); err != nil { + return helper.ErrInternal(fmt.Errorf("processing blobs: %w", err)) + } + + if err := chunker.Flush(); err != nil { + return helper.ErrInternal(fmt.Errorf("flushing blobs: %w", err)) + } + + return nil +} + +type allBlobsSender struct { + blobs []*gitalypb.ListAllBlobsResponse_Blob + send func([]*gitalypb.ListAllBlobsResponse_Blob) error +} + +func (t *allBlobsSender) Reset() { + t.blobs = t.blobs[:0] +} + +func (t *allBlobsSender) Append(m proto.Message) { + t.blobs = append(t.blobs, m.(*gitalypb.ListAllBlobsResponse_Blob)) +} + +func (t *allBlobsSender) Send() error { + return t.send(t.blobs) +} diff --git a/internal/gitaly/service/blob/blobs_test.go b/internal/gitaly/service/blob/blobs_test.go index d92a9fc21..4afa10b1b 100644 --- a/internal/gitaly/service/blob/blobs_test.go +++ b/internal/gitaly/service/blob/blobs_test.go @@ -8,12 +8,15 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/quarantine" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) const ( @@ -242,3 +245,143 @@ func TestListBlobs(t *testing.T) { }) } } + +func TestListAllBlobs(t *testing.T) { + cfg, repo, _, client := setup(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + quarantine, err := quarantine.New(ctx, repo, config.NewLocator(cfg)) + require.NoError(t, err) + + quarantineRepoWithoutAlternates := proto.Clone(quarantine.QuarantinedRepo()).(*gitalypb.Repository) + quarantineRepoWithoutAlternates.GitAlternateObjectDirectories = []string{} + + emptyRepo, _, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0]) + defer cleanup() + + singleBlobRepo, singleBlobRepoPath, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0]) + defer cleanup() + blobID := gittest.WriteBlob(t, cfg, singleBlobRepoPath, []byte("foobar")) + + for _, tc := range []struct { + desc string + request *gitalypb.ListAllBlobsRequest + verify func(*testing.T, []*gitalypb.ListAllBlobsResponse_Blob) + }{ + { + desc: "empty repo", + request: &gitalypb.ListAllBlobsRequest{ + Repository: emptyRepo, + }, + verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) { + require.Empty(t, blobs) + }, + }, + { + desc: "repo with single blob", + request: &gitalypb.ListAllBlobsRequest{ + Repository: singleBlobRepo, + BytesLimit: -1, + }, + verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) { + require.Equal(t, []*gitalypb.ListAllBlobsResponse_Blob{{ + Oid: blobID.String(), + Size: 6, + Data: []byte("foobar"), + }}, blobs) + }, + }, + { + desc: "repo with single blob and bytes limit", + request: &gitalypb.ListAllBlobsRequest{ + Repository: singleBlobRepo, + BytesLimit: 1, + }, + verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) { + require.Equal(t, []*gitalypb.ListAllBlobsResponse_Blob{{ + Oid: blobID.String(), + Size: 6, + Data: []byte("f"), + }}, blobs) + }, + }, + { + desc: "normal repo", + request: &gitalypb.ListAllBlobsRequest{ + Repository: repo, + }, + verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) { + require.Greater(t, len(blobs), 300) + }, + }, + { + desc: "normal repo with limit", + request: &gitalypb.ListAllBlobsRequest{ + Repository: repo, + Limit: 2, + }, + verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) { + require.Len(t, blobs, 2) + }, + }, + { + desc: "normal repo with bytes limit", + request: &gitalypb.ListAllBlobsRequest{ + Repository: repo, + BytesLimit: 1, + }, + verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) { + require.Greater(t, len(blobs), 300) + for _, blob := range blobs { + emptyBlobID := "e69de29bb2d1d6434b8b29ae775ad8c2e48c5391" + if blob.Oid == emptyBlobID { + require.Empty(t, blob.Data) + require.Equal(t, int64(0), blob.Size) + } else { + require.Len(t, blob.Data, 1) + } + } + }, + }, + { + desc: "quarantine repo with alternates", + request: &gitalypb.ListAllBlobsRequest{ + Repository: quarantine.QuarantinedRepo(), + }, + verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) { + require.Greater(t, len(blobs), 300) + }, + }, + { + desc: "quarantine repo without alternates", + request: &gitalypb.ListAllBlobsRequest{ + Repository: quarantineRepoWithoutAlternates, + }, + verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) { + require.Empty(t, blobs) + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + stream, err := client.ListAllBlobs(ctx, tc.request) + require.NoError(t, err) + + var blobs []*gitalypb.ListAllBlobsResponse_Blob + for { + resp, err := stream.Recv() + if err != nil { + if !errors.Is(err, io.EOF) { + require.NoError(t, err) + } + break + } + + blobs = append(blobs, resp.Blobs...) + } + + tc.verify(t, blobs) + }) + } +} |