Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-23 14:23:06 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-23 14:23:06 +0300
commit744461d9752f2dd22dd6f8c24b5b55df2d60ca60 (patch)
tree004f9a1fc0314c48214a3bb8d2a594f4c16908a2 /internal
parentdc297bbcb3bf33b798b58b934259ce94335294c3 (diff)
parentbaf7b1dcc301c125086159ad65eadcabe71345e0 (diff)
Merge branch 'fetch_bundle_rpc' into 'master'
Add FetchBundle RPC See merge request gitlab-org/gitaly!3808
Diffstat (limited to 'internal')
-rw-r--r--internal/git/localrepo/remote.go20
-rw-r--r--internal/git/localrepo/remote_extra_test.go17
-rw-r--r--internal/git/localrepo/remote_test.go16
-rw-r--r--internal/gitaly/service/repository/fetch_bundle.go75
-rw-r--r--internal/gitaly/service/repository/fetch_bundle_test.go205
-rw-r--r--internal/gitaly/service/repository/fetch_remote.go11
-rw-r--r--internal/praefect/coordinator.go1
-rw-r--r--internal/praefect/protoregistry/protoregistry_test.go1
8 files changed, 337 insertions, 9 deletions
diff --git a/internal/git/localrepo/remote.go b/internal/git/localrepo/remote.go
index 949bcd26f..a490473a9 100644
--- a/internal/git/localrepo/remote.go
+++ b/internal/git/localrepo/remote.go
@@ -217,6 +217,8 @@ type FetchOpts struct {
Tags FetchOptsTags
// Stderr if set it would be used to redirect stderr stream into it.
Stderr io.Writer
+ // DisableTransactions will disable the reference-transaction hook and atomic transactions.
+ DisableTransactions bool
}
// ErrFetchFailed indicates that the fetch has failed.
@@ -244,7 +246,11 @@ func (repo *Repo) FetchRemote(ctx context.Context, remoteName string, opts Fetch
commandOptions := []git.CmdOpt{
git.WithEnv(opts.Env...),
git.WithStderr(opts.Stderr),
- git.WithDisabledHooks(),
+ }
+ if opts.DisableTransactions {
+ commandOptions = append(commandOptions, git.WithDisabledHooks())
+ } else {
+ commandOptions = append(commandOptions, git.WithRefTxHook(ctx, repo, repo.cfg))
}
commandOptions = append(commandOptions, opts.CommandOptions...)
@@ -295,7 +301,6 @@ func (repo *Repo) FetchInternal(
commandOptions := []git.CmdOpt{
git.WithEnv(append(env, opts.Env...)...),
git.WithStderr(opts.Stderr),
- git.WithRefTxHook(ctx, repo, repo.cfg),
// We've observed performance issues when fetching into big repositories part of an
// object pool. The root cause of this seems to be the connectivity check, which by
// default will also include references of any alternates. Given that object pools
@@ -305,12 +310,17 @@ func (repo *Repo) FetchInternal(
// matter in the connectivity check either.
git.WithConfig(git.ConfigPair{Key: "core.alternateRefsCommand", Value: "exit 0 #"}),
}
+ if opts.DisableTransactions {
+ commandOptions = append(commandOptions, git.WithDisabledHooks())
+ } else {
+ commandOptions = append(commandOptions, git.WithRefTxHook(ctx, repo, repo.cfg))
+ }
commandOptions = append(commandOptions, opts.CommandOptions...)
if err := repo.ExecAndWait(ctx,
git.SubCmd{
Name: "fetch",
- Flags: append(opts.buildFlags(), git.Flag{Name: "--atomic"}),
+ Flags: opts.buildFlags(),
Args: append([]string{gitalyssh.GitalyInternalURL}, refspecs...),
},
commandOptions...,
@@ -340,6 +350,10 @@ func (opts FetchOpts) buildFlags() []git.Option {
flags = append(flags, git.Flag{Name: opts.Tags.String()})
}
+ if !opts.DisableTransactions {
+ flags = append(flags, git.Flag{Name: "--atomic"})
+ }
+
return flags
}
diff --git a/internal/git/localrepo/remote_extra_test.go b/internal/git/localrepo/remote_extra_test.go
index 8dc1a3cc5..fe2027d24 100644
--- a/internal/git/localrepo/remote_extra_test.go
+++ b/internal/git/localrepo/remote_extra_test.go
@@ -147,7 +147,22 @@ func TestRepo_FetchInternal(t *testing.T) {
localrepo.FetchOpts{Stderr: &stderr, Env: []string{"GIT_TRACE=1"}},
)
require.NoError(t, err)
- require.Contains(t, stderr.String(), "trace: built-in: git fetch")
+ require.Contains(t, stderr.String(), "trace: built-in: git fetch --quiet --atomic --end-of-options")
+ })
+
+ t.Run("with disabled transactions", func(t *testing.T) {
+ ctx := testhelper.MergeIncomingMetadata(ctx, testhelper.GitalyServersMetadataFromCfg(t, cfg))
+
+ repoProto, _ := gittest.InitRepo(t, cfg, cfg.Storages[0])
+ repo := localrepo.NewTestRepo(t, cfg, repoProto)
+
+ var stderr bytes.Buffer
+ err := repo.FetchInternal(
+ ctx, remoteRepoProto, []string{"refs/heads/master"},
+ localrepo.FetchOpts{Stderr: &stderr, Env: []string{"GIT_TRACE=1"}, DisableTransactions: true},
+ )
+ require.NoError(t, err)
+ require.Contains(t, stderr.String(), "trace: built-in: git fetch --quiet --end-of-options")
})
t.Run("invalid remote repo", func(t *testing.T) {
diff --git a/internal/git/localrepo/remote_test.go b/internal/git/localrepo/remote_test.go
index 4f1a8276a..6b165f0c9 100644
--- a/internal/git/localrepo/remote_test.go
+++ b/internal/git/localrepo/remote_test.go
@@ -361,6 +361,22 @@ func TestRepo_FetchRemote(t *testing.T) {
var stderr bytes.Buffer
require.NoError(t, repo.FetchRemote(ctx, "source", FetchOpts{Stderr: &stderr, Env: []string{"GIT_TRACE=1"}}))
+ require.Contains(t, stderr.String(), "trace: built-in: git fetch --quiet --atomic --end-of-options source")
+ })
+
+ t.Run("with disabled transactions", func(t *testing.T) {
+ _, sourceRepoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+ testRepo, testRepoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0])
+
+ repo := New(remoteCmd.repo.gitCmdFactory, remoteCmd.repo.catfileCache, testRepo, cfg)
+ gittest.Exec(t, cfg, "-C", testRepoPath, "remote", "add", "source", sourceRepoPath)
+
+ var stderr bytes.Buffer
+ require.NoError(t, repo.FetchRemote(ctx, "source", FetchOpts{
+ Stderr: &stderr,
+ Env: []string{"GIT_TRACE=1"},
+ DisableTransactions: true,
+ }))
require.Contains(t, stderr.String(), "trace: built-in: git fetch --quiet --end-of-options source")
})
diff --git a/internal/gitaly/service/repository/fetch_bundle.go b/internal/gitaly/service/repository/fetch_bundle.go
new file mode 100644
index 000000000..05204d064
--- /dev/null
+++ b/internal/gitaly/service/repository/fetch_bundle.go
@@ -0,0 +1,75 @@
+package repository
+
+import (
+ "io"
+ "os"
+ "path/filepath"
+
+ gitalyerrors "gitlab.com/gitlab-org/gitaly/v14/internal/errors"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/v14/streamio"
+)
+
+const (
+ mirrorRefSpec = "+refs/*:refs/*"
+)
+
+func (s *server) FetchBundle(stream gitalypb.RepositoryService_FetchBundleServer) error {
+ firstRequest, err := stream.Recv()
+ if err != nil {
+ return helper.ErrInternalf("first request: %v", err)
+ }
+
+ if firstRequest.GetRepository() == nil {
+ return helper.ErrInvalidArgument(gitalyerrors.ErrEmptyRepository)
+ }
+
+ firstRead := true
+ reader := streamio.NewReader(func() ([]byte, error) {
+ if firstRead {
+ firstRead = false
+ return firstRequest.GetData(), nil
+ }
+
+ request, err := stream.Recv()
+ return request.GetData(), err
+ })
+
+ ctx := stream.Context()
+ repo := s.localrepo(firstRequest.GetRepository())
+
+ tmpDir, err := tempdir.New(ctx, repo.GetStorageName(), s.locator)
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ bundlePath := filepath.Join(tmpDir.Path(), "repo.bundle")
+ file, err := os.Create(bundlePath)
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ _, err = io.Copy(file, reader)
+ if err != nil {
+ return helper.ErrInternalf("copy bundle: %w", err)
+ }
+
+ config := []git.ConfigPair{
+ {Key: "remote.inmemory.url", Value: bundlePath},
+ {Key: "remote.inmemory.fetch", Value: mirrorRefSpec},
+ }
+ opts := localrepo.FetchOpts{
+ Prune: true,
+ CommandOptions: []git.CmdOpt{git.WithConfigEnv(config...)},
+ }
+
+ if err := repo.FetchRemote(ctx, "inmemory", opts); err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ return stream.SendAndClose(&gitalypb.FetchBundleResponse{})
+}
diff --git a/internal/gitaly/service/repository/fetch_bundle_test.go b/internal/gitaly/service/repository/fetch_bundle_test.go
new file mode 100644
index 000000000..a81889000
--- /dev/null
+++ b/internal/gitaly/service/repository/fetch_bundle_test.go
@@ -0,0 +1,205 @@
+package repository
+
+import (
+ "context"
+ "io"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ gitalyhook "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/hook"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/v14/streamio"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+)
+
+func TestServer_FetchBundle_success(t *testing.T) {
+ t.Parallel()
+ cfg, _, repoPath, client := setupRepositoryService(t)
+
+ tmp := testhelper.TempDir(t)
+ bundlePath := filepath.Join(tmp, "test.bundle")
+
+ gittest.Exec(t, cfg, "-C", repoPath, "bundle", "create", bundlePath, "--all")
+ expectedRefs := gittest.Exec(t, cfg, "-C", repoPath, "show-ref", "--head")
+
+ targetRepo, targetRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0])
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ stream, err := client.FetchBundle(ctx)
+ require.NoError(t, err)
+
+ request := &gitalypb.FetchBundleRequest{Repository: targetRepo}
+ writer := streamio.NewWriter(func(p []byte) error {
+ request.Data = p
+
+ if err := stream.Send(request); err != nil {
+ return err
+ }
+
+ request = &gitalypb.FetchBundleRequest{}
+
+ return nil
+ })
+
+ bundle, err := os.Open(bundlePath)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, bundle)
+
+ _, err = io.Copy(writer, bundle)
+ require.NoError(t, err)
+
+ _, err = stream.CloseAndRecv()
+ require.NoError(t, err)
+
+ refs := gittest.Exec(t, cfg, "-C", targetRepoPath, "show-ref", "--head")
+ require.Equal(t, string(expectedRefs), string(refs))
+}
+
+func TestServer_FetchBundle_transaction(t *testing.T) {
+ t.Parallel()
+ cfg, repoProto, repoPath := testcfg.BuildWithRepo(t)
+ testhelper.BuildGitalyHooks(t, cfg)
+
+ hookManager := &mockHookManager{}
+ addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRepositoryServiceServer(srv, NewServer(
+ deps.GetCfg(),
+ deps.GetRubyServer(),
+ deps.GetLocator(),
+ deps.GetTxManager(),
+ deps.GetGitCmdFactory(),
+ deps.GetCatfileCache(),
+ ))
+ gitalypb.RegisterHookServiceServer(srv, hook.NewServer(
+ deps.GetCfg(),
+ deps.GetHookManager(),
+ deps.GetGitCmdFactory(),
+ deps.GetPackObjectsCache(),
+ ))
+ }, testserver.WithHookManager(hookManager), testserver.WithDisablePraefect())
+
+ client := newRepositoryClient(t, cfg, addr)
+
+ tmp := testhelper.TempDir(t)
+ bundlePath := filepath.Join(tmp, "test.bundle")
+ gittest.BundleTestRepo(t, cfg, "gitlab-test.git", bundlePath)
+
+ _, stopGitServer := gittest.GitServer(t, cfg, repoPath, nil)
+ defer func() { require.NoError(t, stopGitServer()) }()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true)
+ require.NoError(t, err)
+ ctx = metadata.IncomingToOutgoing(ctx)
+
+ require.Empty(t, hookManager.states)
+
+ stream, err := client.FetchBundle(ctx)
+ require.NoError(t, err)
+
+ request := &gitalypb.FetchBundleRequest{Repository: repoProto}
+ writer := streamio.NewWriter(func(p []byte) error {
+ request.Data = p
+
+ if err := stream.Send(request); err != nil {
+ return err
+ }
+
+ request = &gitalypb.FetchBundleRequest{}
+
+ return nil
+ })
+
+ bundle, err := os.Open(bundlePath)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, bundle)
+
+ _, err = io.Copy(writer, bundle)
+ require.NoError(t, err)
+
+ _, err = stream.CloseAndRecv()
+ require.NoError(t, err)
+
+ require.Equal(t, []gitalyhook.ReferenceTransactionState{
+ gitalyhook.ReferenceTransactionPrepared,
+ gitalyhook.ReferenceTransactionCommitted,
+ }, hookManager.states)
+}
+
+func TestServer_FetchBundle_validation(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ firstRequest *gitalypb.FetchBundleRequest
+ expectedStreamErr string
+ expectedStreamErrCode codes.Code
+ }{
+ {
+ desc: "no repo",
+ firstRequest: &gitalypb.FetchBundleRequest{
+ Repository: nil,
+ },
+ expectedStreamErr: "empty Repository",
+ expectedStreamErrCode: codes.InvalidArgument,
+ },
+ {
+ desc: "unknown repo",
+ firstRequest: &gitalypb.FetchBundleRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "default",
+ RelativePath: "unknown",
+ },
+ },
+ expectedStreamErr: "not a git repository",
+ expectedStreamErrCode: codes.NotFound,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ _, client := setupRepositoryServiceWithoutRepo(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ stream, err := client.FetchBundle(ctx)
+ require.NoError(t, err)
+
+ err = stream.Send(tc.firstRequest)
+ require.NoError(t, err)
+
+ _, err = stream.CloseAndRecv()
+ require.Error(t, err)
+ if tc.expectedStreamErr != "" {
+ require.Contains(t, err.Error(), tc.expectedStreamErr)
+ }
+ if tc.expectedStreamErrCode != 0 {
+ require.Equal(t, tc.expectedStreamErrCode, helper.GrpcCode(err))
+ }
+ })
+ }
+}
+
+type mockHookManager struct {
+ gitalyhook.Manager
+ states []gitalyhook.ReferenceTransactionState
+}
+
+func (m *mockHookManager) ReferenceTransactionHook(_ context.Context, state gitalyhook.ReferenceTransactionState, _ []string, _ io.Reader) error {
+ m.states = append(m.states, state)
+ return nil
+}
diff --git a/internal/gitaly/service/repository/fetch_remote.go b/internal/gitaly/service/repository/fetch_remote.go
index 34a9c90fa..0a18c9140 100644
--- a/internal/gitaly/service/repository/fetch_remote.go
+++ b/internal/gitaly/service/repository/fetch_remote.go
@@ -26,11 +26,12 @@ func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteReque
var stderr bytes.Buffer
opts := localrepo.FetchOpts{
- Stderr: &stderr,
- Force: req.Force,
- Prune: !req.NoPrune,
- Tags: localrepo.FetchOptsTagsAll,
- Verbose: req.GetCheckTagsChanged(),
+ Stderr: &stderr,
+ Force: req.Force,
+ Prune: !req.NoPrune,
+ Tags: localrepo.FetchOptsTagsAll,
+ Verbose: req.GetCheckTagsChanged(),
+ DisableTransactions: true,
}
if req.GetNoTags() {
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index aea341dd0..938f47fe7 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -73,6 +73,7 @@ var transactionRPCs = map[string]transactionsCondition{
"/gitaly.RepositoryService/CreateRepositoryFromBundle": transactionsEnabled,
"/gitaly.RepositoryService/CreateRepositoryFromSnapshot": transactionsEnabled,
"/gitaly.RepositoryService/CreateRepositoryFromURL": transactionsEnabled,
+ "/gitaly.RepositoryService/FetchBundle": transactionsEnabled,
"/gitaly.RepositoryService/FetchRemote": transactionsEnabled,
"/gitaly.RepositoryService/FetchSourceBranch": transactionsEnabled,
"/gitaly.RepositoryService/RemoveRepository": transactionsEnabled,
diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go
index 87ebfcd9d..515c13e3d 100644
--- a/internal/praefect/protoregistry/protoregistry_test.go
+++ b/internal/praefect/protoregistry/protoregistry_test.go
@@ -112,6 +112,7 @@ func TestNewProtoRegistry(t *testing.T) {
"RepositorySize": protoregistry.OpAccessor,
"ApplyGitattributes": protoregistry.OpMutator,
"FetchRemote": protoregistry.OpMutator,
+ "FetchBundle": protoregistry.OpMutator,
"CreateRepository": protoregistry.OpMutator,
"GetArchive": protoregistry.OpAccessor,
"HasLocalBranches": protoregistry.OpAccessor,