Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-07-23 16:38:32 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-07-26 13:35:41 +0300
commit598fb165eff266513b9788ebcdc6aa900a65f083 (patch)
treeeaad0b3ffb931fe530d246743a587a2ebce3525c
parentf6ded298a67db9e024fa16942af708a5f48362b1 (diff)
blob: Implement new ListAllBlobs RPC
Implement a new ListAllBlobs RPC, which is the equivalent of ListAllLFSPointers and ListAllCommits but for blobs: given a repository, it will return all blobs no matter whether they're reachable or not. This will eventually be used to efficiently enumerate all new blobs in Rails' access checks. Changelog: feature
-rw-r--r--internal/gitaly/service/blob/blobs.go65
-rw-r--r--internal/gitaly/service/blob/blobs_test.go143
2 files changed, 208 insertions, 0 deletions
diff --git a/internal/gitaly/service/blob/blobs.go b/internal/gitaly/service/blob/blobs.go
index 4f5c89300..9acb4cd8c 100644
--- a/internal/gitaly/service/blob/blobs.go
+++ b/internal/gitaly/service/blob/blobs.go
@@ -203,3 +203,68 @@ func (t *blobSender) Append(m proto.Message) {
func (t *blobSender) Send() error {
return t.send(t.blobs)
}
+
+// ListAllBlobs finds all blobs which exist in the repository, including those which are not
+// reachable via graph walks.
+func (s *server) ListAllBlobs(req *gitalypb.ListAllBlobsRequest, stream gitalypb.BlobService_ListAllBlobsServer) error {
+ ctx := stream.Context()
+
+ if req.GetRepository() == nil {
+ return helper.ErrInvalidArgumentf("empty repository")
+ }
+
+ repo := s.localrepo(req.GetRepository())
+
+ chunker := chunk.New(&allBlobsSender{
+ send: func(blobs []*gitalypb.ListAllBlobsResponse_Blob) error {
+ return stream.Send(&gitalypb.ListAllBlobsResponse{
+ Blobs: blobs,
+ })
+ },
+ })
+
+ catfileProcess, err := s.catfileCache.BatchProcess(ctx, repo)
+ if err != nil {
+ return helper.ErrInternal(fmt.Errorf("creating catfile process: %w", err))
+ }
+
+ catfileInfoIter := gitpipe.CatfileInfoAllObjects(ctx, repo)
+ catfileInfoIter = gitpipe.CatfileInfoFilter(ctx, catfileInfoIter, func(r gitpipe.CatfileInfoResult) bool {
+ return r.ObjectInfo.Type == "blob"
+ })
+
+ if err := processBlobs(ctx, catfileProcess, catfileInfoIter, req.GetLimit(), req.GetBytesLimit(),
+ func(oid string, size int64, contents []byte) error {
+ return chunker.Send(&gitalypb.ListAllBlobsResponse_Blob{
+ Oid: oid,
+ Size: size,
+ Data: contents,
+ })
+ },
+ ); err != nil {
+ return helper.ErrInternal(fmt.Errorf("processing blobs: %w", err))
+ }
+
+ if err := chunker.Flush(); err != nil {
+ return helper.ErrInternal(fmt.Errorf("flushing blobs: %w", err))
+ }
+
+ return nil
+}
+
+type allBlobsSender struct {
+ blobs []*gitalypb.ListAllBlobsResponse_Blob
+ send func([]*gitalypb.ListAllBlobsResponse_Blob) error
+}
+
+func (t *allBlobsSender) Reset() {
+ t.blobs = t.blobs[:0]
+}
+
+func (t *allBlobsSender) Append(m proto.Message) {
+ t.blobs = append(t.blobs, m.(*gitalypb.ListAllBlobsResponse_Blob))
+}
+
+func (t *allBlobsSender) Send() error {
+ return t.send(t.blobs)
+}
diff --git a/internal/gitaly/service/blob/blobs_test.go b/internal/gitaly/service/blob/blobs_test.go
index d92a9fc21..4afa10b1b 100644
--- a/internal/gitaly/service/blob/blobs_test.go
+++ b/internal/gitaly/service/blob/blobs_test.go
@@ -8,12 +8,15 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/quarantine"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -242,3 +245,143 @@ func TestListBlobs(t *testing.T) {
})
}
}
+
+func TestListAllBlobs(t *testing.T) {
+ cfg, repo, _, client := setup(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ quarantine, err := quarantine.New(ctx, repo, config.NewLocator(cfg))
+ require.NoError(t, err)
+
+ quarantineRepoWithoutAlternates := proto.Clone(quarantine.QuarantinedRepo()).(*gitalypb.Repository)
+ quarantineRepoWithoutAlternates.GitAlternateObjectDirectories = []string{}
+
+ emptyRepo, _, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0])
+ defer cleanup()
+
+ singleBlobRepo, singleBlobRepoPath, cleanup := gittest.InitBareRepoAt(t, cfg, cfg.Storages[0])
+ defer cleanup()
+ blobID := gittest.WriteBlob(t, cfg, singleBlobRepoPath, []byte("foobar"))
+
+ for _, tc := range []struct {
+ desc string
+ request *gitalypb.ListAllBlobsRequest
+ verify func(*testing.T, []*gitalypb.ListAllBlobsResponse_Blob)
+ }{
+ {
+ desc: "empty repo",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: emptyRepo,
+ },
+ verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) {
+ require.Empty(t, blobs)
+ },
+ },
+ {
+ desc: "repo with single blob",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: singleBlobRepo,
+ BytesLimit: -1,
+ },
+ verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) {
+ require.Equal(t, []*gitalypb.ListAllBlobsResponse_Blob{{
+ Oid: blobID.String(),
+ Size: 6,
+ Data: []byte("foobar"),
+ }}, blobs)
+ },
+ },
+ {
+ desc: "repo with single blob and bytes limit",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: singleBlobRepo,
+ BytesLimit: 1,
+ },
+ verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) {
+ require.Equal(t, []*gitalypb.ListAllBlobsResponse_Blob{{
+ Oid: blobID.String(),
+ Size: 6,
+ Data: []byte("f"),
+ }}, blobs)
+ },
+ },
+ {
+ desc: "normal repo",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: repo,
+ },
+ verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) {
+ require.Greater(t, len(blobs), 300)
+ },
+ },
+ {
+ desc: "normal repo with limit",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: repo,
+ Limit: 2,
+ },
+ verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) {
+ require.Len(t, blobs, 2)
+ },
+ },
+ {
+ desc: "normal repo with bytes limit",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: repo,
+ BytesLimit: 1,
+ },
+ verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) {
+ require.Greater(t, len(blobs), 300)
+ for _, blob := range blobs {
+ emptyBlobID := "e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"
+ if blob.Oid == emptyBlobID {
+ require.Empty(t, blob.Data)
+ require.Equal(t, int64(0), blob.Size)
+ } else {
+ require.Len(t, blob.Data, 1)
+ }
+ }
+ },
+ },
+ {
+ desc: "quarantine repo with alternates",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: quarantine.QuarantinedRepo(),
+ },
+ verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) {
+ require.Greater(t, len(blobs), 300)
+ },
+ },
+ {
+ desc: "quarantine repo without alternates",
+ request: &gitalypb.ListAllBlobsRequest{
+ Repository: quarantineRepoWithoutAlternates,
+ },
+ verify: func(t *testing.T, blobs []*gitalypb.ListAllBlobsResponse_Blob) {
+ require.Empty(t, blobs)
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ stream, err := client.ListAllBlobs(ctx, tc.request)
+ require.NoError(t, err)
+
+ var blobs []*gitalypb.ListAllBlobsResponse_Blob
+ for {
+ resp, err := stream.Recv()
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ require.NoError(t, err)
+ }
+ break
+ }
+
+ blobs = append(blobs, resp.Blobs...)
+ }
+
+ tc.verify(t, blobs)
+ })
+ }
+}