diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-23 14:23:06 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-23 14:23:06 +0300 |
commit | 744461d9752f2dd22dd6f8c24b5b55df2d60ca60 (patch) | |
tree | 004f9a1fc0314c48214a3bb8d2a594f4c16908a2 /internal | |
parent | dc297bbcb3bf33b798b58b934259ce94335294c3 (diff) | |
parent | baf7b1dcc301c125086159ad65eadcabe71345e0 (diff) |
Merge branch 'fetch_bundle_rpc' into 'master'
Add FetchBundle RPC
See merge request gitlab-org/gitaly!3808
Diffstat (limited to 'internal')
-rw-r--r-- | internal/git/localrepo/remote.go | 20 | ||||
-rw-r--r-- | internal/git/localrepo/remote_extra_test.go | 17 | ||||
-rw-r--r-- | internal/git/localrepo/remote_test.go | 16 | ||||
-rw-r--r-- | internal/gitaly/service/repository/fetch_bundle.go | 75 | ||||
-rw-r--r-- | internal/gitaly/service/repository/fetch_bundle_test.go | 205 | ||||
-rw-r--r-- | internal/gitaly/service/repository/fetch_remote.go | 11 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 1 | ||||
-rw-r--r-- | internal/praefect/protoregistry/protoregistry_test.go | 1 |
8 files changed, 337 insertions, 9 deletions
diff --git a/internal/git/localrepo/remote.go b/internal/git/localrepo/remote.go index 949bcd26f..a490473a9 100644 --- a/internal/git/localrepo/remote.go +++ b/internal/git/localrepo/remote.go @@ -217,6 +217,8 @@ type FetchOpts struct { Tags FetchOptsTags // Stderr if set it would be used to redirect stderr stream into it. Stderr io.Writer + // DisableTransactions will disable the reference-transaction hook and atomic transactions. + DisableTransactions bool } // ErrFetchFailed indicates that the fetch has failed. @@ -244,7 +246,11 @@ func (repo *Repo) FetchRemote(ctx context.Context, remoteName string, opts Fetch commandOptions := []git.CmdOpt{ git.WithEnv(opts.Env...), git.WithStderr(opts.Stderr), - git.WithDisabledHooks(), + } + if opts.DisableTransactions { + commandOptions = append(commandOptions, git.WithDisabledHooks()) + } else { + commandOptions = append(commandOptions, git.WithRefTxHook(ctx, repo, repo.cfg)) } commandOptions = append(commandOptions, opts.CommandOptions...) @@ -295,7 +301,6 @@ func (repo *Repo) FetchInternal( commandOptions := []git.CmdOpt{ git.WithEnv(append(env, opts.Env...)...), git.WithStderr(opts.Stderr), - git.WithRefTxHook(ctx, repo, repo.cfg), // We've observed performance issues when fetching into big repositories part of an // object pool. The root cause of this seems to be the connectivity check, which by // default will also include references of any alternates. Given that object pools @@ -305,12 +310,17 @@ func (repo *Repo) FetchInternal( // matter in the connectivity check either. git.WithConfig(git.ConfigPair{Key: "core.alternateRefsCommand", Value: "exit 0 #"}), } + if opts.DisableTransactions { + commandOptions = append(commandOptions, git.WithDisabledHooks()) + } else { + commandOptions = append(commandOptions, git.WithRefTxHook(ctx, repo, repo.cfg)) + } commandOptions = append(commandOptions, opts.CommandOptions...) if err := repo.ExecAndWait(ctx, git.SubCmd{ Name: "fetch", - Flags: append(opts.buildFlags(), git.Flag{Name: "--atomic"}), + Flags: opts.buildFlags(), Args: append([]string{gitalyssh.GitalyInternalURL}, refspecs...), }, commandOptions..., @@ -340,6 +350,10 @@ func (opts FetchOpts) buildFlags() []git.Option { flags = append(flags, git.Flag{Name: opts.Tags.String()}) } + if !opts.DisableTransactions { + flags = append(flags, git.Flag{Name: "--atomic"}) + } + return flags } diff --git a/internal/git/localrepo/remote_extra_test.go b/internal/git/localrepo/remote_extra_test.go index 8dc1a3cc5..fe2027d24 100644 --- a/internal/git/localrepo/remote_extra_test.go +++ b/internal/git/localrepo/remote_extra_test.go @@ -147,7 +147,22 @@ func TestRepo_FetchInternal(t *testing.T) { localrepo.FetchOpts{Stderr: &stderr, Env: []string{"GIT_TRACE=1"}}, ) require.NoError(t, err) - require.Contains(t, stderr.String(), "trace: built-in: git fetch") + require.Contains(t, stderr.String(), "trace: built-in: git fetch --quiet --atomic --end-of-options") + }) + + t.Run("with disabled transactions", func(t *testing.T) { + ctx := testhelper.MergeIncomingMetadata(ctx, testhelper.GitalyServersMetadataFromCfg(t, cfg)) + + repoProto, _ := gittest.InitRepo(t, cfg, cfg.Storages[0]) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + var stderr bytes.Buffer + err := repo.FetchInternal( + ctx, remoteRepoProto, []string{"refs/heads/master"}, + localrepo.FetchOpts{Stderr: &stderr, Env: []string{"GIT_TRACE=1"}, DisableTransactions: true}, + ) + require.NoError(t, err) + require.Contains(t, stderr.String(), "trace: built-in: git fetch --quiet --end-of-options") }) t.Run("invalid remote repo", func(t *testing.T) { diff --git a/internal/git/localrepo/remote_test.go b/internal/git/localrepo/remote_test.go index 4f1a8276a..6b165f0c9 100644 --- a/internal/git/localrepo/remote_test.go +++ b/internal/git/localrepo/remote_test.go @@ -361,6 +361,22 @@ func TestRepo_FetchRemote(t *testing.T) { var stderr bytes.Buffer require.NoError(t, repo.FetchRemote(ctx, "source", FetchOpts{Stderr: &stderr, Env: []string{"GIT_TRACE=1"}})) + require.Contains(t, stderr.String(), "trace: built-in: git fetch --quiet --atomic --end-of-options source") + }) + + t.Run("with disabled transactions", func(t *testing.T) { + _, sourceRepoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0]) + testRepo, testRepoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0]) + + repo := New(remoteCmd.repo.gitCmdFactory, remoteCmd.repo.catfileCache, testRepo, cfg) + gittest.Exec(t, cfg, "-C", testRepoPath, "remote", "add", "source", sourceRepoPath) + + var stderr bytes.Buffer + require.NoError(t, repo.FetchRemote(ctx, "source", FetchOpts{ + Stderr: &stderr, + Env: []string{"GIT_TRACE=1"}, + DisableTransactions: true, + })) require.Contains(t, stderr.String(), "trace: built-in: git fetch --quiet --end-of-options source") }) diff --git a/internal/gitaly/service/repository/fetch_bundle.go b/internal/gitaly/service/repository/fetch_bundle.go new file mode 100644 index 000000000..05204d064 --- /dev/null +++ b/internal/gitaly/service/repository/fetch_bundle.go @@ -0,0 +1,75 @@ +package repository + +import ( + "io" + "os" + "path/filepath" + + gitalyerrors "gitlab.com/gitlab-org/gitaly/v14/internal/errors" + "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/v14/streamio" +) + +const ( + mirrorRefSpec = "+refs/*:refs/*" +) + +func (s *server) FetchBundle(stream gitalypb.RepositoryService_FetchBundleServer) error { + firstRequest, err := stream.Recv() + if err != nil { + return helper.ErrInternalf("first request: %v", err) + } + + if firstRequest.GetRepository() == nil { + return helper.ErrInvalidArgument(gitalyerrors.ErrEmptyRepository) + } + + firstRead := true + reader := streamio.NewReader(func() ([]byte, error) { + if firstRead { + firstRead = false + return firstRequest.GetData(), nil + } + + request, err := stream.Recv() + return request.GetData(), err + }) + + ctx := stream.Context() + repo := s.localrepo(firstRequest.GetRepository()) + + tmpDir, err := tempdir.New(ctx, repo.GetStorageName(), s.locator) + if err != nil { + return helper.ErrInternal(err) + } + + bundlePath := filepath.Join(tmpDir.Path(), "repo.bundle") + file, err := os.Create(bundlePath) + if err != nil { + return helper.ErrInternal(err) + } + + _, err = io.Copy(file, reader) + if err != nil { + return helper.ErrInternalf("copy bundle: %w", err) + } + + config := []git.ConfigPair{ + {Key: "remote.inmemory.url", Value: bundlePath}, + {Key: "remote.inmemory.fetch", Value: mirrorRefSpec}, + } + opts := localrepo.FetchOpts{ + Prune: true, + CommandOptions: []git.CmdOpt{git.WithConfigEnv(config...)}, + } + + if err := repo.FetchRemote(ctx, "inmemory", opts); err != nil { + return helper.ErrInternal(err) + } + + return stream.SendAndClose(&gitalypb.FetchBundleResponse{}) +} diff --git a/internal/gitaly/service/repository/fetch_bundle_test.go b/internal/gitaly/service/repository/fetch_bundle_test.go new file mode 100644 index 000000000..a81889000 --- /dev/null +++ b/internal/gitaly/service/repository/fetch_bundle_test.go @@ -0,0 +1,205 @@ +package repository + +import ( + "context" + "io" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + gitalyhook "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/hook" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/v14/streamio" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +func TestServer_FetchBundle_success(t *testing.T) { + t.Parallel() + cfg, _, repoPath, client := setupRepositoryService(t) + + tmp := testhelper.TempDir(t) + bundlePath := filepath.Join(tmp, "test.bundle") + + gittest.Exec(t, cfg, "-C", repoPath, "bundle", "create", bundlePath, "--all") + expectedRefs := gittest.Exec(t, cfg, "-C", repoPath, "show-ref", "--head") + + targetRepo, targetRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0]) + + ctx, cancel := testhelper.Context() + defer cancel() + + stream, err := client.FetchBundle(ctx) + require.NoError(t, err) + + request := &gitalypb.FetchBundleRequest{Repository: targetRepo} + writer := streamio.NewWriter(func(p []byte) error { + request.Data = p + + if err := stream.Send(request); err != nil { + return err + } + + request = &gitalypb.FetchBundleRequest{} + + return nil + }) + + bundle, err := os.Open(bundlePath) + require.NoError(t, err) + defer testhelper.MustClose(t, bundle) + + _, err = io.Copy(writer, bundle) + require.NoError(t, err) + + _, err = stream.CloseAndRecv() + require.NoError(t, err) + + refs := gittest.Exec(t, cfg, "-C", targetRepoPath, "show-ref", "--head") + require.Equal(t, string(expectedRefs), string(refs)) +} + +func TestServer_FetchBundle_transaction(t *testing.T) { + t.Parallel() + cfg, repoProto, repoPath := testcfg.BuildWithRepo(t) + testhelper.BuildGitalyHooks(t, cfg) + + hookManager := &mockHookManager{} + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, NewServer( + deps.GetCfg(), + deps.GetRubyServer(), + deps.GetLocator(), + deps.GetTxManager(), + deps.GetGitCmdFactory(), + deps.GetCatfileCache(), + )) + gitalypb.RegisterHookServiceServer(srv, hook.NewServer( + deps.GetCfg(), + deps.GetHookManager(), + deps.GetGitCmdFactory(), + deps.GetPackObjectsCache(), + )) + }, testserver.WithHookManager(hookManager), testserver.WithDisablePraefect()) + + client := newRepositoryClient(t, cfg, addr) + + tmp := testhelper.TempDir(t) + bundlePath := filepath.Join(tmp, "test.bundle") + gittest.BundleTestRepo(t, cfg, "gitlab-test.git", bundlePath) + + _, stopGitServer := gittest.GitServer(t, cfg, repoPath, nil) + defer func() { require.NoError(t, stopGitServer()) }() + + ctx, cancel := testhelper.Context() + defer cancel() + ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true) + require.NoError(t, err) + ctx = metadata.IncomingToOutgoing(ctx) + + require.Empty(t, hookManager.states) + + stream, err := client.FetchBundle(ctx) + require.NoError(t, err) + + request := &gitalypb.FetchBundleRequest{Repository: repoProto} + writer := streamio.NewWriter(func(p []byte) error { + request.Data = p + + if err := stream.Send(request); err != nil { + return err + } + + request = &gitalypb.FetchBundleRequest{} + + return nil + }) + + bundle, err := os.Open(bundlePath) + require.NoError(t, err) + defer testhelper.MustClose(t, bundle) + + _, err = io.Copy(writer, bundle) + require.NoError(t, err) + + _, err = stream.CloseAndRecv() + require.NoError(t, err) + + require.Equal(t, []gitalyhook.ReferenceTransactionState{ + gitalyhook.ReferenceTransactionPrepared, + gitalyhook.ReferenceTransactionCommitted, + }, hookManager.states) +} + +func TestServer_FetchBundle_validation(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + firstRequest *gitalypb.FetchBundleRequest + expectedStreamErr string + expectedStreamErrCode codes.Code + }{ + { + desc: "no repo", + firstRequest: &gitalypb.FetchBundleRequest{ + Repository: nil, + }, + expectedStreamErr: "empty Repository", + expectedStreamErrCode: codes.InvalidArgument, + }, + { + desc: "unknown repo", + firstRequest: &gitalypb.FetchBundleRequest{ + Repository: &gitalypb.Repository{ + StorageName: "default", + RelativePath: "unknown", + }, + }, + expectedStreamErr: "not a git repository", + expectedStreamErrCode: codes.NotFound, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + _, client := setupRepositoryServiceWithoutRepo(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + stream, err := client.FetchBundle(ctx) + require.NoError(t, err) + + err = stream.Send(tc.firstRequest) + require.NoError(t, err) + + _, err = stream.CloseAndRecv() + require.Error(t, err) + if tc.expectedStreamErr != "" { + require.Contains(t, err.Error(), tc.expectedStreamErr) + } + if tc.expectedStreamErrCode != 0 { + require.Equal(t, tc.expectedStreamErrCode, helper.GrpcCode(err)) + } + }) + } +} + +type mockHookManager struct { + gitalyhook.Manager + states []gitalyhook.ReferenceTransactionState +} + +func (m *mockHookManager) ReferenceTransactionHook(_ context.Context, state gitalyhook.ReferenceTransactionState, _ []string, _ io.Reader) error { + m.states = append(m.states, state) + return nil +} diff --git a/internal/gitaly/service/repository/fetch_remote.go b/internal/gitaly/service/repository/fetch_remote.go index 34a9c90fa..0a18c9140 100644 --- a/internal/gitaly/service/repository/fetch_remote.go +++ b/internal/gitaly/service/repository/fetch_remote.go @@ -26,11 +26,12 @@ func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteReque var stderr bytes.Buffer opts := localrepo.FetchOpts{ - Stderr: &stderr, - Force: req.Force, - Prune: !req.NoPrune, - Tags: localrepo.FetchOptsTagsAll, - Verbose: req.GetCheckTagsChanged(), + Stderr: &stderr, + Force: req.Force, + Prune: !req.NoPrune, + Tags: localrepo.FetchOptsTagsAll, + Verbose: req.GetCheckTagsChanged(), + DisableTransactions: true, } if req.GetNoTags() { diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index aea341dd0..938f47fe7 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -73,6 +73,7 @@ var transactionRPCs = map[string]transactionsCondition{ "/gitaly.RepositoryService/CreateRepositoryFromBundle": transactionsEnabled, "/gitaly.RepositoryService/CreateRepositoryFromSnapshot": transactionsEnabled, "/gitaly.RepositoryService/CreateRepositoryFromURL": transactionsEnabled, + "/gitaly.RepositoryService/FetchBundle": transactionsEnabled, "/gitaly.RepositoryService/FetchRemote": transactionsEnabled, "/gitaly.RepositoryService/FetchSourceBranch": transactionsEnabled, "/gitaly.RepositoryService/RemoveRepository": transactionsEnabled, diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go index 87ebfcd9d..515c13e3d 100644 --- a/internal/praefect/protoregistry/protoregistry_test.go +++ b/internal/praefect/protoregistry/protoregistry_test.go @@ -112,6 +112,7 @@ func TestNewProtoRegistry(t *testing.T) { "RepositorySize": protoregistry.OpAccessor, "ApplyGitattributes": protoregistry.OpMutator, "FetchRemote": protoregistry.OpMutator, + "FetchBundle": protoregistry.OpMutator, "CreateRepository": protoregistry.OpMutator, "GetArchive": protoregistry.OpAccessor, "HasLocalBranches": protoregistry.OpAccessor, |