diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-24 14:53:48 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-24 14:53:48 +0300 |
commit | f87bc1e983d11788fdbce953dced45ec5554af23 (patch) | |
tree | a3f2a32a09dbc536159f25ef2cce650419fbcfab | |
parent | 518670d57d1a6527aaf46b5b9bf5cb00f2e8f11b (diff) | |
parent | a2367d906ac47fa0a19d0c5de2a554bfaef8abc3 (diff) |
Merge branch 'pks-create-repository-atomic' into 'master'
Atomic repository creation
Closes #3779
See merge request gitlab-org/gitaly!3884
20 files changed, 1069 insertions, 232 deletions
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index c6195ad99..a857ebb2c 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -157,8 +157,6 @@ func TestAddRepository_Exec(t *testing.T) { relativePath := fmt.Sprintf("path/to/test/repo_%s", tn) repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - require.NoError(t, createRepoThroughGitaly1(relativePath)) - rmRepoCmd := &removeRepository{ logger: logger, virtualStorage: virtualStorageName, diff --git a/internal/git/localrepo/config_test.go b/internal/git/localrepo/config_test.go index a4b5ce26a..e0d188cf4 100644 --- a/internal/git/localrepo/config_test.go +++ b/internal/git/localrepo/config_test.go @@ -2,7 +2,6 @@ package localrepo import ( "context" - "errors" "fmt" "path/filepath" "runtime" @@ -86,7 +85,7 @@ func TestRepo_SetConfig(t *testing.T) { value: "value", locked: true, expectedEntries: standardEntries, - expectedErr: fmt.Errorf("committing config: %w", fmt.Errorf("locking file: %w", errors.New("file already locked"))), + expectedErr: fmt.Errorf("committing config: %w", fmt.Errorf("locking file: %w", safe.ErrFileAlreadyLocked)), }, } { t.Run(tc.desc, func(t *testing.T) { @@ -224,7 +223,7 @@ func TestRepo_UnsetMatchingConfig(t *testing.T) { desc: "locked", regex: ".*", locked: true, - expectedErr: fmt.Errorf("committing config: %w", fmt.Errorf("locking file: %w", errors.New("file already locked"))), + expectedErr: fmt.Errorf("committing config: %w", fmt.Errorf("locking file: %w", safe.ErrFileAlreadyLocked)), expectedKeys: standardKeys, }, } { diff --git a/internal/gitaly/service/repository/fork.go b/internal/gitaly/service/repository/create_fork.go index 610fdf55a..4f287eb23 100644 --- a/internal/gitaly/service/repository/fork.go +++ b/internal/gitaly/service/repository/create_fork.go @@ -7,6 +7,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/gitalyssh" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -23,6 +25,57 @@ func (s *server) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest return nil, status.Errorf(codes.InvalidArgument, "CreateFork: empty Repository") } + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + if err := s.createRepository(ctx, targetRepository, func(repo *gitalypb.Repository) error { + targetPath, err := s.locator.GetPath(repo) + if err != nil { + return err + } + + env, err := gitalyssh.UploadPackEnv(ctx, s.cfg, &gitalypb.SSHUploadPackRequest{Repository: sourceRepository}) + if err != nil { + return err + } + + // git-clone(1) doesn't allow for the target path to exist, so we have to + // remove it first. + if err := os.RemoveAll(targetPath); err != nil { + return fmt.Errorf("removing target path: %w", err) + } + + // Ideally we'd just fetch into the already-created repo, but that wouldn't + // allow us to easily set up HEAD to point to the correct ref. We thus have + // no easy choice but to use git-clone(1). + cmd, err := s.gitCmdFactory.NewWithoutRepo(ctx, git.SubCmd{ + Name: "clone", + Flags: []git.Option{ + git.Flag{Name: "--bare"}, + }, + Args: []string{ + gitalyssh.GitalyInternalURL, + targetPath, + }, + }, git.WithEnv(env...), git.WithDisabledHooks()) + if err != nil { + return fmt.Errorf("spawning fetch: %w", err) + } + + if err := cmd.Wait(); err != nil { + return fmt.Errorf("fetching source repo: %w", err) + } + + if err := s.removeOriginInRepo(ctx, repo); err != nil { + return fmt.Errorf("removing origin remote: %w", err) + } + + return nil + }); err != nil { + return nil, helper.ErrInternalf("creating fork: %w", err) + } + + return &gitalypb.CreateForkResponse{}, nil + } + targetRepositoryFullPath, err := s.locator.GetPath(targetRepository) if err != nil { return nil, err diff --git a/internal/gitaly/service/repository/fork_test.go b/internal/gitaly/service/repository/create_fork_test.go index c5c7b3ca0..8083c2a8e 100644 --- a/internal/gitaly/service/repository/fork_test.go +++ b/internal/gitaly/service/repository/create_fork_test.go @@ -1,10 +1,12 @@ package repository import ( + "context" "crypto/tls" "crypto/x509" "os" "path/filepath" + "strings" "testing" "github.com/stretchr/testify/require" @@ -27,23 +29,37 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" gitalyx509 "gitlab.com/gitlab-org/gitaly/v14/internal/x509" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/metadata" ) -func TestSuccessfulCreateForkRequest(t *testing.T) { +func TestCreateFork_successful(t *testing.T) { t.Parallel() + + // We need to inject this once across all tests given that crypto/x509 only initializes + // certificates once. Changing injected certs during our tests is thus not going to fly well + // and would cause failure. We should eventually address this and provide better testing + // utilities around this, but now's not the time. + certPool, tlsConfig := injectCustomCATestCerts(t) + + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, func(t *testing.T, ctx context.Context) { + testCreateForkSuccessful(t, ctx, certPool, tlsConfig) + }) +} + +func testCreateForkSuccessful(t *testing.T, ctx context.Context, certPool *x509.CertPool, tlsConfig config.TLS) { for _, tt := range []struct { - name string - secure bool - beforeRequest func(repoPath string) + name string + secure bool }{ { name: "secure", @@ -52,12 +68,6 @@ func TestSuccessfulCreateForkRequest(t *testing.T) { { name: "insecure", }, - { - name: "existing empty directory target", - beforeRequest: func(repoPath string) { - require.NoError(t, os.MkdirAll(repoPath, 0o755)) - }, - }, } { t.Run(tt.name, func(t *testing.T) { cfg, repo, _ := testcfg.BuildWithRepo(t) @@ -71,45 +81,31 @@ func TestSuccessfulCreateForkRequest(t *testing.T) { ) if tt.secure { - testPool := injectCustomCATestCerts(t, &cfg) + cfg.TLS = tlsConfig cfg.TLSListenAddr = runSecureServer(t, cfg, nil) - client, conn = newSecureRepoClient(t, cfg.TLSListenAddr, cfg.Auth.Token, testPool) + client, conn = newSecureRepoClient(t, cfg.TLSListenAddr, cfg.Auth.Token, certPool) defer conn.Close() } else { client, cfg.SocketPath = runRepositoryService(t, cfg, nil) } - ctxOuter, cancel := testhelper.Context() - defer cancel() - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx := metadata.NewOutgoingContext(ctxOuter, md) + ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) forkedRepo := &gitalypb.Repository{ - RelativePath: "forks/test-repo-fork.git", + RelativePath: gittest.NewRepositoryName(t, true), StorageName: repo.GetStorageName(), } - forkedRepoPath := filepath.Join(cfg.Storages[0].Path, forkedRepo.GetRelativePath()) - require.NoError(t, os.RemoveAll(forkedRepoPath)) - - if tt.beforeRequest != nil { - tt.beforeRequest(forkedRepoPath) - } - - req := &gitalypb.CreateForkRequest{ + _, err := client.CreateFork(ctx, &gitalypb.CreateForkRequest{ Repository: forkedRepo, SourceRepository: repo, - } - - _, err := client.CreateFork(ctx, req) + }) require.NoError(t, err) - defer func() { require.NoError(t, os.RemoveAll(forkedRepoPath)) }() - gittest.Exec(t, cfg, "-C", forkedRepoPath, "fsck") + forkedRepoPath := filepath.Join(cfg.Storages[0].Path, forkedRepo.GetRelativePath()) - remotes := gittest.Exec(t, cfg, "-C", forkedRepoPath, "remote") - require.NotContains(t, string(remotes), "origin") + gittest.Exec(t, cfg, "-C", forkedRepoPath, "fsck") + require.Empty(t, gittest.Exec(t, cfg, "-C", forkedRepoPath, "remote")) _, err = os.Lstat(filepath.Join(forkedRepoPath, "hooks")) require.True(t, os.IsNotExist(err), "hooks directory should not have been created") @@ -117,88 +113,135 @@ func TestSuccessfulCreateForkRequest(t *testing.T) { } } -func newSecureRepoClient(t testing.TB, addr, token string, pool *x509.CertPool) (gitalypb.RepositoryServiceClient, *grpc.ClientConn) { - t.Helper() +func TestCreateFork_refs(t *testing.T) { + t.Parallel() - connOpts := []grpc.DialOption{ - grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ - RootCAs: pool, - MinVersion: tls.VersionTLS12, - })), - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)), + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateForkRefs) +} + +func testCreateForkRefs(t *testing.T, ctx context.Context) { + cfg := testcfg.Build(t) + testcfg.BuildGitalyHooks(t, cfg) + testcfg.BuildGitalySSH(t, cfg) + + sourceRepo, sourceRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0]) + + // Prepare the source repository with a bunch of refs and a non-default HEAD ref so we can + // assert that the target repo gets created with the correct set of refs. + commitID := gittest.WriteCommit(t, cfg, sourceRepoPath, gittest.WithParents()) + for _, ref := range []string{ + "refs/environments/something", + "refs/heads/something", + "refs/remotes/origin/something", + "refs/tags/something", + } { + gittest.Exec(t, cfg, "-C", sourceRepoPath, "update-ref", ref, commitID.String()) } + gittest.Exec(t, cfg, "-C", sourceRepoPath, "symbolic-ref", "HEAD", "refs/heads/something") - conn, err := client.Dial(addr, connOpts) + client, socketPath := runRepositoryService(t, cfg, nil) + cfg.SocketPath = socketPath + + ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) + + targetRepo := &gitalypb.Repository{ + RelativePath: gittest.NewRepositoryName(t, true), + StorageName: sourceRepo.GetStorageName(), + } + targetRepoPath, err := config.NewLocator(cfg).GetPath(targetRepo) require.NoError(t, err) - return gitalypb.NewRepositoryServiceClient(conn), conn + _, err = client.CreateFork(ctx, &gitalypb.CreateForkRequest{ + Repository: targetRepo, + SourceRepository: sourceRepo, + }) + require.NoError(t, err) + + require.Equal(t, + []string{ + commitID.String() + " refs/heads/something", + commitID.String() + " refs/tags/something", + }, + strings.Split(text.ChompBytes(gittest.Exec(t, cfg, "-C", targetRepoPath, "show-ref")), "\n"), + ) + + require.Equal(t, + string(gittest.Exec(t, cfg, "-C", sourceRepoPath, "symbolic-ref", "HEAD")), + string(gittest.Exec(t, cfg, "-C", targetRepoPath, "symbolic-ref", "HEAD")), + ) } -func TestFailedCreateForkRequestDueToExistingTarget(t *testing.T) { +func TestCreateFork_targetExists(t *testing.T) { t.Parallel() - cfg, repo, _, client := setupRepositoryService(t) - ctxOuter, cancel := testhelper.Context() - defer cancel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateForkTargetExists) +} + +func testCreateForkTargetExists(t *testing.T, ctx context.Context) { + cfg, repo, _, client := setupRepositoryService(t) - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx := metadata.NewOutgoingContext(ctxOuter, md) + ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) - testCases := []struct { - desc string - repoPath string - isDir bool + for _, tc := range []struct { + desc string + seed func(t *testing.T, targetPath string) + expectedErr error + expectedErrWithAtomicCreation error }{ { - desc: "target is a non-empty directory", - repoPath: "forks/test-repo-fork-dir.git", - isDir: true, + desc: "empty target directory", + seed: func(t *testing.T, targetPath string) { + require.NoError(t, os.MkdirAll(targetPath, 0o770)) + }, + expectedErrWithAtomicCreation: helper.ErrAlreadyExistsf("creating fork: repository exists already"), }, { - desc: "target is a file", - repoPath: "forks/test-repo-fork-file.git", - isDir: false, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.desc, func(t *testing.T) { - forkedRepo := &gitalypb.Repository{ - RelativePath: testCase.repoPath, - StorageName: repo.StorageName, - } - - forkedRepoPath := filepath.Join(cfg.Storages[0].Path, forkedRepo.GetRelativePath()) - - if testCase.isDir { - require.NoError(t, os.MkdirAll(forkedRepoPath, 0o770)) + desc: "non-empty target directory", + seed: func(t *testing.T, targetPath string) { + require.NoError(t, os.MkdirAll(targetPath, 0o770)) require.NoError(t, os.WriteFile( - filepath.Join(forkedRepoPath, "config"), + filepath.Join(targetPath, "config"), nil, 0o644, )) - } else { - require.NoError(t, os.WriteFile(forkedRepoPath, nil, 0o644)) + }, + expectedErr: helper.ErrInvalidArgumentf("CreateFork: destination directory is not empty"), + expectedErrWithAtomicCreation: helper.ErrAlreadyExistsf("creating fork: repository exists already"), + }, + { + desc: "target file", + seed: func(t *testing.T, targetPath string) { + require.NoError(t, os.MkdirAll(filepath.Dir(targetPath), 0o770)) + require.NoError(t, os.WriteFile(targetPath, nil, 0o644)) + }, + expectedErr: helper.ErrInvalidArgumentf("CreateFork: destination path exists"), + expectedErrWithAtomicCreation: helper.ErrAlreadyExistsf("creating fork: repository exists already"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + forkedRepo := &gitalypb.Repository{ + RelativePath: gittest.NewRepositoryName(t, true), + StorageName: repo.StorageName, } - defer func() { require.NoError(t, os.RemoveAll(forkedRepoPath)) }() - req := &gitalypb.CreateForkRequest{ + tc.seed(t, filepath.Join(cfg.Storages[0].Path, forkedRepo.GetRelativePath())) + + _, err := client.CreateFork(ctx, &gitalypb.CreateForkRequest{ Repository: forkedRepo, SourceRepository: repo, + }) + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + testassert.GrpcEqualErr(t, tc.expectedErrWithAtomicCreation, err) + } else { + testassert.GrpcEqualErr(t, tc.expectedErr, err) } - - _, err := client.CreateFork(ctx, req) - testhelper.RequireGrpcError(t, err, codes.InvalidArgument) }) } } -func injectCustomCATestCerts(t *testing.T, cfg *config.Cfg) *x509.CertPool { +func injectCustomCATestCerts(t *testing.T) (*x509.CertPool, config.TLS) { certFile, keyFile := testhelper.GenerateCerts(t) - cfg.TLS.CertPath = certFile - cfg.TLS.KeyPath = keyFile - revertEnv := testhelper.ModifyEnvironment(t, gitalyx509.SSLCertFile, certFile) t.Cleanup(revertEnv) @@ -206,7 +249,7 @@ func injectCustomCATestCerts(t *testing.T, cfg *config.Cfg) *x509.CertPool { pool := x509.NewCertPool() require.True(t, pool.AppendCertsFromPEM(caPEMBytes)) - return pool + return pool, config.TLS{CertPath: certFile, KeyPath: keyFile} } func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) string { @@ -254,3 +297,20 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s return "tls://" + addr } + +func newSecureRepoClient(t testing.TB, addr, token string, pool *x509.CertPool) (gitalypb.RepositoryServiceClient, *grpc.ClientConn) { + t.Helper() + + connOpts := []grpc.DialOption{ + grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ + RootCAs: pool, + MinVersion: tls.VersionTLS12, + })), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)), + } + + conn, err := client.Dial(addr, connOpts) + require.NoError(t, err) + + return gitalypb.NewRepositoryServiceClient(conn), conn +} diff --git a/internal/gitaly/service/repository/create.go b/internal/gitaly/service/repository/create_repository.go index c78801e8d..334d8b79b 100644 --- a/internal/gitaly/service/repository/create.go +++ b/internal/gitaly/service/repository/create_repository.go @@ -9,6 +9,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -22,6 +23,18 @@ func (s *server) CreateRepository(ctx context.Context, req *gitalypb.CreateRepos return nil, helper.ErrInvalidArgumentf("locate repository: %w", err) } + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + if err := s.createRepository(ctx, req.GetRepository(), func(repo *gitalypb.Repository) error { + // We do not want to seed the repository with any contents, so we just + // return directly. + return nil + }); err != nil { + return nil, helper.ErrInternalf("creating repository: %w", err) + } + + return &gitalypb.CreateRepositoryResponse{}, nil + } + if err := os.MkdirAll(diskPath, 0o770); err != nil { return nil, helper.ErrInternalf("create directories: %w", err) } diff --git a/internal/gitaly/service/repository/create_from_bundle.go b/internal/gitaly/service/repository/create_repository_from_bundle.go index 092b77861..59523ed72 100644 --- a/internal/gitaly/service/repository/create_from_bundle.go +++ b/internal/gitaly/service/repository/create_repository_from_bundle.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" @@ -28,6 +29,62 @@ func (s *server) CreateRepositoryFromBundle(stream gitalypb.RepositoryService_Cr return status.Errorf(codes.InvalidArgument, "CreateRepositoryFromBundle: empty Repository") } + ctx := stream.Context() + + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + firstRead := false + bundleReader := streamio.NewReader(func() ([]byte, error) { + if !firstRead { + firstRead = true + return firstRequest.GetData(), nil + } + + request, err := stream.Recv() + return request.GetData(), err + }) + + bundleDir, err := tempdir.New(ctx, repo.GetStorageName(), s.locator) + if err != nil { + return helper.ErrInternalf("creating bundle directory: %w", err) + } + + bundlePath := filepath.Join(bundleDir.Path(), "repo.bundle") + bundleFile, err := os.Create(bundlePath) + if err != nil { + return helper.ErrInternalf("creating bundle file: %w", err) + } + + if _, err := io.Copy(bundleFile, bundleReader); err != nil { + return helper.ErrInternalf("writing bundle file: %w", err) + } + + if err := s.createRepository(ctx, repo, func(repo *gitalypb.Repository) error { + var stderr bytes.Buffer + cmd, err := s.gitCmdFactory.New(ctx, repo, git.SubCmd{ + Name: "fetch", + Flags: []git.Option{ + git.Flag{Name: "--quiet"}, + git.Flag{Name: "--atomic"}, + }, + Args: []string{bundlePath, "refs/*:refs/*"}, + }, git.WithStderr(&stderr), git.WithRefTxHook(ctx, repo, s.cfg)) + if err != nil { + return helper.ErrInternalf("spawning fetch: %w", err) + } + + if err := cmd.Wait(); err != nil { + sanitizedStderr := sanitizedError("%s", stderr.String()) + return helper.ErrInternalf("fetch from bundle: %w, stderr: %q", err, sanitizedStderr) + } + + return nil + }); err != nil { + return helper.ErrInternalf("creating repository: %w", err) + } + + return stream.SendAndClose(&gitalypb.CreateRepositoryFromBundleResponse{}) + } + repoPath, err := s.locator.GetPath(repo) if err != nil { return helper.ErrInternal(err) @@ -48,8 +105,6 @@ func (s *server) CreateRepositoryFromBundle(stream gitalypb.RepositoryService_Cr return request.GetData(), err }) - ctx := stream.Context() - tmpDir, err := tempdir.New(ctx, repo.GetStorageName(), s.locator) if err != nil { cleanError := sanitizedError(tmpDir.Path(), "CreateRepositoryFromBundle: tmp dir failed: %v", err) diff --git a/internal/gitaly/service/repository/create_from_bundle_test.go b/internal/gitaly/service/repository/create_repository_from_bundle_test.go index 1c8f6d801..35dd45ddf 100644 --- a/internal/gitaly/service/repository/create_from_bundle_test.go +++ b/internal/gitaly/service/repository/create_repository_from_bundle_test.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" @@ -30,12 +31,14 @@ import ( "google.golang.org/grpc/status" ) -func TestServer_CreateRepositoryFromBundle_successful(t *testing.T) { +func TestCreateRepositoryFromBundle_successful(t *testing.T) { t.Parallel() - cfg, repo, repoPath, client := setupRepositoryService(t) - ctx, cancel := testhelper.Context() - defer cancel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testServerCreateRepositoryFromBundleSuccessful) +} + +func testServerCreateRepositoryFromBundleSuccessful(t *testing.T, ctx context.Context) { + cfg, repo, repoPath, client := setupRepositoryService(t) locator := config.NewLocator(cfg) tmpdir, err := tempdir.New(ctx, repo.GetStorageName(), locator) @@ -92,7 +95,13 @@ func TestServer_CreateRepositoryFromBundle_successful(t *testing.T) { require.NotNil(t, commit) } -func TestServerCreateRepositoryFromBundleTransactional(t *testing.T) { +func TestCreateRepositoryFromBundle_transactional(t *testing.T) { + t.Parallel() + + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromBundleTransactional) +} + +func testCreateRepositoryFromBundleTransactional(t *testing.T, ctx context.Context) { var votes []voting.Vote txManager := &transaction.MockManager{ VoteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote) error { @@ -114,8 +123,6 @@ func TestServerCreateRepositoryFromBundleTransactional(t *testing.T) { gittest.Exec(t, cfg, "-C", repoPath, "update-ref", keepAroundRef, masterOID) } - ctx, cancel := testhelper.Context() - defer cancel() ctx, err := txinfo.InjectTransaction(ctx, 1, "primary", true) require.NoError(t, err) ctx = metadata.IncomingToOutgoing(ctx) @@ -145,6 +152,13 @@ func TestServerCreateRepositoryFromBundleTransactional(t *testing.T) { _, err = stream.CloseAndRecv() require.NoError(t, err) + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + // We're being much more thorough with computation of the votes, so it's hard to say + // exactly what these votes look like. So we just assert we've got a bunch of them. + require.Len(t, votes, 4) + return + } + var votingInput []string // This accounts for the first two votes via git-clone(1). Given that git-clone(1) creates @@ -173,12 +187,14 @@ func TestServerCreateRepositoryFromBundleTransactional(t *testing.T) { require.Equal(t, votes, expectedVotes) } -func TestServer_CreateRepositoryFromBundle_failed_invalid_bundle(t *testing.T) { +func TestCreateRepositoryFromBundle_invalidBundle(t *testing.T) { t.Parallel() - cfg, client := setupRepositoryServiceWithoutRepo(t) - ctx, cancel := testhelper.Context() - defer cancel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromBundleInvalidBundle) +} + +func testCreateRepositoryFromBundleInvalidBundle(t *testing.T, ctx context.Context) { + cfg, client := setupRepositoryServiceWithoutRepo(t) stream, err := client.CreateRepositoryFromBundle(ctx) require.NoError(t, err) @@ -211,12 +227,14 @@ func TestServer_CreateRepositoryFromBundle_failed_invalid_bundle(t *testing.T) { require.Contains(t, err.Error(), "invalid gitfile format") } -func TestServer_CreateRepositoryFromBundle_failed_validations(t *testing.T) { +func TestCreateRepositoryFromBundle_invalidArgument(t *testing.T) { t.Parallel() - _, client := setupRepositoryServiceWithoutRepo(t) - ctx, cancel := testhelper.Context() - defer cancel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testServerCreateRepositoryFromBundleFailedValidations) +} + +func testServerCreateRepositoryFromBundleFailedValidations(t *testing.T, ctx context.Context) { + _, client := setupRepositoryServiceWithoutRepo(t) stream, err := client.CreateRepositoryFromBundle(ctx) require.NoError(t, err) @@ -227,12 +245,14 @@ func TestServer_CreateRepositoryFromBundle_failed_validations(t *testing.T) { testhelper.RequireGrpcError(t, err, codes.InvalidArgument) } -func TestServer_CreateRepositoryFromBundle_failed_existing_directory(t *testing.T) { +func TestCreateRepositoryFromBundle_existingRepository(t *testing.T) { t.Parallel() - _, repo, _, client := setupRepositoryService(t) - ctx, cancel := testhelper.Context() - defer cancel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testServerCreateRepositoryFromBundleFailedExistingDirectory) +} + +func testServerCreateRepositoryFromBundleFailedExistingDirectory(t *testing.T, ctx context.Context) { + _, repo, _, client := setupRepositoryService(t) stream, err := client.CreateRepositoryFromBundle(ctx) require.NoError(t, err) @@ -242,7 +262,12 @@ func TestServer_CreateRepositoryFromBundle_failed_existing_directory(t *testing. })) _, err = stream.CloseAndRecv() - testassert.GrpcEqualErr(t, status.Error(codes.FailedPrecondition, "CreateRepositoryFromBundle: target directory is non-empty"), err) + + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + testassert.GrpcEqualErr(t, status.Error(codes.AlreadyExists, "creating repository: repository exists already"), err) + } else { + testassert.GrpcEqualErr(t, status.Error(codes.FailedPrecondition, "CreateRepositoryFromBundle: target directory is non-empty"), err) + } } func TestSanitizedError(t *testing.T) { diff --git a/internal/gitaly/service/repository/create_from_snapshot.go b/internal/gitaly/service/repository/create_repository_from_snapshot.go index 1969a0b9d..b0dcd777f 100644 --- a/internal/gitaly/service/repository/create_from_snapshot.go +++ b/internal/gitaly/service/repository/create_repository_from_snapshot.go @@ -11,6 +11,8 @@ import ( "time" "gitlab.com/gitlab-org/gitaly/v14/internal/command" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" @@ -76,6 +78,32 @@ func untar(ctx context.Context, path string, in *gitalypb.CreateRepositoryFromSn } func (s *server) CreateRepositoryFromSnapshot(ctx context.Context, in *gitalypb.CreateRepositoryFromSnapshotRequest) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) { + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + if err := s.createRepository(ctx, in.GetRepository(), func(repo *gitalypb.Repository) error { + path, err := s.locator.GetPath(repo) + if err != nil { + return helper.ErrInternalf("getting repo path: %w", err) + } + + // The archive contains a partial git repository, missing a config file and + // other important items. Initializing a new bare one and extracting the + // archive on top of it ensures the created git repository has everything + // it needs (especially, the config file and hooks directory). + // + // NOTE: The received archive is trusted *a lot*. Before pointing this RPC + // at endpoints not under our control, it should undergo a lot of hardning. + if err := untar(ctx, path, in); err != nil { + return helper.ErrInternalf("extracting snapshot: %w", err) + } + + return nil + }); err != nil { + return nil, helper.ErrInternalf("creating repository: %w", err) + } + + return &gitalypb.CreateRepositoryFromSnapshotResponse{}, nil + } + realPath, err := s.locator.GetPath(in.Repository) if err != nil { return nil, err diff --git a/internal/gitaly/service/repository/create_from_snapshot_test.go b/internal/gitaly/service/repository/create_repository_from_snapshot_test.go index 5aa0c339b..596049312 100644 --- a/internal/gitaly/service/repository/create_from_snapshot_test.go +++ b/internal/gitaly/service/repository/create_repository_from_snapshot_test.go @@ -2,6 +2,7 @@ package repository import ( "bytes" + "context" "io" "net/http" "net/http/httptest" @@ -14,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/archive" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" @@ -60,20 +62,21 @@ func generateTarFile(t *testing.T, path string) ([]byte, []string) { return data, entries } -func createFromSnapshot(t *testing.T, req *gitalypb.CreateRepositoryFromSnapshotRequest, cfg config.Cfg) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) { +func createFromSnapshot(t *testing.T, ctx context.Context, req *gitalypb.CreateRepositoryFromSnapshotRequest, cfg config.Cfg) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) { t.Helper() serverSocketPath := runRepositoryServerWithConfig(t, cfg, nil) client := newRepositoryClient(t, cfg, serverSocketPath) - ctx, cancel := testhelper.Context() - defer cancel() - return client.CreateRepositoryFromSnapshot(ctx, req) } -func TestCreateRepositoryFromSnapshotSuccess(t *testing.T) { +func TestCreateRepositoryFromSnapshot_success(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotSuccess) +} + +func testCreateRepositoryFromSnapshotSuccess(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) _, sourceRepoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0]) @@ -98,7 +101,7 @@ func TestCreateRepositoryFromSnapshotSuccess(t *testing.T) { HttpAuth: secret, } - rsp, err := createFromSnapshot(t, req, cfg) + rsp, err := createFromSnapshot(t, ctx, req, cfg) require.NoError(t, err) testassert.ProtoEqual(t, rsp, &gitalypb.CreateRepositoryFromSnapshotResponse{}) @@ -117,20 +120,35 @@ func TestCreateRepositoryFromSnapshotSuccess(t *testing.T) { require.FileExists(t, filepath.Join(repoAbsolutePath, "config"), "Config file not created") } -func TestCreateRepositoryFromSnapshotFailsIfRepositoryExists(t *testing.T) { +func TestCreateRepositoryFromSnapshot_repositoryExists(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotFailsIfRepositoryExists) +} + +func testCreateRepositoryFromSnapshotFailsIfRepositoryExists(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) repo, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0]) req := &gitalypb.CreateRepositoryFromSnapshotRequest{Repository: repo} - rsp, err := createFromSnapshot(t, req, cfg) - testhelper.RequireGrpcError(t, err, codes.InvalidArgument) - require.Contains(t, err.Error(), "destination directory exists") + rsp, err := createFromSnapshot(t, ctx, req, cfg) + + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + testhelper.RequireGrpcError(t, err, codes.AlreadyExists) + require.Contains(t, err.Error(), "creating repository: repository exists already") + } else { + testhelper.RequireGrpcError(t, err, codes.InvalidArgument) + require.Contains(t, err.Error(), "destination directory exists") + } + require.Nil(t, rsp) } -func TestCreateRepositoryFromSnapshotFailsIfBadURL(t *testing.T) { +func TestCreateRepositoryFromSnapshot_badURL(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotFailsIfBadURL) +} + +func testCreateRepositoryFromSnapshotFailsIfBadURL(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) repo, repoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0]) require.NoError(t, os.RemoveAll(repoPath)) @@ -140,15 +158,19 @@ func TestCreateRepositoryFromSnapshotFailsIfBadURL(t *testing.T) { HttpUrl: "invalid!scheme://invalid.invalid", } - rsp, err := createFromSnapshot(t, req, cfg) + rsp, err := createFromSnapshot(t, ctx, req, cfg) testhelper.RequireGrpcError(t, err, codes.InvalidArgument) require.Contains(t, err.Error(), "Bad HTTP URL") require.Nil(t, rsp) } -func TestCreateRepositoryFromSnapshotBadRequests(t *testing.T) { +func TestCreateRepositoryFromSnapshot_invalidArguments(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotBadRequests) +} + +func testCreateRepositoryFromSnapshotBadRequests(t *testing.T, ctx context.Context) { testCases := []struct { desc string url string @@ -194,7 +216,7 @@ func TestCreateRepositoryFromSnapshotBadRequests(t *testing.T) { HttpAuth: tc.auth, } - rsp, err := createFromSnapshot(t, req, cfg) + rsp, err := createFromSnapshot(t, ctx, req, cfg) testhelper.RequireGrpcError(t, err, tc.code) require.Nil(t, rsp) @@ -203,8 +225,12 @@ func TestCreateRepositoryFromSnapshotBadRequests(t *testing.T) { } } -func TestCreateRepositoryFromSnapshotHandlesMalformedResponse(t *testing.T) { +func TestCreateRepositoryFromSnapshot_malformedResponse(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotHandlesMalformedResponse) +} + +func testCreateRepositoryFromSnapshotHandlesMalformedResponse(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) repo, repoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0]) @@ -227,7 +253,7 @@ func TestCreateRepositoryFromSnapshotHandlesMalformedResponse(t *testing.T) { HttpAuth: secret, } - rsp, err := createFromSnapshot(t, req, cfg) + rsp, err := createFromSnapshot(t, ctx, req, cfg) require.Error(t, err) require.Nil(t, rsp) diff --git a/internal/gitaly/service/repository/create_from_url.go b/internal/gitaly/service/repository/create_repository_from_url.go index e12c0c875..eacaeb3c9 100644 --- a/internal/gitaly/service/repository/create_from_url.go +++ b/internal/gitaly/service/repository/create_repository_from_url.go @@ -5,7 +5,6 @@ import ( "context" "encoding/base64" "fmt" - "io" "net/url" "os" @@ -13,12 +12,17 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/command" "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -func (s *server) cloneFromURLCommand(ctx context.Context, repo *gitalypb.Repository, repoURL, repositoryFullPath string, stderr io.Writer) (*command.Command, error) { +func (s *server) cloneFromURLCommand( + ctx context.Context, + repoURL, repositoryFullPath string, + opts ...git.CmdOpt, +) (*command.Command, error) { u, err := url.Parse(repoURL) if err != nil { return nil, helper.ErrInternal(err) @@ -54,9 +58,7 @@ func (s *server) cloneFromURLCommand(ctx context.Context, repo *gitalypb.Reposit Flags: cloneFlags, Args: []string{u.String(), repositoryFullPath}, }, - git.WithStderr(stderr), - git.WithRefTxHook(ctx, repo, s.cfg), - git.WithConfig(config...), + append(opts, git.WithConfig(config...))..., ) } @@ -65,6 +67,40 @@ func (s *server) CreateRepositoryFromURL(ctx context.Context, req *gitalypb.Crea return nil, status.Errorf(codes.InvalidArgument, "CreateRepositoryFromURL: %v", err) } + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + if err := s.createRepository(ctx, req.GetRepository(), func(repo *gitalypb.Repository) error { + targetPath, err := s.locator.GetPath(repo) + if err != nil { + return fmt.Errorf("getting temporary repository path: %w", err) + } + + // We need to remove the target path first so git-clone(1) doesn't complain. + if err := os.RemoveAll(targetPath); err != nil { + return fmt.Errorf("removing temporary repository: %w", err) + } + + var stderr bytes.Buffer + cmd, err := s.cloneFromURLCommand(ctx, req.GetUrl(), targetPath, git.WithStderr(&stderr), git.WithDisabledHooks()) + if err != nil { + return fmt.Errorf("starting clone: %w", err) + } + + if err := cmd.Wait(); err != nil { + return fmt.Errorf("cloning repository: %w, stderr: %q", err, stderr.String()) + } + + if err := s.removeOriginInRepo(ctx, repo); err != nil { + return fmt.Errorf("removing origin remote: %w", err) + } + + return nil + }); err != nil { + return nil, helper.ErrInternalf("creating repository: %w", err) + } + + return &gitalypb.CreateRepositoryFromURLResponse{}, nil + } + repository := req.Repository repositoryFullPath, err := s.locator.GetPath(repository) @@ -77,7 +113,7 @@ func (s *server) CreateRepositoryFromURL(ctx context.Context, req *gitalypb.Crea } stderr := bytes.Buffer{} - cmd, err := s.cloneFromURLCommand(ctx, repository, req.GetUrl(), repositoryFullPath, &stderr) + cmd, err := s.cloneFromURLCommand(ctx, req.GetUrl(), repositoryFullPath, git.WithStderr(&stderr), git.WithRefTxHook(ctx, repository, s.cfg)) if err != nil { return nil, helper.ErrInternal(err) } diff --git a/internal/gitaly/service/repository/create_from_url_test.go b/internal/gitaly/service/repository/create_repository_from_url_test.go index 1edf97796..6d38ef4f1 100644 --- a/internal/gitaly/service/repository/create_from_url_test.go +++ b/internal/gitaly/service/repository/create_repository_from_url_test.go @@ -1,6 +1,7 @@ package repository import ( + "context" "encoding/base64" "fmt" "net/http" @@ -12,18 +13,20 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc/codes" ) -func TestSuccessfulCreateRepositoryFromURLRequest(t *testing.T) { +func TestCreateRepotitoryFromURL_successful(t *testing.T) { t.Parallel() - cfg, _, repoPath, client := setupRepositoryService(t) + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepotitoryFromURLSuccessful) +} - ctx, cancel := testhelper.Context() - defer cancel() +func testCreateRepotitoryFromURLSuccessful(t *testing.T, ctx context.Context) { + cfg, _, repoPath, client := setupRepositoryService(t) importedRepo := &gitalypb.Repository{ RelativePath: "imports/test-repo-imported.git", @@ -57,37 +60,14 @@ func TestSuccessfulCreateRepositoryFromURLRequest(t *testing.T) { require.True(t, os.IsNotExist(err), "hooks directory should not have been created") } -func TestCloneRepositoryFromUrlCommand(t *testing.T) { +func TestCreateRepositoryFromURL_existingTarget(t *testing.T) { t.Parallel() - ctx, cancel := testhelper.Context() - defer cancel() - - userInfo := "user:pass%21%3F%40" - repositoryFullPath := "full/path/to/repository" - url := fmt.Sprintf("https://%s@www.example.com/secretrepo.git", userInfo) - - cfg := testcfg.Build(t) - s := server{cfg: cfg, gitCmdFactory: git.NewExecCommandFactory(cfg)} - cmd, err := s.cloneFromURLCommand(ctx, &gitalypb.Repository{}, url, repositoryFullPath, nil) - require.NoError(t, err) - - expectedScrubbedURL := "https://www.example.com/secretrepo.git" - expectedBasicAuthHeader := fmt.Sprintf("Authorization: Basic %s", base64.StdEncoding.EncodeToString([]byte("user:pass!?@"))) - expectedHeader := fmt.Sprintf("http.extraHeader=%s", expectedBasicAuthHeader) - - args := cmd.Args() - require.Contains(t, args, expectedScrubbedURL) - require.Contains(t, args, expectedHeader) - require.NotContains(t, args, userInfo) + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromURLExistingTarget) } -func TestFailedCreateRepositoryFromURLRequestDueToExistingTarget(t *testing.T) { - t.Parallel() +func testCreateRepositoryFromURLExistingTarget(t *testing.T, ctx context.Context) { cfg, client := setupRepositoryServiceWithoutRepo(t) - ctx, cancel := testhelper.Context() - defer cancel() - testCases := []struct { desc string repoPath string @@ -126,17 +106,21 @@ func TestFailedCreateRepositoryFromURLRequestDueToExistingTarget(t *testing.T) { } _, err := client.CreateRepositoryFromURL(ctx, req) - testhelper.RequireGrpcError(t, err, codes.InvalidArgument) + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + testhelper.RequireGrpcError(t, err, codes.AlreadyExists) + } else { + testhelper.RequireGrpcError(t, err, codes.InvalidArgument) + } }) } } -func TestPreventingRedirect(t *testing.T) { - t.Parallel() - cfg, client := setupRepositoryServiceWithoutRepo(t) +func TestCreateRepositoryFromURL_redirect(t *testing.T) { + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromURLRedirect) +} - ctx, cancel := testhelper.Context() - defer cancel() +func testCreateRepositoryFromURLRedirect(t *testing.T, ctx context.Context) { + cfg, client := setupRepositoryServiceWithoutRepo(t) importedRepo := &gitalypb.Repository{ RelativePath: "imports/test-repo-imported.git", @@ -160,6 +144,30 @@ func TestPreventingRedirect(t *testing.T) { require.Contains(t, err.Error(), "The requested URL returned error: 301") } +func TestCloneRepositoryFromUrlCommand(t *testing.T) { + t.Parallel() + ctx, cancel := testhelper.Context() + defer cancel() + + userInfo := "user:pass%21%3F%40" + repositoryFullPath := "full/path/to/repository" + url := fmt.Sprintf("https://%s@www.example.com/secretrepo.git", userInfo) + + cfg := testcfg.Build(t) + s := server{cfg: cfg, gitCmdFactory: git.NewExecCommandFactory(cfg)} + cmd, err := s.cloneFromURLCommand(ctx, url, repositoryFullPath, git.WithDisabledHooks()) + require.NoError(t, err) + + expectedScrubbedURL := "https://www.example.com/secretrepo.git" + expectedBasicAuthHeader := fmt.Sprintf("Authorization: Basic %s", base64.StdEncoding.EncodeToString([]byte("user:pass!?@"))) + expectedHeader := fmt.Sprintf("http.extraHeader=%s", expectedBasicAuthHeader) + + args := cmd.Args() + require.Contains(t, args, expectedScrubbedURL) + require.Contains(t, args, expectedHeader) + require.NotContains(t, args, userInfo) +} + func gitServerWithBasicAuth(t testing.TB, cfg config.Cfg, user, pass, repoPath string) (int, func() error) { return gittest.GitServer(t, cfg, repoPath, basicAuthMiddleware(t, user, pass)) } diff --git a/internal/gitaly/service/repository/create_test.go b/internal/gitaly/service/repository/create_repository_test.go index 165d1257a..5039de10b 100644 --- a/internal/gitaly/service/repository/create_test.go +++ b/internal/gitaly/service/repository/create_repository_test.go @@ -17,7 +17,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/auth" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" @@ -25,27 +27,34 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "golang.org/x/sys/unix" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -func TestRepoNoAuth(t *testing.T) { +func TestCreateRepository_missingAuth(t *testing.T) { + t.Parallel() + + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryMissingAuth) +} + +func testCreateRepositoryMissingAuth(t *testing.T, ctx context.Context) { cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "some"}})) serverSocketPath := runRepositoryServerWithConfig(t, cfg, nil) client := newRepositoryClient(t, config.Cfg{Auth: auth.Config{Token: ""}}, serverSocketPath) - ctx, cancel := testhelper.Context() - defer cancel() - _, err := client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo}) testhelper.RequireGrpcError(t, err, codes.Unauthenticated) } -func TestCreateRepositorySuccess(t *testing.T) { - cfg, client := setupRepositoryServiceWithoutRepo(t) +func TestCreateRepository_successful(t *testing.T) { + t.Parallel() + + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositorySuccessful) +} - ctx, cancel := testhelper.Context() - defer cancel() +func testCreateRepositorySuccessful(t *testing.T, ctx context.Context) { + cfg, client := setupRepositoryServiceWithoutRepo(t) relativePath := "create-repository-test.git" repoDir := filepath.Join(cfg.Storages[0].Path, relativePath) @@ -73,11 +82,14 @@ func TestCreateRepositorySuccess(t *testing.T) { require.Equal(t, symRef, []byte(fmt.Sprintf("ref: %s\n", git.DefaultRef))) } -func TestCreateRepositoryFailure(t *testing.T) { - cfg, client := setupRepositoryServiceWithoutRepo(t) +func TestCreateRepository_failure(t *testing.T) { + t.Parallel() + + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFailure) +} - ctx, cancel := testhelper.Context() - defer cancel() +func testCreateRepositoryFailure(t *testing.T, ctx context.Context) { + cfg, client := setupRepositoryServiceWithoutRepo(t) storagePath := cfg.Storages[0].Path fullPath := filepath.Join(storagePath, "foo.git") @@ -89,14 +101,21 @@ func TestCreateRepositoryFailure(t *testing.T) { Repository: &gitalypb.Repository{StorageName: cfg.Storages[0].Name, RelativePath: "foo.git"}, }) - testhelper.RequireGrpcError(t, err, codes.Internal) + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + testhelper.RequireGrpcError(t, err, codes.AlreadyExists) + } else { + testhelper.RequireGrpcError(t, err, codes.Internal) + } } -func TestCreateRepositoryFailureInvalidArgs(t *testing.T) { - _, client := setupRepositoryServiceWithoutRepo(t) +func TestCreateRepository_invalidArguments(t *testing.T) { + t.Parallel() - ctx, cancel := testhelper.Context() - defer cancel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryInvalidArguments) +} + +func testCreateRepositoryInvalidArguments(t *testing.T, ctx context.Context) { + _, client := setupRepositoryServiceWithoutRepo(t) testCases := []struct { repo *gitalypb.Repository @@ -118,7 +137,13 @@ func TestCreateRepositoryFailureInvalidArgs(t *testing.T) { } } -func TestCreateRepositoryTransactional(t *testing.T) { +func TestCreateRepository_transactional(t *testing.T) { + t.Parallel() + + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryTransactional) +} + +func testCreateRepositoryTransactional(t *testing.T, ctx context.Context) { var actualVote voting.Vote var called int @@ -132,9 +157,6 @@ func TestCreateRepositoryTransactional(t *testing.T) { cfg, client := setupRepositoryServiceWithoutRepo(t, testserver.WithTransactionManager(&mockTxManager)) - ctx, cancel := testhelper.Context() - defer cancel() - ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true) require.NoError(t, err) ctx = metadata.IncomingToOutgoing(ctx) @@ -152,8 +174,12 @@ func TestCreateRepositoryTransactional(t *testing.T) { require.NoError(t, err) require.DirExists(t, filepath.Join(cfg.Storages[0].Path, "repo.git")) - require.Equal(t, 1, called, "expected transactional vote") - require.Equal(t, voting.VoteFromData([]byte{}), actualVote) + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + require.Equal(t, 2, called, "expected transactional vote") + } else { + require.Equal(t, 1, called, "expected transactional vote") + require.Equal(t, voting.VoteFromData([]byte{}), actualVote) + } }) t.Run("idempotent creation with preexisting refs", func(t *testing.T) { @@ -165,6 +191,12 @@ func TestCreateRepositoryTransactional(t *testing.T) { _, err = client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ Repository: repo, }) + + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + testassert.ProtoEqual(t, status.Error(codes.AlreadyExists, "creating repository: repository exists already"), err) + return + } + require.NoError(t, err) refs := gittest.Exec(t, cfg, "-C", repoPath, "for-each-ref") @@ -175,16 +207,25 @@ func TestCreateRepositoryTransactional(t *testing.T) { }) } -func TestCreateRepositoryIdempotent(t *testing.T) { - cfg, repo, repoPath, client := setupRepositoryService(t) +func TestCreateRepository_idempotent(t *testing.T) { + t.Parallel() + + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryIdempotent) +} - ctx, cancel := testhelper.Context() - defer cancel() +func testCreateRepositoryIdempotent(t *testing.T, ctx context.Context) { + cfg, repo, repoPath, client := setupRepositoryService(t) refsBefore := strings.Split(string(gittest.Exec(t, cfg, "-C", repoPath, "for-each-ref")), "\n") req := &gitalypb.CreateRepositoryRequest{Repository: repo} _, err := client.CreateRepository(ctx, req) + + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + testassert.ProtoEqual(t, status.Error(codes.AlreadyExists, "creating repository: repository exists already"), err) + return + } + require.NoError(t, err) refsAfter := strings.Split(string(gittest.Exec(t, cfg, "-C", repoPath, "for-each-ref")), "\n") diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index 7a54fe38a..eeb4667a9 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/safe" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -141,6 +142,20 @@ func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryReq } func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + if err := s.createRepository(ctx, in.GetRepository(), func(repo *gitalypb.Repository) error { + if err := s.extractSnapshot(ctx, in.GetSource(), repo); err != nil { + return fmt.Errorf("extracting snapshot: %w", err) + } + + return nil + }); err != nil { + return fmt.Errorf("creating repository: %w", err) + } + + return nil + } + tempRepo, tempDir, err := tempdir.NewRepository(ctx, in.GetRepository().GetStorageName(), s.locator) if err != nil { return fmt.Errorf("create temporary directory: %w", err) @@ -152,12 +167,33 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR return fmt.Errorf("create repository: %w", err) } - repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName()) + if err := s.extractSnapshot(ctx, in.GetSource(), tempRepo); err != nil { + return fmt.Errorf("extracting snapshot: %w", err) + } + + targetPath, err := s.locator.GetPath(in.GetRepository()) + if err != nil { + return fmt.Errorf("locate repository: %w", err) + } + + if err = os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil { + return fmt.Errorf("create parent directories: %w", err) + } + + if err := os.Rename(tempDir.Path(), targetPath); err != nil { + return fmt.Errorf("move temporary directory to target path: %w", err) + } + + return nil +} + +func (s *server) extractSnapshot(ctx context.Context, source, target *gitalypb.Repository) error { + repoClient, err := s.newRepoClient(ctx, source.GetStorageName()) if err != nil { return fmt.Errorf("new client: %w", err) } - stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: in.GetSource()}) + stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: source}) if err != nil { return fmt.Errorf("get snapshot: %w", err) } @@ -187,8 +223,13 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR }), ) + targetPath, err := s.locator.GetPath(target) + if err != nil { + return fmt.Errorf("target path: %w", err) + } + stderr := &bytes.Buffer{} - cmd, err := command.New(ctx, exec.Command("tar", "-C", tempDir.Path(), "-xvf", "-"), snapshotReader, nil, stderr) + cmd, err := command.New(ctx, exec.Command("tar", "-C", targetPath, "-xvf", "-"), snapshotReader, nil, stderr) if err != nil { return fmt.Errorf("create tar command: %w", err) } @@ -197,19 +238,6 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR return fmt.Errorf("wait for tar, stderr: %q, err: %w", stderr, err) } - targetPath, err := s.locator.GetPath(in.GetRepository()) - if err != nil { - return fmt.Errorf("locate repository: %w", err) - } - - if err = os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil { - return fmt.Errorf("create parent directories: %w", err) - } - - if err := os.Rename(tempDir.Path(), targetPath); err != nil { - return fmt.Errorf("move temporary directory to target path: %w", err) - } - return nil } diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 43aaa23ca..090ec68bf 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -2,6 +2,7 @@ package repository import ( "bytes" + "context" "os" "path/filepath" "sync/atomic" @@ -14,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" @@ -27,6 +29,10 @@ import ( func TestReplicateRepository(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepository) +} + +func testReplicateRepository(t *testing.T, ctx context.Context) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica")) cfg := cfgBuilder.Build(t) @@ -61,8 +67,6 @@ func TestReplicateRepository(t *testing.T) { targetRepo := proto.Clone(repo).(*gitalypb.Repository) targetRepo.StorageName = cfg.Storages[1].Name - ctx, cancel := testhelper.Context() - defer cancel() ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) _, err = client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ @@ -101,9 +105,10 @@ func TestReplicateRepository(t *testing.T) { func TestReplicateRepositoryTransactional(t *testing.T) { t.Parallel() - ctx, cancel := testhelper.Context() - defer cancel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryTransactional) +} +func testReplicateRepositoryTransactional(t *testing.T, ctx context.Context) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica")) cfg := cfgBuilder.Build(t) @@ -149,7 +154,12 @@ func TestReplicateRepositoryTransactional(t *testing.T) { Source: sourceRepo, }) require.NoError(t, err) - require.EqualValues(t, 5, atomic.LoadInt32(&votes)) + + expectedVotes := 5 + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + expectedVotes++ + } + require.EqualValues(t, expectedVotes, atomic.LoadInt32(&votes)) // We're now changing a reference in the source repository such that we can observe changes // in the target repo. @@ -168,6 +178,10 @@ func TestReplicateRepositoryTransactional(t *testing.T) { func TestReplicateRepositoryInvalidArguments(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryInvalidArguments) +} + +func testReplicateRepositoryInvalidArguments(t *testing.T, ctx context.Context) { testCases := []struct { description string input *gitalypb.ReplicateRepositoryRequest @@ -241,9 +255,6 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) { _, client := setupRepositoryServiceWithoutRepo(t) - ctx, cancel := testhelper.Context() - defer cancel() - for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { _, err := client.ReplicateRepository(ctx, tc.input) @@ -254,6 +265,10 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) { func TestReplicateRepository_BadRepository(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryBadRepository) +} + +func testReplicateRepositoryBadRepository(t *testing.T, ctx context.Context) { for _, tc := range []struct { desc string invalidSource bool @@ -316,8 +331,6 @@ func TestReplicateRepository_BadRepository(t *testing.T) { } } - ctx, cancel := testhelper.Context() - defer cancel() ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) _, err := client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ @@ -338,6 +351,10 @@ func TestReplicateRepository_BadRepository(t *testing.T) { func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryFailedFetchInternalRemote) +} + +func testReplicateRepositoryFailedFetchInternalRemote(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t, testcfg.WithStorages("default", "replica")) testcfg.BuildGitalyHooks(t, cfg) testcfg.BuildGitalySSH(t, cfg) @@ -361,9 +378,6 @@ func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { gittest.Exec(t, cfg, "init", "--bare", sourceRepoPath) require.NoError(t, os.WriteFile(filepath.Join(sourceRepoPath, "HEAD"), []byte("garbage"), 0o666)) - ctx, cancel := testhelper.Context() - defer cancel() - ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) repoClient := newRepositoryClient(t, cfg, cfg.SocketPath) diff --git a/internal/gitaly/service/repository/util.go b/internal/gitaly/service/repository/util.go index 64c8e6656..0712b4e20 100644 --- a/internal/gitaly/service/repository/util.go +++ b/internal/gitaly/service/repository/util.go @@ -1,10 +1,21 @@ package repository import ( + "bytes" "context" "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/safe" + "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -19,3 +30,162 @@ func (s *server) removeOriginInRepo(ctx context.Context, repository *gitalypb.Re return nil } + +// createRepository will create a new repository in a race-free way with proper transactional +// semantics. The repository will only be created if it doesn't yet exist and if nodes which take +// part in the transaction reach quorum. Otherwise, the target path of the new repository will not +// be modified. The repository can optionally be seeded with contents +func (s *server) createRepository( + ctx context.Context, + repository *gitalypb.Repository, + seedRepository func(repository *gitalypb.Repository) error, +) error { + targetPath, err := s.locator.GetPath(repository) + if err != nil { + return helper.ErrInvalidArgumentf("locate repository: %w", err) + } + + // The repository must not exist on disk already, or otherwise we won't be able to + // create it with atomic semantics. + if _, err := os.Stat(targetPath); !os.IsNotExist(err) { + return helper.ErrAlreadyExistsf("repository exists already") + } + + // Create the parent directory in case it doesn't exist yet. + if err := os.MkdirAll(filepath.Dir(targetPath), 0o770); err != nil { + return helper.ErrInternalf("create directories: %w", err) + } + + newRepo, newRepoDir, err := tempdir.NewRepository(ctx, repository.GetStorageName(), s.locator) + if err != nil { + return fmt.Errorf("creating temporary repository: %w", err) + } + defer func() { + // We don't really care about whether this succeeds or not. It will either get + // cleaned up after the context is done, or eventually by the tempdir walker when + // it's old enough. + _ = os.RemoveAll(newRepoDir.Path()) + }() + + // Note that we do not create the repository directly in its target location, but + // instead create it in a temporary directory, first. This is done such that we can + // guarantee atomicity and roll back the change easily in case an error happens. + stderr := &bytes.Buffer{} + cmd, err := s.gitCmdFactory.NewWithoutRepo(ctx, git.SubCmd{ + Name: "init", + Flags: []git.Option{ + git.Flag{Name: "--bare"}, + git.Flag{Name: "--quiet"}, + }, + Args: []string{newRepoDir.Path()}, + }, git.WithStderr(stderr)) + if err != nil { + return fmt.Errorf("spawning git-init: %w", err) + } + if err := cmd.Wait(); err != nil { + return fmt.Errorf("creating repository: %w, stderr: %q", err, stderr.String()) + } + + if err := seedRepository(newRepo); err != nil { + // Return the error returned by the callback function as-is so we don't clobber any + // potential returned gRPC error codes. + return err + } + + // In order to guarantee that the repository is going to be the same across all + // Gitalies in case we're behind Praefect, we walk the repository and hash all of + // its files. + voteHash := voting.NewVoteHash() + if err := filepath.WalkDir(newRepoDir.Path(), func(path string, entry fs.DirEntry, err error) error { + if err != nil { + return err + } + + // The way packfiles are generated may not be deterministic, so we skip over the + // object database. + if path == filepath.Join(newRepoDir.Path(), "objects") { + return fs.SkipDir + } + + // We do not care about directories. + if entry.IsDir() { + return nil + } + + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("opening %q: %w", entry.Name(), err) + } + defer file.Close() + + if _, err := io.Copy(voteHash, file); err != nil { + return fmt.Errorf("hashing %q: %w", entry.Name(), err) + } + + return nil + }); err != nil { + return fmt.Errorf("walking repository: %w", err) + } + + vote, err := voteHash.Vote() + if err != nil { + return fmt.Errorf("computing vote: %w", err) + } + + // We're somewhat abusing this file writer given that we simply want to assert that + // the target directory doesn't exist and isn't created while we want to move the + // new repository into place. We thus only use the locking semantics of the writer, + // but will never commit it. + locker, err := safe.NewLockingFileWriter(targetPath) + if err != nil { + return fmt.Errorf("creating locker: %w", err) + } + defer func() { + if err := locker.Close(); err != nil { + ctxlogrus.Extract(ctx).Error("closing repository locker: %w", err) + } + }() + + // We're now entering the critical section where we want to have exclusive access + // over creation of the repository. So we: + // + // 1. Lock the repository path such that no other process can create it at the same + // time. + // 2. Vote on the new repository's state. + // 3. Move the repository into place. + // 4. Do another confirmatory vote to signal that we performed the change. + // 5. Unlock the repository again. + // + // This sequence guarantees that the change is atomic and can trivially be rolled + // back in case we fail to either lock the repository or reach quorum in the initial + // vote. + if err := locker.Lock(); err != nil { + return fmt.Errorf("locking repository: %w", err) + } + + // Now that the repository is locked, we must assert that it _still_ doesn't exist. + // Otherwise, it could have happened that a concurrent RPC call created it while we created + // and seeded our temporary repository. While we would notice this at the point of moving + // the repository into place, we want to be as sure as possible that the action will succeed + // previous to the first transactional vote. + if _, err := os.Stat(targetPath); !os.IsNotExist(err) { + return helper.ErrAlreadyExistsf("repository exists already") + } + + if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil { + return helper.ErrFailedPreconditionf("preparatory vote: %w", err) + } + + // Now that we have locked the repository and all Gitalies have agreed that they + // want to do the same change we can move the repository into place. + if err := os.Rename(newRepoDir.Path(), targetPath); err != nil { + return fmt.Errorf("moving repository into place: %w", err) + } + + if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil { + return helper.ErrFailedPreconditionf("committing vote: %w", err) + } + + // We unlock the repository implicitly via the deferred `Close()` call. + return nil +} diff --git a/internal/gitaly/service/repository/util_test.go b/internal/gitaly/service/repository/util_test.go new file mode 100644 index 000000000..2e4602a63 --- /dev/null +++ b/internal/gitaly/service/repository/util_test.go @@ -0,0 +1,264 @@ +package repository + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/v14/internal/safe" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc/peer" +) + +func TestCreateRepository(t *testing.T) { + t.Parallel() + + ctx, cancel := testhelper.Context() + defer cancel() + + cfg := testcfg.Build(t) + + txManager := &transaction.MockManager{} + locator := config.NewLocator(cfg) + gitCmdFactory := git.NewExecCommandFactory(cfg) + + server := &server{ + cfg: cfg, + locator: locator, + txManager: txManager, + gitCmdFactory: gitCmdFactory, + } + + votes := 0 + + for _, tc := range []struct { + desc string + setup func(t *testing.T, repo *gitalypb.Repository, repoPath string) + seed func(t *testing.T, repo *gitalypb.Repository, repoPath string) error + verify func( + t *testing.T, + tempRepo *gitalypb.Repository, + tempRepoPath string, + realRepo *gitalypb.Repository, + realRepoPath string, + ) + transactional bool + expectedErr error + }{ + { + desc: "no seeding", + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + // Assert that the temporary repository does not exist anymore. + require.NoDirExists(t, tempRepoPath) + + // But the new repository must exist. + isBareRepo := gittest.Exec(t, cfg, "-C", realRepoPath, "rev-parse", "--is-bare-repository") + require.Equal(t, "true", text.ChompBytes(isBareRepo)) + }, + }, + { + desc: "seeding", + seed: func(t *testing.T, repo *gitalypb.Repository, _ string) error { + // We're using the command factory on purpose here to assert that we + // can execute regular Git commands on the temporary repository. + cmd, err := gitCmdFactory.New(ctx, repo, git.SubCmd{ + Name: "config", + Args: []string{"custom.key", "value"}, + }) + require.NoError(t, err) + require.NoError(t, cmd.Wait()) + return nil + }, + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + value := gittest.Exec(t, cfg, "-C", realRepoPath, "config", "custom.key") + require.Equal(t, "value", text.ChompBytes(value)) + }, + }, + { + desc: "error while seeding", + seed: func(t *testing.T, repo *gitalypb.Repository, _ string) error { + return errors.New("some error") + }, + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + require.NoDirExists(t, realRepoPath) + require.NoDirExists(t, tempRepoPath) + }, + expectedErr: errors.New("some error"), + }, + { + desc: "preexisting directory", + setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) { + require.NoError(t, os.MkdirAll(repoPath, 0o777)) + }, + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + require.NoDirExists(t, tempRepoPath) + + require.DirExists(t, realRepoPath) + dirEntries, err := os.ReadDir(realRepoPath) + require.NoError(t, err) + require.Empty(t, dirEntries, "directory should not have been modified") + }, + expectedErr: helper.ErrAlreadyExistsf("repository exists already"), + }, + { + desc: "locked", + setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) { + require.NoError(t, os.MkdirAll(filepath.Dir(repoPath), 0o777)) + + // Lock the target repository such that we must fail. + lock, err := os.Create(repoPath + ".lock") + require.NoError(t, err) + require.NoError(t, lock.Close()) + }, + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + require.NoDirExists(t, tempRepoPath) + require.NoDirExists(t, realRepoPath) + require.FileExists(t, realRepoPath+".lock") + }, + expectedErr: fmt.Errorf("locking repository: %w", safe.ErrFileAlreadyLocked), + }, + { + desc: "successful transaction", + transactional: true, + setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) { + votes = 0 + txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error { + votes++ + return nil + } + }, + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + require.Equal(t, 2, votes) + }, + }, + { + desc: "failing preparatory vote", + transactional: true, + setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) { + txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error { + return errors.New("vote failed") + } + }, + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + require.NoDirExists(t, tempRepoPath) + require.NoDirExists(t, realRepoPath) + }, + expectedErr: helper.ErrFailedPreconditionf("preparatory vote: %w", errors.New("vote failed")), + }, + { + desc: "failing post-commit vote", + transactional: true, + setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) { + votes = 0 + txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error { + votes++ + if votes == 1 { + return nil + } + return errors.New("vote failed") + } + }, + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + require.NoDirExists(t, tempRepoPath) + + // The second vote is only a confirming vote that the node did the + // change. So if the second vote fails, then the change must have + // been performed and thus we'd see the repository. + require.DirExists(t, realRepoPath) + }, + expectedErr: helper.ErrFailedPreconditionf("committing vote: %w", errors.New("vote failed")), + }, + { + desc: "voting happens after lock", + transactional: true, + setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) { + // We both set up transactions and create the lock. Given that we + // should try locking the repository before casting any votes, we do + // not expect to see a voting error. + + require.NoError(t, os.MkdirAll(filepath.Dir(repoPath), 0o777)) + lock, err := os.Create(repoPath + ".lock") + require.NoError(t, err) + require.NoError(t, lock.Close()) + + txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error { + require.FailNow(t, "no votes should have happened") + return nil + } + }, + verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) { + require.NoDirExists(t, tempRepoPath) + require.NoDirExists(t, realRepoPath) + }, + expectedErr: fmt.Errorf("locking repository: %w", errors.New("file already locked")), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + repo := &gitalypb.Repository{ + StorageName: cfg.Storages[0].Name, + RelativePath: gittest.NewRepositoryName(t, true), + } + + if tc.transactional { + var err error + ctx, err = txinfo.InjectTransaction(ctx, 1, "node", true) + require.NoError(t, err) + ctx = peer.NewContext(ctx, &peer.Peer{}) + } + + repoPath, err := locator.GetPath(repo) + require.NoError(t, err) + + if tc.setup != nil { + tc.setup(t, repo, repoPath) + } + + var tempRepo *gitalypb.Repository + require.Equal(t, tc.expectedErr, server.createRepository(ctx, repo, func(tr *gitalypb.Repository) error { + tempRepo = tr + + // The temporary repository must have been created in Gitaly's + // temporary storage path. + require.Equal(t, repo.StorageName, tempRepo.StorageName) + require.True(t, strings.HasPrefix(tempRepo.RelativePath, "+gitaly/tmp/repo")) + + // Verify that the temporary repository exists and is a real Git + // repository. + tempRepoPath, err := locator.GetRepoPath(tempRepo) + require.NoError(t, err) + isBareRepo := gittest.Exec(t, cfg, "-C", tempRepoPath, "rev-parse", "--is-bare-repository") + require.Equal(t, "true", text.ChompBytes(isBareRepo)) + + if tc.seed != nil { + return tc.seed(t, tempRepo, tempRepoPath) + } + + return nil + })) + + var tempRepoPath string + if tempRepo != nil { + tempRepoPath, err = locator.GetPath(tempRepo) + require.NoError(t, err) + } + + require.NotNil(t, tc.verify, "test must verify results") + tc.verify(t, tempRepo, tempRepoPath, repo, repoPath) + }) + } +} diff --git a/internal/helper/error.go b/internal/helper/error.go index 6075866ae..81f25ddcc 100644 --- a/internal/helper/error.go +++ b/internal/helper/error.go @@ -93,6 +93,12 @@ func ErrPermissionDeniedf(format string, a ...interface{}) error { return formatError(codes.PermissionDenied, format, a...) } +// ErrAlreadyExistsf wraps a formatted error with codes.AlreadyExists, unless the formatted error is +// a wrapped gRPC error. +func ErrAlreadyExistsf(format string, a ...interface{}) error { + return formatError(codes.AlreadyExists, format, a...) +} + // formatError will create a new error from the given format string. If the error string contains a // %w verb and its corresponding error has a gRPC error code, then the returned error will keep this // gRPC error code instead of using the one provided as an argument. diff --git a/internal/metadata/featureflag/ff_tx_atomic_repository_creation.go b/internal/metadata/featureflag/ff_tx_atomic_repository_creation.go new file mode 100644 index 000000000..456f7da0e --- /dev/null +++ b/internal/metadata/featureflag/ff_tx_atomic_repository_creation.go @@ -0,0 +1,8 @@ +package featureflag + +// TxAtomicRepositoryCreation will switch CreateRepository, CreateRepositoryFromBundle, +// CreateRepositoryFromSnapshot and CreateRepositoryFromURL to have proper transactional guarantees. +// This changes behaviour such the target repository must not exist previous to the call, creation +// and seeding of the repository is done in a temporary staging area and then moved into place only +// if no other RPC call created it concurrently. +var TxAtomicRepositoryCreation = NewFeatureFlag("tx_atomic_repository_creation", false) diff --git a/internal/safe/locking_file_writer.go b/internal/safe/locking_file_writer.go index b7e8405c2..c43720cbe 100644 --- a/internal/safe/locking_file_writer.go +++ b/internal/safe/locking_file_writer.go @@ -1,6 +1,7 @@ package safe import ( + "errors" "fmt" "io" "os" @@ -14,6 +15,10 @@ const ( lockingFileWriterStateClosed ) +// ErrFileAlreadyLocked is returned when trying to lock a file which has already been locked by +// another concurrent process. +var ErrFileAlreadyLocked = errors.New("file already locked") + // LockingFileWriter is a FileWriter which locks the target file on commit and checks whether it // has been modified since the LockingFileWriter has been created. The user must first create a new // LockingFileWriter via `NewLockingFileWriter()`, at which point it is open for writes. The writer @@ -134,7 +139,7 @@ func (fw *LockingFileWriter) Lock() error { lock, err := os.OpenFile(fw.lockPath(), os.O_CREATE|os.O_EXCL|os.O_RDONLY, 0o400) if err != nil { if os.IsExist(err) { - return fmt.Errorf("file already locked") + return ErrFileAlreadyLocked } return fmt.Errorf("creating lock file: %w", err) diff --git a/internal/safe/locking_file_writer_test.go b/internal/safe/locking_file_writer_test.go index 152c63d94..7632a9d69 100644 --- a/internal/safe/locking_file_writer_test.go +++ b/internal/safe/locking_file_writer_test.go @@ -261,7 +261,7 @@ func TestLockingFileWriter_concurrentLocking(t *testing.T) { require.NoError(t, err) require.NoError(t, first.Lock()) - require.Equal(t, fmt.Errorf("file already locked"), second.Lock()) + require.Equal(t, safe.ErrFileAlreadyLocked, second.Lock()) require.NoError(t, first.Commit()) require.Equal(t, []byte("first"), testhelper.MustReadFile(t, file)) @@ -279,7 +279,7 @@ func TestLockingFileWriter_locked(t *testing.T) { // Concurrently lock the file. require.NoError(t, os.WriteFile(target+".lock", nil, 0o644)) - require.Equal(t, fmt.Errorf("file already locked"), writer.Lock()) + require.Equal(t, safe.ErrFileAlreadyLocked, writer.Lock()) require.Equal(t, []byte("base"), testhelper.MustReadFile(t, target)) } |