diff options
author | James Fargher <jfargher@gitlab.com> | 2021-08-30 03:22:52 +0300 |
---|---|---|
committer | James Fargher <jfargher@gitlab.com> | 2021-09-23 02:55:44 +0300 |
commit | baf7b1dcc301c125086159ad65eadcabe71345e0 (patch) | |
tree | db8691694be65d23e86490fb49a2b81adca14c78 | |
parent | 29cdfa053025b777d2fed84774570e769f555f35 (diff) |
Add FetchBundle RPC
FetchBundle copies the bundle stream to a tempfile that is then passed
to `git-fetch`.
Changelog: added
-rw-r--r-- | internal/gitaly/service/repository/fetch_bundle.go | 75 | ||||
-rw-r--r-- | internal/gitaly/service/repository/fetch_bundle_test.go | 205 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 1 | ||||
-rw-r--r-- | internal/praefect/protoregistry/protoregistry_test.go | 1 |
4 files changed, 282 insertions, 0 deletions
diff --git a/internal/gitaly/service/repository/fetch_bundle.go b/internal/gitaly/service/repository/fetch_bundle.go new file mode 100644 index 000000000..05204d064 --- /dev/null +++ b/internal/gitaly/service/repository/fetch_bundle.go @@ -0,0 +1,75 @@ +package repository + +import ( + "io" + "os" + "path/filepath" + + gitalyerrors "gitlab.com/gitlab-org/gitaly/v14/internal/errors" + "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/v14/streamio" +) + +const ( + mirrorRefSpec = "+refs/*:refs/*" +) + +func (s *server) FetchBundle(stream gitalypb.RepositoryService_FetchBundleServer) error { + firstRequest, err := stream.Recv() + if err != nil { + return helper.ErrInternalf("first request: %v", err) + } + + if firstRequest.GetRepository() == nil { + return helper.ErrInvalidArgument(gitalyerrors.ErrEmptyRepository) + } + + firstRead := true + reader := streamio.NewReader(func() ([]byte, error) { + if firstRead { + firstRead = false + return firstRequest.GetData(), nil + } + + request, err := stream.Recv() + return request.GetData(), err + }) + + ctx := stream.Context() + repo := s.localrepo(firstRequest.GetRepository()) + + tmpDir, err := tempdir.New(ctx, repo.GetStorageName(), s.locator) + if err != nil { + return helper.ErrInternal(err) + } + + bundlePath := filepath.Join(tmpDir.Path(), "repo.bundle") + file, err := os.Create(bundlePath) + if err != nil { + return helper.ErrInternal(err) + } + + _, err = io.Copy(file, reader) + if err != nil { + return helper.ErrInternalf("copy bundle: %w", err) + } + + config := []git.ConfigPair{ + {Key: "remote.inmemory.url", Value: bundlePath}, + {Key: "remote.inmemory.fetch", Value: mirrorRefSpec}, + } + opts := localrepo.FetchOpts{ + Prune: true, + CommandOptions: []git.CmdOpt{git.WithConfigEnv(config...)}, + } + + if err := repo.FetchRemote(ctx, "inmemory", opts); err != nil { + return helper.ErrInternal(err) + } + + return stream.SendAndClose(&gitalypb.FetchBundleResponse{}) +} diff --git a/internal/gitaly/service/repository/fetch_bundle_test.go b/internal/gitaly/service/repository/fetch_bundle_test.go new file mode 100644 index 000000000..a81889000 --- /dev/null +++ b/internal/gitaly/service/repository/fetch_bundle_test.go @@ -0,0 +1,205 @@ +package repository + +import ( + "context" + "io" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + gitalyhook "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/hook" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/v14/streamio" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +func TestServer_FetchBundle_success(t *testing.T) { + t.Parallel() + cfg, _, repoPath, client := setupRepositoryService(t) + + tmp := testhelper.TempDir(t) + bundlePath := filepath.Join(tmp, "test.bundle") + + gittest.Exec(t, cfg, "-C", repoPath, "bundle", "create", bundlePath, "--all") + expectedRefs := gittest.Exec(t, cfg, "-C", repoPath, "show-ref", "--head") + + targetRepo, targetRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0]) + + ctx, cancel := testhelper.Context() + defer cancel() + + stream, err := client.FetchBundle(ctx) + require.NoError(t, err) + + request := &gitalypb.FetchBundleRequest{Repository: targetRepo} + writer := streamio.NewWriter(func(p []byte) error { + request.Data = p + + if err := stream.Send(request); err != nil { + return err + } + + request = &gitalypb.FetchBundleRequest{} + + return nil + }) + + bundle, err := os.Open(bundlePath) + require.NoError(t, err) + defer testhelper.MustClose(t, bundle) + + _, err = io.Copy(writer, bundle) + require.NoError(t, err) + + _, err = stream.CloseAndRecv() + require.NoError(t, err) + + refs := gittest.Exec(t, cfg, "-C", targetRepoPath, "show-ref", "--head") + require.Equal(t, string(expectedRefs), string(refs)) +} + +func TestServer_FetchBundle_transaction(t *testing.T) { + t.Parallel() + cfg, repoProto, repoPath := testcfg.BuildWithRepo(t) + testhelper.BuildGitalyHooks(t, cfg) + + hookManager := &mockHookManager{} + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, NewServer( + deps.GetCfg(), + deps.GetRubyServer(), + deps.GetLocator(), + deps.GetTxManager(), + deps.GetGitCmdFactory(), + deps.GetCatfileCache(), + )) + gitalypb.RegisterHookServiceServer(srv, hook.NewServer( + deps.GetCfg(), + deps.GetHookManager(), + deps.GetGitCmdFactory(), + deps.GetPackObjectsCache(), + )) + }, testserver.WithHookManager(hookManager), testserver.WithDisablePraefect()) + + client := newRepositoryClient(t, cfg, addr) + + tmp := testhelper.TempDir(t) + bundlePath := filepath.Join(tmp, "test.bundle") + gittest.BundleTestRepo(t, cfg, "gitlab-test.git", bundlePath) + + _, stopGitServer := gittest.GitServer(t, cfg, repoPath, nil) + defer func() { require.NoError(t, stopGitServer()) }() + + ctx, cancel := testhelper.Context() + defer cancel() + ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true) + require.NoError(t, err) + ctx = metadata.IncomingToOutgoing(ctx) + + require.Empty(t, hookManager.states) + + stream, err := client.FetchBundle(ctx) + require.NoError(t, err) + + request := &gitalypb.FetchBundleRequest{Repository: repoProto} + writer := streamio.NewWriter(func(p []byte) error { + request.Data = p + + if err := stream.Send(request); err != nil { + return err + } + + request = &gitalypb.FetchBundleRequest{} + + return nil + }) + + bundle, err := os.Open(bundlePath) + require.NoError(t, err) + defer testhelper.MustClose(t, bundle) + + _, err = io.Copy(writer, bundle) + require.NoError(t, err) + + _, err = stream.CloseAndRecv() + require.NoError(t, err) + + require.Equal(t, []gitalyhook.ReferenceTransactionState{ + gitalyhook.ReferenceTransactionPrepared, + gitalyhook.ReferenceTransactionCommitted, + }, hookManager.states) +} + +func TestServer_FetchBundle_validation(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + firstRequest *gitalypb.FetchBundleRequest + expectedStreamErr string + expectedStreamErrCode codes.Code + }{ + { + desc: "no repo", + firstRequest: &gitalypb.FetchBundleRequest{ + Repository: nil, + }, + expectedStreamErr: "empty Repository", + expectedStreamErrCode: codes.InvalidArgument, + }, + { + desc: "unknown repo", + firstRequest: &gitalypb.FetchBundleRequest{ + Repository: &gitalypb.Repository{ + StorageName: "default", + RelativePath: "unknown", + }, + }, + expectedStreamErr: "not a git repository", + expectedStreamErrCode: codes.NotFound, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + _, client := setupRepositoryServiceWithoutRepo(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + stream, err := client.FetchBundle(ctx) + require.NoError(t, err) + + err = stream.Send(tc.firstRequest) + require.NoError(t, err) + + _, err = stream.CloseAndRecv() + require.Error(t, err) + if tc.expectedStreamErr != "" { + require.Contains(t, err.Error(), tc.expectedStreamErr) + } + if tc.expectedStreamErrCode != 0 { + require.Equal(t, tc.expectedStreamErrCode, helper.GrpcCode(err)) + } + }) + } +} + +type mockHookManager struct { + gitalyhook.Manager + states []gitalyhook.ReferenceTransactionState +} + +func (m *mockHookManager) ReferenceTransactionHook(_ context.Context, state gitalyhook.ReferenceTransactionState, _ []string, _ io.Reader) error { + m.states = append(m.states, state) + return nil +} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index aea341dd0..938f47fe7 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -73,6 +73,7 @@ var transactionRPCs = map[string]transactionsCondition{ "/gitaly.RepositoryService/CreateRepositoryFromBundle": transactionsEnabled, "/gitaly.RepositoryService/CreateRepositoryFromSnapshot": transactionsEnabled, "/gitaly.RepositoryService/CreateRepositoryFromURL": transactionsEnabled, + "/gitaly.RepositoryService/FetchBundle": transactionsEnabled, "/gitaly.RepositoryService/FetchRemote": transactionsEnabled, "/gitaly.RepositoryService/FetchSourceBranch": transactionsEnabled, "/gitaly.RepositoryService/RemoveRepository": transactionsEnabled, diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go index 87ebfcd9d..515c13e3d 100644 --- a/internal/praefect/protoregistry/protoregistry_test.go +++ b/internal/praefect/protoregistry/protoregistry_test.go @@ -112,6 +112,7 @@ func TestNewProtoRegistry(t *testing.T) { "RepositorySize": protoregistry.OpAccessor, "ApplyGitattributes": protoregistry.OpMutator, "FetchRemote": protoregistry.OpMutator, + "FetchBundle": protoregistry.OpMutator, "CreateRepository": protoregistry.OpMutator, "GetArchive": protoregistry.OpAccessor, "HasLocalBranches": protoregistry.OpAccessor, |