Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-24 14:53:48 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-11-24 14:53:48 +0300
commitf87bc1e983d11788fdbce953dced45ec5554af23 (patch)
treea3f2a32a09dbc536159f25ef2cce650419fbcfab
parent518670d57d1a6527aaf46b5b9bf5cb00f2e8f11b (diff)
parenta2367d906ac47fa0a19d0c5de2a554bfaef8abc3 (diff)
Merge branch 'pks-create-repository-atomic' into 'master'
Atomic repository creation Closes #3779 See merge request gitlab-org/gitaly!3884
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go2
-rw-r--r--internal/git/localrepo/config_test.go5
-rw-r--r--internal/gitaly/service/repository/create_fork.go (renamed from internal/gitaly/service/repository/fork.go)53
-rw-r--r--internal/gitaly/service/repository/create_fork_test.go (renamed from internal/gitaly/service/repository/fork_test.go)236
-rw-r--r--internal/gitaly/service/repository/create_repository.go (renamed from internal/gitaly/service/repository/create.go)13
-rw-r--r--internal/gitaly/service/repository/create_repository_from_bundle.go (renamed from internal/gitaly/service/repository/create_from_bundle.go)59
-rw-r--r--internal/gitaly/service/repository/create_repository_from_bundle_test.go (renamed from internal/gitaly/service/repository/create_from_bundle_test.go)65
-rw-r--r--internal/gitaly/service/repository/create_repository_from_snapshot.go (renamed from internal/gitaly/service/repository/create_from_snapshot.go)28
-rw-r--r--internal/gitaly/service/repository/create_repository_from_snapshot_test.go (renamed from internal/gitaly/service/repository/create_from_snapshot_test.go)58
-rw-r--r--internal/gitaly/service/repository/create_repository_from_url.go (renamed from internal/gitaly/service/repository/create_from_url.go)48
-rw-r--r--internal/gitaly/service/repository/create_repository_from_url_test.go (renamed from internal/gitaly/service/repository/create_from_url_test.go)80
-rw-r--r--internal/gitaly/service/repository/create_repository_test.go (renamed from internal/gitaly/service/repository/create_test.go)95
-rw-r--r--internal/gitaly/service/repository/replicate.go60
-rw-r--r--internal/gitaly/service/repository/replicate_test.go40
-rw-r--r--internal/gitaly/service/repository/util.go170
-rw-r--r--internal/gitaly/service/repository/util_test.go264
-rw-r--r--internal/helper/error.go6
-rw-r--r--internal/metadata/featureflag/ff_tx_atomic_repository_creation.go8
-rw-r--r--internal/safe/locking_file_writer.go7
-rw-r--r--internal/safe/locking_file_writer_test.go4
20 files changed, 1069 insertions, 232 deletions
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index c6195ad99..a857ebb2c 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -157,8 +157,6 @@ func TestAddRepository_Exec(t *testing.T) {
relativePath := fmt.Sprintf("path/to/test/repo_%s", tn)
repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, createRepoThroughGitaly1(relativePath))
-
rmRepoCmd := &removeRepository{
logger: logger,
virtualStorage: virtualStorageName,
diff --git a/internal/git/localrepo/config_test.go b/internal/git/localrepo/config_test.go
index a4b5ce26a..e0d188cf4 100644
--- a/internal/git/localrepo/config_test.go
+++ b/internal/git/localrepo/config_test.go
@@ -2,7 +2,6 @@ package localrepo
import (
"context"
- "errors"
"fmt"
"path/filepath"
"runtime"
@@ -86,7 +85,7 @@ func TestRepo_SetConfig(t *testing.T) {
value: "value",
locked: true,
expectedEntries: standardEntries,
- expectedErr: fmt.Errorf("committing config: %w", fmt.Errorf("locking file: %w", errors.New("file already locked"))),
+ expectedErr: fmt.Errorf("committing config: %w", fmt.Errorf("locking file: %w", safe.ErrFileAlreadyLocked)),
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -224,7 +223,7 @@ func TestRepo_UnsetMatchingConfig(t *testing.T) {
desc: "locked",
regex: ".*",
locked: true,
- expectedErr: fmt.Errorf("committing config: %w", fmt.Errorf("locking file: %w", errors.New("file already locked"))),
+ expectedErr: fmt.Errorf("committing config: %w", fmt.Errorf("locking file: %w", safe.ErrFileAlreadyLocked)),
expectedKeys: standardKeys,
},
} {
diff --git a/internal/gitaly/service/repository/fork.go b/internal/gitaly/service/repository/create_fork.go
index 610fdf55a..4f287eb23 100644
--- a/internal/gitaly/service/repository/fork.go
+++ b/internal/gitaly/service/repository/create_fork.go
@@ -7,6 +7,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitalyssh"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -23,6 +25,57 @@ func (s *server) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest
return nil, status.Errorf(codes.InvalidArgument, "CreateFork: empty Repository")
}
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ if err := s.createRepository(ctx, targetRepository, func(repo *gitalypb.Repository) error {
+ targetPath, err := s.locator.GetPath(repo)
+ if err != nil {
+ return err
+ }
+
+ env, err := gitalyssh.UploadPackEnv(ctx, s.cfg, &gitalypb.SSHUploadPackRequest{Repository: sourceRepository})
+ if err != nil {
+ return err
+ }
+
+ // git-clone(1) doesn't allow for the target path to exist, so we have to
+ // remove it first.
+ if err := os.RemoveAll(targetPath); err != nil {
+ return fmt.Errorf("removing target path: %w", err)
+ }
+
+ // Ideally we'd just fetch into the already-created repo, but that wouldn't
+ // allow us to easily set up HEAD to point to the correct ref. We thus have
+ // no easy choice but to use git-clone(1).
+ cmd, err := s.gitCmdFactory.NewWithoutRepo(ctx, git.SubCmd{
+ Name: "clone",
+ Flags: []git.Option{
+ git.Flag{Name: "--bare"},
+ },
+ Args: []string{
+ gitalyssh.GitalyInternalURL,
+ targetPath,
+ },
+ }, git.WithEnv(env...), git.WithDisabledHooks())
+ if err != nil {
+ return fmt.Errorf("spawning fetch: %w", err)
+ }
+
+ if err := cmd.Wait(); err != nil {
+ return fmt.Errorf("fetching source repo: %w", err)
+ }
+
+ if err := s.removeOriginInRepo(ctx, repo); err != nil {
+ return fmt.Errorf("removing origin remote: %w", err)
+ }
+
+ return nil
+ }); err != nil {
+ return nil, helper.ErrInternalf("creating fork: %w", err)
+ }
+
+ return &gitalypb.CreateForkResponse{}, nil
+ }
+
targetRepositoryFullPath, err := s.locator.GetPath(targetRepository)
if err != nil {
return nil, err
diff --git a/internal/gitaly/service/repository/fork_test.go b/internal/gitaly/service/repository/create_fork_test.go
index c5c7b3ca0..8083c2a8e 100644
--- a/internal/gitaly/service/repository/fork_test.go
+++ b/internal/gitaly/service/repository/create_fork_test.go
@@ -1,10 +1,12 @@
package repository
import (
+ "context"
"crypto/tls"
"crypto/x509"
"os"
"path/filepath"
+ "strings"
"testing"
"github.com/stretchr/testify/require"
@@ -27,23 +29,37 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitlab"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
gitalyx509 "gitlab.com/gitlab-org/gitaly/v14/internal/x509"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
- "google.golang.org/grpc/metadata"
)
-func TestSuccessfulCreateForkRequest(t *testing.T) {
+func TestCreateFork_successful(t *testing.T) {
t.Parallel()
+
+ // We need to inject this once across all tests given that crypto/x509 only initializes
+ // certificates once. Changing injected certs during our tests is thus not going to fly well
+ // and would cause failure. We should eventually address this and provide better testing
+ // utilities around this, but now's not the time.
+ certPool, tlsConfig := injectCustomCATestCerts(t)
+
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, func(t *testing.T, ctx context.Context) {
+ testCreateForkSuccessful(t, ctx, certPool, tlsConfig)
+ })
+}
+
+func testCreateForkSuccessful(t *testing.T, ctx context.Context, certPool *x509.CertPool, tlsConfig config.TLS) {
for _, tt := range []struct {
- name string
- secure bool
- beforeRequest func(repoPath string)
+ name string
+ secure bool
}{
{
name: "secure",
@@ -52,12 +68,6 @@ func TestSuccessfulCreateForkRequest(t *testing.T) {
{
name: "insecure",
},
- {
- name: "existing empty directory target",
- beforeRequest: func(repoPath string) {
- require.NoError(t, os.MkdirAll(repoPath, 0o755))
- },
- },
} {
t.Run(tt.name, func(t *testing.T) {
cfg, repo, _ := testcfg.BuildWithRepo(t)
@@ -71,45 +81,31 @@ func TestSuccessfulCreateForkRequest(t *testing.T) {
)
if tt.secure {
- testPool := injectCustomCATestCerts(t, &cfg)
+ cfg.TLS = tlsConfig
cfg.TLSListenAddr = runSecureServer(t, cfg, nil)
- client, conn = newSecureRepoClient(t, cfg.TLSListenAddr, cfg.Auth.Token, testPool)
+ client, conn = newSecureRepoClient(t, cfg.TLSListenAddr, cfg.Auth.Token, certPool)
defer conn.Close()
} else {
client, cfg.SocketPath = runRepositoryService(t, cfg, nil)
}
- ctxOuter, cancel := testhelper.Context()
- defer cancel()
-
- md := testcfg.GitalyServersMetadataFromCfg(t, cfg)
- ctx := metadata.NewOutgoingContext(ctxOuter, md)
+ ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
forkedRepo := &gitalypb.Repository{
- RelativePath: "forks/test-repo-fork.git",
+ RelativePath: gittest.NewRepositoryName(t, true),
StorageName: repo.GetStorageName(),
}
- forkedRepoPath := filepath.Join(cfg.Storages[0].Path, forkedRepo.GetRelativePath())
- require.NoError(t, os.RemoveAll(forkedRepoPath))
-
- if tt.beforeRequest != nil {
- tt.beforeRequest(forkedRepoPath)
- }
-
- req := &gitalypb.CreateForkRequest{
+ _, err := client.CreateFork(ctx, &gitalypb.CreateForkRequest{
Repository: forkedRepo,
SourceRepository: repo,
- }
-
- _, err := client.CreateFork(ctx, req)
+ })
require.NoError(t, err)
- defer func() { require.NoError(t, os.RemoveAll(forkedRepoPath)) }()
- gittest.Exec(t, cfg, "-C", forkedRepoPath, "fsck")
+ forkedRepoPath := filepath.Join(cfg.Storages[0].Path, forkedRepo.GetRelativePath())
- remotes := gittest.Exec(t, cfg, "-C", forkedRepoPath, "remote")
- require.NotContains(t, string(remotes), "origin")
+ gittest.Exec(t, cfg, "-C", forkedRepoPath, "fsck")
+ require.Empty(t, gittest.Exec(t, cfg, "-C", forkedRepoPath, "remote"))
_, err = os.Lstat(filepath.Join(forkedRepoPath, "hooks"))
require.True(t, os.IsNotExist(err), "hooks directory should not have been created")
@@ -117,88 +113,135 @@ func TestSuccessfulCreateForkRequest(t *testing.T) {
}
}
-func newSecureRepoClient(t testing.TB, addr, token string, pool *x509.CertPool) (gitalypb.RepositoryServiceClient, *grpc.ClientConn) {
- t.Helper()
+func TestCreateFork_refs(t *testing.T) {
+ t.Parallel()
- connOpts := []grpc.DialOption{
- grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
- RootCAs: pool,
- MinVersion: tls.VersionTLS12,
- })),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)),
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateForkRefs)
+}
+
+func testCreateForkRefs(t *testing.T, ctx context.Context) {
+ cfg := testcfg.Build(t)
+ testcfg.BuildGitalyHooks(t, cfg)
+ testcfg.BuildGitalySSH(t, cfg)
+
+ sourceRepo, sourceRepoPath := gittest.InitRepo(t, cfg, cfg.Storages[0])
+
+ // Prepare the source repository with a bunch of refs and a non-default HEAD ref so we can
+ // assert that the target repo gets created with the correct set of refs.
+ commitID := gittest.WriteCommit(t, cfg, sourceRepoPath, gittest.WithParents())
+ for _, ref := range []string{
+ "refs/environments/something",
+ "refs/heads/something",
+ "refs/remotes/origin/something",
+ "refs/tags/something",
+ } {
+ gittest.Exec(t, cfg, "-C", sourceRepoPath, "update-ref", ref, commitID.String())
}
+ gittest.Exec(t, cfg, "-C", sourceRepoPath, "symbolic-ref", "HEAD", "refs/heads/something")
- conn, err := client.Dial(addr, connOpts)
+ client, socketPath := runRepositoryService(t, cfg, nil)
+ cfg.SocketPath = socketPath
+
+ ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
+
+ targetRepo := &gitalypb.Repository{
+ RelativePath: gittest.NewRepositoryName(t, true),
+ StorageName: sourceRepo.GetStorageName(),
+ }
+ targetRepoPath, err := config.NewLocator(cfg).GetPath(targetRepo)
require.NoError(t, err)
- return gitalypb.NewRepositoryServiceClient(conn), conn
+ _, err = client.CreateFork(ctx, &gitalypb.CreateForkRequest{
+ Repository: targetRepo,
+ SourceRepository: sourceRepo,
+ })
+ require.NoError(t, err)
+
+ require.Equal(t,
+ []string{
+ commitID.String() + " refs/heads/something",
+ commitID.String() + " refs/tags/something",
+ },
+ strings.Split(text.ChompBytes(gittest.Exec(t, cfg, "-C", targetRepoPath, "show-ref")), "\n"),
+ )
+
+ require.Equal(t,
+ string(gittest.Exec(t, cfg, "-C", sourceRepoPath, "symbolic-ref", "HEAD")),
+ string(gittest.Exec(t, cfg, "-C", targetRepoPath, "symbolic-ref", "HEAD")),
+ )
}
-func TestFailedCreateForkRequestDueToExistingTarget(t *testing.T) {
+func TestCreateFork_targetExists(t *testing.T) {
t.Parallel()
- cfg, repo, _, client := setupRepositoryService(t)
- ctxOuter, cancel := testhelper.Context()
- defer cancel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateForkTargetExists)
+}
+
+func testCreateForkTargetExists(t *testing.T, ctx context.Context) {
+ cfg, repo, _, client := setupRepositoryService(t)
- md := testcfg.GitalyServersMetadataFromCfg(t, cfg)
- ctx := metadata.NewOutgoingContext(ctxOuter, md)
+ ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
- testCases := []struct {
- desc string
- repoPath string
- isDir bool
+ for _, tc := range []struct {
+ desc string
+ seed func(t *testing.T, targetPath string)
+ expectedErr error
+ expectedErrWithAtomicCreation error
}{
{
- desc: "target is a non-empty directory",
- repoPath: "forks/test-repo-fork-dir.git",
- isDir: true,
+ desc: "empty target directory",
+ seed: func(t *testing.T, targetPath string) {
+ require.NoError(t, os.MkdirAll(targetPath, 0o770))
+ },
+ expectedErrWithAtomicCreation: helper.ErrAlreadyExistsf("creating fork: repository exists already"),
},
{
- desc: "target is a file",
- repoPath: "forks/test-repo-fork-file.git",
- isDir: false,
- },
- }
-
- for _, testCase := range testCases {
- t.Run(testCase.desc, func(t *testing.T) {
- forkedRepo := &gitalypb.Repository{
- RelativePath: testCase.repoPath,
- StorageName: repo.StorageName,
- }
-
- forkedRepoPath := filepath.Join(cfg.Storages[0].Path, forkedRepo.GetRelativePath())
-
- if testCase.isDir {
- require.NoError(t, os.MkdirAll(forkedRepoPath, 0o770))
+ desc: "non-empty target directory",
+ seed: func(t *testing.T, targetPath string) {
+ require.NoError(t, os.MkdirAll(targetPath, 0o770))
require.NoError(t, os.WriteFile(
- filepath.Join(forkedRepoPath, "config"),
+ filepath.Join(targetPath, "config"),
nil,
0o644,
))
- } else {
- require.NoError(t, os.WriteFile(forkedRepoPath, nil, 0o644))
+ },
+ expectedErr: helper.ErrInvalidArgumentf("CreateFork: destination directory is not empty"),
+ expectedErrWithAtomicCreation: helper.ErrAlreadyExistsf("creating fork: repository exists already"),
+ },
+ {
+ desc: "target file",
+ seed: func(t *testing.T, targetPath string) {
+ require.NoError(t, os.MkdirAll(filepath.Dir(targetPath), 0o770))
+ require.NoError(t, os.WriteFile(targetPath, nil, 0o644))
+ },
+ expectedErr: helper.ErrInvalidArgumentf("CreateFork: destination path exists"),
+ expectedErrWithAtomicCreation: helper.ErrAlreadyExistsf("creating fork: repository exists already"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ forkedRepo := &gitalypb.Repository{
+ RelativePath: gittest.NewRepositoryName(t, true),
+ StorageName: repo.StorageName,
}
- defer func() { require.NoError(t, os.RemoveAll(forkedRepoPath)) }()
- req := &gitalypb.CreateForkRequest{
+ tc.seed(t, filepath.Join(cfg.Storages[0].Path, forkedRepo.GetRelativePath()))
+
+ _, err := client.CreateFork(ctx, &gitalypb.CreateForkRequest{
Repository: forkedRepo,
SourceRepository: repo,
+ })
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ testassert.GrpcEqualErr(t, tc.expectedErrWithAtomicCreation, err)
+ } else {
+ testassert.GrpcEqualErr(t, tc.expectedErr, err)
}
-
- _, err := client.CreateFork(ctx, req)
- testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
})
}
}
-func injectCustomCATestCerts(t *testing.T, cfg *config.Cfg) *x509.CertPool {
+func injectCustomCATestCerts(t *testing.T) (*x509.CertPool, config.TLS) {
certFile, keyFile := testhelper.GenerateCerts(t)
- cfg.TLS.CertPath = certFile
- cfg.TLS.KeyPath = keyFile
-
revertEnv := testhelper.ModifyEnvironment(t, gitalyx509.SSLCertFile, certFile)
t.Cleanup(revertEnv)
@@ -206,7 +249,7 @@ func injectCustomCATestCerts(t *testing.T, cfg *config.Cfg) *x509.CertPool {
pool := x509.NewCertPool()
require.True(t, pool.AppendCertsFromPEM(caPEMBytes))
- return pool
+ return pool, config.TLS{CertPath: certFile, KeyPath: keyFile}
}
func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) string {
@@ -254,3 +297,20 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s
return "tls://" + addr
}
+
+func newSecureRepoClient(t testing.TB, addr, token string, pool *x509.CertPool) (gitalypb.RepositoryServiceClient, *grpc.ClientConn) {
+ t.Helper()
+
+ connOpts := []grpc.DialOption{
+ grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
+ RootCAs: pool,
+ MinVersion: tls.VersionTLS12,
+ })),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)),
+ }
+
+ conn, err := client.Dial(addr, connOpts)
+ require.NoError(t, err)
+
+ return gitalypb.NewRepositoryServiceClient(conn), conn
+}
diff --git a/internal/gitaly/service/repository/create.go b/internal/gitaly/service/repository/create_repository.go
index c78801e8d..334d8b79b 100644
--- a/internal/gitaly/service/repository/create.go
+++ b/internal/gitaly/service/repository/create_repository.go
@@ -9,6 +9,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -22,6 +23,18 @@ func (s *server) CreateRepository(ctx context.Context, req *gitalypb.CreateRepos
return nil, helper.ErrInvalidArgumentf("locate repository: %w", err)
}
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ if err := s.createRepository(ctx, req.GetRepository(), func(repo *gitalypb.Repository) error {
+ // We do not want to seed the repository with any contents, so we just
+ // return directly.
+ return nil
+ }); err != nil {
+ return nil, helper.ErrInternalf("creating repository: %w", err)
+ }
+
+ return &gitalypb.CreateRepositoryResponse{}, nil
+ }
+
if err := os.MkdirAll(diskPath, 0o770); err != nil {
return nil, helper.ErrInternalf("create directories: %w", err)
}
diff --git a/internal/gitaly/service/repository/create_from_bundle.go b/internal/gitaly/service/repository/create_repository_from_bundle.go
index 092b77861..59523ed72 100644
--- a/internal/gitaly/service/repository/create_from_bundle.go
+++ b/internal/gitaly/service/repository/create_repository_from_bundle.go
@@ -10,6 +10,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
@@ -28,6 +29,62 @@ func (s *server) CreateRepositoryFromBundle(stream gitalypb.RepositoryService_Cr
return status.Errorf(codes.InvalidArgument, "CreateRepositoryFromBundle: empty Repository")
}
+ ctx := stream.Context()
+
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ firstRead := false
+ bundleReader := streamio.NewReader(func() ([]byte, error) {
+ if !firstRead {
+ firstRead = true
+ return firstRequest.GetData(), nil
+ }
+
+ request, err := stream.Recv()
+ return request.GetData(), err
+ })
+
+ bundleDir, err := tempdir.New(ctx, repo.GetStorageName(), s.locator)
+ if err != nil {
+ return helper.ErrInternalf("creating bundle directory: %w", err)
+ }
+
+ bundlePath := filepath.Join(bundleDir.Path(), "repo.bundle")
+ bundleFile, err := os.Create(bundlePath)
+ if err != nil {
+ return helper.ErrInternalf("creating bundle file: %w", err)
+ }
+
+ if _, err := io.Copy(bundleFile, bundleReader); err != nil {
+ return helper.ErrInternalf("writing bundle file: %w", err)
+ }
+
+ if err := s.createRepository(ctx, repo, func(repo *gitalypb.Repository) error {
+ var stderr bytes.Buffer
+ cmd, err := s.gitCmdFactory.New(ctx, repo, git.SubCmd{
+ Name: "fetch",
+ Flags: []git.Option{
+ git.Flag{Name: "--quiet"},
+ git.Flag{Name: "--atomic"},
+ },
+ Args: []string{bundlePath, "refs/*:refs/*"},
+ }, git.WithStderr(&stderr), git.WithRefTxHook(ctx, repo, s.cfg))
+ if err != nil {
+ return helper.ErrInternalf("spawning fetch: %w", err)
+ }
+
+ if err := cmd.Wait(); err != nil {
+ sanitizedStderr := sanitizedError("%s", stderr.String())
+ return helper.ErrInternalf("fetch from bundle: %w, stderr: %q", err, sanitizedStderr)
+ }
+
+ return nil
+ }); err != nil {
+ return helper.ErrInternalf("creating repository: %w", err)
+ }
+
+ return stream.SendAndClose(&gitalypb.CreateRepositoryFromBundleResponse{})
+ }
+
repoPath, err := s.locator.GetPath(repo)
if err != nil {
return helper.ErrInternal(err)
@@ -48,8 +105,6 @@ func (s *server) CreateRepositoryFromBundle(stream gitalypb.RepositoryService_Cr
return request.GetData(), err
})
- ctx := stream.Context()
-
tmpDir, err := tempdir.New(ctx, repo.GetStorageName(), s.locator)
if err != nil {
cleanError := sanitizedError(tmpDir.Path(), "CreateRepositoryFromBundle: tmp dir failed: %v", err)
diff --git a/internal/gitaly/service/repository/create_from_bundle_test.go b/internal/gitaly/service/repository/create_repository_from_bundle_test.go
index 1c8f6d801..35dd45ddf 100644
--- a/internal/gitaly/service/repository/create_from_bundle_test.go
+++ b/internal/gitaly/service/repository/create_repository_from_bundle_test.go
@@ -18,6 +18,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert"
@@ -30,12 +31,14 @@ import (
"google.golang.org/grpc/status"
)
-func TestServer_CreateRepositoryFromBundle_successful(t *testing.T) {
+func TestCreateRepositoryFromBundle_successful(t *testing.T) {
t.Parallel()
- cfg, repo, repoPath, client := setupRepositoryService(t)
- ctx, cancel := testhelper.Context()
- defer cancel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testServerCreateRepositoryFromBundleSuccessful)
+}
+
+func testServerCreateRepositoryFromBundleSuccessful(t *testing.T, ctx context.Context) {
+ cfg, repo, repoPath, client := setupRepositoryService(t)
locator := config.NewLocator(cfg)
tmpdir, err := tempdir.New(ctx, repo.GetStorageName(), locator)
@@ -92,7 +95,13 @@ func TestServer_CreateRepositoryFromBundle_successful(t *testing.T) {
require.NotNil(t, commit)
}
-func TestServerCreateRepositoryFromBundleTransactional(t *testing.T) {
+func TestCreateRepositoryFromBundle_transactional(t *testing.T) {
+ t.Parallel()
+
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromBundleTransactional)
+}
+
+func testCreateRepositoryFromBundleTransactional(t *testing.T, ctx context.Context) {
var votes []voting.Vote
txManager := &transaction.MockManager{
VoteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote) error {
@@ -114,8 +123,6 @@ func TestServerCreateRepositoryFromBundleTransactional(t *testing.T) {
gittest.Exec(t, cfg, "-C", repoPath, "update-ref", keepAroundRef, masterOID)
}
- ctx, cancel := testhelper.Context()
- defer cancel()
ctx, err := txinfo.InjectTransaction(ctx, 1, "primary", true)
require.NoError(t, err)
ctx = metadata.IncomingToOutgoing(ctx)
@@ -145,6 +152,13 @@ func TestServerCreateRepositoryFromBundleTransactional(t *testing.T) {
_, err = stream.CloseAndRecv()
require.NoError(t, err)
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ // We're being much more thorough with computation of the votes, so it's hard to say
+ // exactly what these votes look like. So we just assert we've got a bunch of them.
+ require.Len(t, votes, 4)
+ return
+ }
+
var votingInput []string
// This accounts for the first two votes via git-clone(1). Given that git-clone(1) creates
@@ -173,12 +187,14 @@ func TestServerCreateRepositoryFromBundleTransactional(t *testing.T) {
require.Equal(t, votes, expectedVotes)
}
-func TestServer_CreateRepositoryFromBundle_failed_invalid_bundle(t *testing.T) {
+func TestCreateRepositoryFromBundle_invalidBundle(t *testing.T) {
t.Parallel()
- cfg, client := setupRepositoryServiceWithoutRepo(t)
- ctx, cancel := testhelper.Context()
- defer cancel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromBundleInvalidBundle)
+}
+
+func testCreateRepositoryFromBundleInvalidBundle(t *testing.T, ctx context.Context) {
+ cfg, client := setupRepositoryServiceWithoutRepo(t)
stream, err := client.CreateRepositoryFromBundle(ctx)
require.NoError(t, err)
@@ -211,12 +227,14 @@ func TestServer_CreateRepositoryFromBundle_failed_invalid_bundle(t *testing.T) {
require.Contains(t, err.Error(), "invalid gitfile format")
}
-func TestServer_CreateRepositoryFromBundle_failed_validations(t *testing.T) {
+func TestCreateRepositoryFromBundle_invalidArgument(t *testing.T) {
t.Parallel()
- _, client := setupRepositoryServiceWithoutRepo(t)
- ctx, cancel := testhelper.Context()
- defer cancel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testServerCreateRepositoryFromBundleFailedValidations)
+}
+
+func testServerCreateRepositoryFromBundleFailedValidations(t *testing.T, ctx context.Context) {
+ _, client := setupRepositoryServiceWithoutRepo(t)
stream, err := client.CreateRepositoryFromBundle(ctx)
require.NoError(t, err)
@@ -227,12 +245,14 @@ func TestServer_CreateRepositoryFromBundle_failed_validations(t *testing.T) {
testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
}
-func TestServer_CreateRepositoryFromBundle_failed_existing_directory(t *testing.T) {
+func TestCreateRepositoryFromBundle_existingRepository(t *testing.T) {
t.Parallel()
- _, repo, _, client := setupRepositoryService(t)
- ctx, cancel := testhelper.Context()
- defer cancel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testServerCreateRepositoryFromBundleFailedExistingDirectory)
+}
+
+func testServerCreateRepositoryFromBundleFailedExistingDirectory(t *testing.T, ctx context.Context) {
+ _, repo, _, client := setupRepositoryService(t)
stream, err := client.CreateRepositoryFromBundle(ctx)
require.NoError(t, err)
@@ -242,7 +262,12 @@ func TestServer_CreateRepositoryFromBundle_failed_existing_directory(t *testing.
}))
_, err = stream.CloseAndRecv()
- testassert.GrpcEqualErr(t, status.Error(codes.FailedPrecondition, "CreateRepositoryFromBundle: target directory is non-empty"), err)
+
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ testassert.GrpcEqualErr(t, status.Error(codes.AlreadyExists, "creating repository: repository exists already"), err)
+ } else {
+ testassert.GrpcEqualErr(t, status.Error(codes.FailedPrecondition, "CreateRepositoryFromBundle: target directory is non-empty"), err)
+ }
}
func TestSanitizedError(t *testing.T) {
diff --git a/internal/gitaly/service/repository/create_from_snapshot.go b/internal/gitaly/service/repository/create_repository_from_snapshot.go
index 1969a0b9d..b0dcd777f 100644
--- a/internal/gitaly/service/repository/create_from_snapshot.go
+++ b/internal/gitaly/service/repository/create_repository_from_snapshot.go
@@ -11,6 +11,8 @@ import (
"time"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
@@ -76,6 +78,32 @@ func untar(ctx context.Context, path string, in *gitalypb.CreateRepositoryFromSn
}
func (s *server) CreateRepositoryFromSnapshot(ctx context.Context, in *gitalypb.CreateRepositoryFromSnapshotRequest) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) {
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ if err := s.createRepository(ctx, in.GetRepository(), func(repo *gitalypb.Repository) error {
+ path, err := s.locator.GetPath(repo)
+ if err != nil {
+ return helper.ErrInternalf("getting repo path: %w", err)
+ }
+
+ // The archive contains a partial git repository, missing a config file and
+ // other important items. Initializing a new bare one and extracting the
+ // archive on top of it ensures the created git repository has everything
+ // it needs (especially, the config file and hooks directory).
+ //
+ // NOTE: The received archive is trusted *a lot*. Before pointing this RPC
+ // at endpoints not under our control, it should undergo a lot of hardning.
+ if err := untar(ctx, path, in); err != nil {
+ return helper.ErrInternalf("extracting snapshot: %w", err)
+ }
+
+ return nil
+ }); err != nil {
+ return nil, helper.ErrInternalf("creating repository: %w", err)
+ }
+
+ return &gitalypb.CreateRepositoryFromSnapshotResponse{}, nil
+ }
+
realPath, err := s.locator.GetPath(in.Repository)
if err != nil {
return nil, err
diff --git a/internal/gitaly/service/repository/create_from_snapshot_test.go b/internal/gitaly/service/repository/create_repository_from_snapshot_test.go
index 5aa0c339b..596049312 100644
--- a/internal/gitaly/service/repository/create_from_snapshot_test.go
+++ b/internal/gitaly/service/repository/create_repository_from_snapshot_test.go
@@ -2,6 +2,7 @@ package repository
import (
"bytes"
+ "context"
"io"
"net/http"
"net/http/httptest"
@@ -14,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/archive"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
@@ -60,20 +62,21 @@ func generateTarFile(t *testing.T, path string) ([]byte, []string) {
return data, entries
}
-func createFromSnapshot(t *testing.T, req *gitalypb.CreateRepositoryFromSnapshotRequest, cfg config.Cfg) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) {
+func createFromSnapshot(t *testing.T, ctx context.Context, req *gitalypb.CreateRepositoryFromSnapshotRequest, cfg config.Cfg) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) {
t.Helper()
serverSocketPath := runRepositoryServerWithConfig(t, cfg, nil)
client := newRepositoryClient(t, cfg, serverSocketPath)
- ctx, cancel := testhelper.Context()
- defer cancel()
-
return client.CreateRepositoryFromSnapshot(ctx, req)
}
-func TestCreateRepositoryFromSnapshotSuccess(t *testing.T) {
+func TestCreateRepositoryFromSnapshot_success(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotSuccess)
+}
+
+func testCreateRepositoryFromSnapshotSuccess(t *testing.T, ctx context.Context) {
cfg := testcfg.Build(t)
_, sourceRepoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0])
@@ -98,7 +101,7 @@ func TestCreateRepositoryFromSnapshotSuccess(t *testing.T) {
HttpAuth: secret,
}
- rsp, err := createFromSnapshot(t, req, cfg)
+ rsp, err := createFromSnapshot(t, ctx, req, cfg)
require.NoError(t, err)
testassert.ProtoEqual(t, rsp, &gitalypb.CreateRepositoryFromSnapshotResponse{})
@@ -117,20 +120,35 @@ func TestCreateRepositoryFromSnapshotSuccess(t *testing.T) {
require.FileExists(t, filepath.Join(repoAbsolutePath, "config"), "Config file not created")
}
-func TestCreateRepositoryFromSnapshotFailsIfRepositoryExists(t *testing.T) {
+func TestCreateRepositoryFromSnapshot_repositoryExists(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotFailsIfRepositoryExists)
+}
+
+func testCreateRepositoryFromSnapshotFailsIfRepositoryExists(t *testing.T, ctx context.Context) {
cfg := testcfg.Build(t)
repo, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0])
req := &gitalypb.CreateRepositoryFromSnapshotRequest{Repository: repo}
- rsp, err := createFromSnapshot(t, req, cfg)
- testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
- require.Contains(t, err.Error(), "destination directory exists")
+ rsp, err := createFromSnapshot(t, ctx, req, cfg)
+
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ testhelper.RequireGrpcError(t, err, codes.AlreadyExists)
+ require.Contains(t, err.Error(), "creating repository: repository exists already")
+ } else {
+ testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
+ require.Contains(t, err.Error(), "destination directory exists")
+ }
+
require.Nil(t, rsp)
}
-func TestCreateRepositoryFromSnapshotFailsIfBadURL(t *testing.T) {
+func TestCreateRepositoryFromSnapshot_badURL(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotFailsIfBadURL)
+}
+
+func testCreateRepositoryFromSnapshotFailsIfBadURL(t *testing.T, ctx context.Context) {
cfg := testcfg.Build(t)
repo, repoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0])
require.NoError(t, os.RemoveAll(repoPath))
@@ -140,15 +158,19 @@ func TestCreateRepositoryFromSnapshotFailsIfBadURL(t *testing.T) {
HttpUrl: "invalid!scheme://invalid.invalid",
}
- rsp, err := createFromSnapshot(t, req, cfg)
+ rsp, err := createFromSnapshot(t, ctx, req, cfg)
testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
require.Contains(t, err.Error(), "Bad HTTP URL")
require.Nil(t, rsp)
}
-func TestCreateRepositoryFromSnapshotBadRequests(t *testing.T) {
+func TestCreateRepositoryFromSnapshot_invalidArguments(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotBadRequests)
+}
+
+func testCreateRepositoryFromSnapshotBadRequests(t *testing.T, ctx context.Context) {
testCases := []struct {
desc string
url string
@@ -194,7 +216,7 @@ func TestCreateRepositoryFromSnapshotBadRequests(t *testing.T) {
HttpAuth: tc.auth,
}
- rsp, err := createFromSnapshot(t, req, cfg)
+ rsp, err := createFromSnapshot(t, ctx, req, cfg)
testhelper.RequireGrpcError(t, err, tc.code)
require.Nil(t, rsp)
@@ -203,8 +225,12 @@ func TestCreateRepositoryFromSnapshotBadRequests(t *testing.T) {
}
}
-func TestCreateRepositoryFromSnapshotHandlesMalformedResponse(t *testing.T) {
+func TestCreateRepositoryFromSnapshot_malformedResponse(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromSnapshotHandlesMalformedResponse)
+}
+
+func testCreateRepositoryFromSnapshotHandlesMalformedResponse(t *testing.T, ctx context.Context) {
cfg := testcfg.Build(t)
repo, repoPath := gittest.CloneRepo(t, cfg, cfg.Storages[0])
@@ -227,7 +253,7 @@ func TestCreateRepositoryFromSnapshotHandlesMalformedResponse(t *testing.T) {
HttpAuth: secret,
}
- rsp, err := createFromSnapshot(t, req, cfg)
+ rsp, err := createFromSnapshot(t, ctx, req, cfg)
require.Error(t, err)
require.Nil(t, rsp)
diff --git a/internal/gitaly/service/repository/create_from_url.go b/internal/gitaly/service/repository/create_repository_from_url.go
index e12c0c875..eacaeb3c9 100644
--- a/internal/gitaly/service/repository/create_from_url.go
+++ b/internal/gitaly/service/repository/create_repository_from_url.go
@@ -5,7 +5,6 @@ import (
"context"
"encoding/base64"
"fmt"
- "io"
"net/url"
"os"
@@ -13,12 +12,17 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
-func (s *server) cloneFromURLCommand(ctx context.Context, repo *gitalypb.Repository, repoURL, repositoryFullPath string, stderr io.Writer) (*command.Command, error) {
+func (s *server) cloneFromURLCommand(
+ ctx context.Context,
+ repoURL, repositoryFullPath string,
+ opts ...git.CmdOpt,
+) (*command.Command, error) {
u, err := url.Parse(repoURL)
if err != nil {
return nil, helper.ErrInternal(err)
@@ -54,9 +58,7 @@ func (s *server) cloneFromURLCommand(ctx context.Context, repo *gitalypb.Reposit
Flags: cloneFlags,
Args: []string{u.String(), repositoryFullPath},
},
- git.WithStderr(stderr),
- git.WithRefTxHook(ctx, repo, s.cfg),
- git.WithConfig(config...),
+ append(opts, git.WithConfig(config...))...,
)
}
@@ -65,6 +67,40 @@ func (s *server) CreateRepositoryFromURL(ctx context.Context, req *gitalypb.Crea
return nil, status.Errorf(codes.InvalidArgument, "CreateRepositoryFromURL: %v", err)
}
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ if err := s.createRepository(ctx, req.GetRepository(), func(repo *gitalypb.Repository) error {
+ targetPath, err := s.locator.GetPath(repo)
+ if err != nil {
+ return fmt.Errorf("getting temporary repository path: %w", err)
+ }
+
+ // We need to remove the target path first so git-clone(1) doesn't complain.
+ if err := os.RemoveAll(targetPath); err != nil {
+ return fmt.Errorf("removing temporary repository: %w", err)
+ }
+
+ var stderr bytes.Buffer
+ cmd, err := s.cloneFromURLCommand(ctx, req.GetUrl(), targetPath, git.WithStderr(&stderr), git.WithDisabledHooks())
+ if err != nil {
+ return fmt.Errorf("starting clone: %w", err)
+ }
+
+ if err := cmd.Wait(); err != nil {
+ return fmt.Errorf("cloning repository: %w, stderr: %q", err, stderr.String())
+ }
+
+ if err := s.removeOriginInRepo(ctx, repo); err != nil {
+ return fmt.Errorf("removing origin remote: %w", err)
+ }
+
+ return nil
+ }); err != nil {
+ return nil, helper.ErrInternalf("creating repository: %w", err)
+ }
+
+ return &gitalypb.CreateRepositoryFromURLResponse{}, nil
+ }
+
repository := req.Repository
repositoryFullPath, err := s.locator.GetPath(repository)
@@ -77,7 +113,7 @@ func (s *server) CreateRepositoryFromURL(ctx context.Context, req *gitalypb.Crea
}
stderr := bytes.Buffer{}
- cmd, err := s.cloneFromURLCommand(ctx, repository, req.GetUrl(), repositoryFullPath, &stderr)
+ cmd, err := s.cloneFromURLCommand(ctx, req.GetUrl(), repositoryFullPath, git.WithStderr(&stderr), git.WithRefTxHook(ctx, repository, s.cfg))
if err != nil {
return nil, helper.ErrInternal(err)
}
diff --git a/internal/gitaly/service/repository/create_from_url_test.go b/internal/gitaly/service/repository/create_repository_from_url_test.go
index 1edf97796..6d38ef4f1 100644
--- a/internal/gitaly/service/repository/create_from_url_test.go
+++ b/internal/gitaly/service/repository/create_repository_from_url_test.go
@@ -1,6 +1,7 @@
package repository
import (
+ "context"
"encoding/base64"
"fmt"
"net/http"
@@ -12,18 +13,20 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc/codes"
)
-func TestSuccessfulCreateRepositoryFromURLRequest(t *testing.T) {
+func TestCreateRepotitoryFromURL_successful(t *testing.T) {
t.Parallel()
- cfg, _, repoPath, client := setupRepositoryService(t)
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepotitoryFromURLSuccessful)
+}
- ctx, cancel := testhelper.Context()
- defer cancel()
+func testCreateRepotitoryFromURLSuccessful(t *testing.T, ctx context.Context) {
+ cfg, _, repoPath, client := setupRepositoryService(t)
importedRepo := &gitalypb.Repository{
RelativePath: "imports/test-repo-imported.git",
@@ -57,37 +60,14 @@ func TestSuccessfulCreateRepositoryFromURLRequest(t *testing.T) {
require.True(t, os.IsNotExist(err), "hooks directory should not have been created")
}
-func TestCloneRepositoryFromUrlCommand(t *testing.T) {
+func TestCreateRepositoryFromURL_existingTarget(t *testing.T) {
t.Parallel()
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- userInfo := "user:pass%21%3F%40"
- repositoryFullPath := "full/path/to/repository"
- url := fmt.Sprintf("https://%s@www.example.com/secretrepo.git", userInfo)
-
- cfg := testcfg.Build(t)
- s := server{cfg: cfg, gitCmdFactory: git.NewExecCommandFactory(cfg)}
- cmd, err := s.cloneFromURLCommand(ctx, &gitalypb.Repository{}, url, repositoryFullPath, nil)
- require.NoError(t, err)
-
- expectedScrubbedURL := "https://www.example.com/secretrepo.git"
- expectedBasicAuthHeader := fmt.Sprintf("Authorization: Basic %s", base64.StdEncoding.EncodeToString([]byte("user:pass!?@")))
- expectedHeader := fmt.Sprintf("http.extraHeader=%s", expectedBasicAuthHeader)
-
- args := cmd.Args()
- require.Contains(t, args, expectedScrubbedURL)
- require.Contains(t, args, expectedHeader)
- require.NotContains(t, args, userInfo)
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromURLExistingTarget)
}
-func TestFailedCreateRepositoryFromURLRequestDueToExistingTarget(t *testing.T) {
- t.Parallel()
+func testCreateRepositoryFromURLExistingTarget(t *testing.T, ctx context.Context) {
cfg, client := setupRepositoryServiceWithoutRepo(t)
- ctx, cancel := testhelper.Context()
- defer cancel()
-
testCases := []struct {
desc string
repoPath string
@@ -126,17 +106,21 @@ func TestFailedCreateRepositoryFromURLRequestDueToExistingTarget(t *testing.T) {
}
_, err := client.CreateRepositoryFromURL(ctx, req)
- testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ testhelper.RequireGrpcError(t, err, codes.AlreadyExists)
+ } else {
+ testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
+ }
})
}
}
-func TestPreventingRedirect(t *testing.T) {
- t.Parallel()
- cfg, client := setupRepositoryServiceWithoutRepo(t)
+func TestCreateRepositoryFromURL_redirect(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFromURLRedirect)
+}
- ctx, cancel := testhelper.Context()
- defer cancel()
+func testCreateRepositoryFromURLRedirect(t *testing.T, ctx context.Context) {
+ cfg, client := setupRepositoryServiceWithoutRepo(t)
importedRepo := &gitalypb.Repository{
RelativePath: "imports/test-repo-imported.git",
@@ -160,6 +144,30 @@ func TestPreventingRedirect(t *testing.T) {
require.Contains(t, err.Error(), "The requested URL returned error: 301")
}
+func TestCloneRepositoryFromUrlCommand(t *testing.T) {
+ t.Parallel()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ userInfo := "user:pass%21%3F%40"
+ repositoryFullPath := "full/path/to/repository"
+ url := fmt.Sprintf("https://%s@www.example.com/secretrepo.git", userInfo)
+
+ cfg := testcfg.Build(t)
+ s := server{cfg: cfg, gitCmdFactory: git.NewExecCommandFactory(cfg)}
+ cmd, err := s.cloneFromURLCommand(ctx, url, repositoryFullPath, git.WithDisabledHooks())
+ require.NoError(t, err)
+
+ expectedScrubbedURL := "https://www.example.com/secretrepo.git"
+ expectedBasicAuthHeader := fmt.Sprintf("Authorization: Basic %s", base64.StdEncoding.EncodeToString([]byte("user:pass!?@")))
+ expectedHeader := fmt.Sprintf("http.extraHeader=%s", expectedBasicAuthHeader)
+
+ args := cmd.Args()
+ require.Contains(t, args, expectedScrubbedURL)
+ require.Contains(t, args, expectedHeader)
+ require.NotContains(t, args, userInfo)
+}
+
func gitServerWithBasicAuth(t testing.TB, cfg config.Cfg, user, pass, repoPath string) (int, func() error) {
return gittest.GitServer(t, cfg, repoPath, basicAuthMiddleware(t, user, pass))
}
diff --git a/internal/gitaly/service/repository/create_test.go b/internal/gitaly/service/repository/create_repository_test.go
index 165d1257a..5039de10b 100644
--- a/internal/gitaly/service/repository/create_test.go
+++ b/internal/gitaly/service/repository/create_repository_test.go
@@ -17,7 +17,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/auth"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
@@ -25,27 +27,34 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
-func TestRepoNoAuth(t *testing.T) {
+func TestCreateRepository_missingAuth(t *testing.T) {
+ t.Parallel()
+
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryMissingAuth)
+}
+
+func testCreateRepositoryMissingAuth(t *testing.T, ctx context.Context) {
cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{Auth: auth.Config{Token: "some"}}))
serverSocketPath := runRepositoryServerWithConfig(t, cfg, nil)
client := newRepositoryClient(t, config.Cfg{Auth: auth.Config{Token: ""}}, serverSocketPath)
- ctx, cancel := testhelper.Context()
- defer cancel()
-
_, err := client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
testhelper.RequireGrpcError(t, err, codes.Unauthenticated)
}
-func TestCreateRepositorySuccess(t *testing.T) {
- cfg, client := setupRepositoryServiceWithoutRepo(t)
+func TestCreateRepository_successful(t *testing.T) {
+ t.Parallel()
+
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositorySuccessful)
+}
- ctx, cancel := testhelper.Context()
- defer cancel()
+func testCreateRepositorySuccessful(t *testing.T, ctx context.Context) {
+ cfg, client := setupRepositoryServiceWithoutRepo(t)
relativePath := "create-repository-test.git"
repoDir := filepath.Join(cfg.Storages[0].Path, relativePath)
@@ -73,11 +82,14 @@ func TestCreateRepositorySuccess(t *testing.T) {
require.Equal(t, symRef, []byte(fmt.Sprintf("ref: %s\n", git.DefaultRef)))
}
-func TestCreateRepositoryFailure(t *testing.T) {
- cfg, client := setupRepositoryServiceWithoutRepo(t)
+func TestCreateRepository_failure(t *testing.T) {
+ t.Parallel()
+
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryFailure)
+}
- ctx, cancel := testhelper.Context()
- defer cancel()
+func testCreateRepositoryFailure(t *testing.T, ctx context.Context) {
+ cfg, client := setupRepositoryServiceWithoutRepo(t)
storagePath := cfg.Storages[0].Path
fullPath := filepath.Join(storagePath, "foo.git")
@@ -89,14 +101,21 @@ func TestCreateRepositoryFailure(t *testing.T) {
Repository: &gitalypb.Repository{StorageName: cfg.Storages[0].Name, RelativePath: "foo.git"},
})
- testhelper.RequireGrpcError(t, err, codes.Internal)
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ testhelper.RequireGrpcError(t, err, codes.AlreadyExists)
+ } else {
+ testhelper.RequireGrpcError(t, err, codes.Internal)
+ }
}
-func TestCreateRepositoryFailureInvalidArgs(t *testing.T) {
- _, client := setupRepositoryServiceWithoutRepo(t)
+func TestCreateRepository_invalidArguments(t *testing.T) {
+ t.Parallel()
- ctx, cancel := testhelper.Context()
- defer cancel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryInvalidArguments)
+}
+
+func testCreateRepositoryInvalidArguments(t *testing.T, ctx context.Context) {
+ _, client := setupRepositoryServiceWithoutRepo(t)
testCases := []struct {
repo *gitalypb.Repository
@@ -118,7 +137,13 @@ func TestCreateRepositoryFailureInvalidArgs(t *testing.T) {
}
}
-func TestCreateRepositoryTransactional(t *testing.T) {
+func TestCreateRepository_transactional(t *testing.T) {
+ t.Parallel()
+
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryTransactional)
+}
+
+func testCreateRepositoryTransactional(t *testing.T, ctx context.Context) {
var actualVote voting.Vote
var called int
@@ -132,9 +157,6 @@ func TestCreateRepositoryTransactional(t *testing.T) {
cfg, client := setupRepositoryServiceWithoutRepo(t, testserver.WithTransactionManager(&mockTxManager))
- ctx, cancel := testhelper.Context()
- defer cancel()
-
ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true)
require.NoError(t, err)
ctx = metadata.IncomingToOutgoing(ctx)
@@ -152,8 +174,12 @@ func TestCreateRepositoryTransactional(t *testing.T) {
require.NoError(t, err)
require.DirExists(t, filepath.Join(cfg.Storages[0].Path, "repo.git"))
- require.Equal(t, 1, called, "expected transactional vote")
- require.Equal(t, voting.VoteFromData([]byte{}), actualVote)
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ require.Equal(t, 2, called, "expected transactional vote")
+ } else {
+ require.Equal(t, 1, called, "expected transactional vote")
+ require.Equal(t, voting.VoteFromData([]byte{}), actualVote)
+ }
})
t.Run("idempotent creation with preexisting refs", func(t *testing.T) {
@@ -165,6 +191,12 @@ func TestCreateRepositoryTransactional(t *testing.T) {
_, err = client.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
Repository: repo,
})
+
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ testassert.ProtoEqual(t, status.Error(codes.AlreadyExists, "creating repository: repository exists already"), err)
+ return
+ }
+
require.NoError(t, err)
refs := gittest.Exec(t, cfg, "-C", repoPath, "for-each-ref")
@@ -175,16 +207,25 @@ func TestCreateRepositoryTransactional(t *testing.T) {
})
}
-func TestCreateRepositoryIdempotent(t *testing.T) {
- cfg, repo, repoPath, client := setupRepositoryService(t)
+func TestCreateRepository_idempotent(t *testing.T) {
+ t.Parallel()
+
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testCreateRepositoryIdempotent)
+}
- ctx, cancel := testhelper.Context()
- defer cancel()
+func testCreateRepositoryIdempotent(t *testing.T, ctx context.Context) {
+ cfg, repo, repoPath, client := setupRepositoryService(t)
refsBefore := strings.Split(string(gittest.Exec(t, cfg, "-C", repoPath, "for-each-ref")), "\n")
req := &gitalypb.CreateRepositoryRequest{Repository: repo}
_, err := client.CreateRepository(ctx, req)
+
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ testassert.ProtoEqual(t, status.Error(codes.AlreadyExists, "creating repository: repository exists already"), err)
+ return
+ }
+
require.NoError(t, err)
refsAfter := strings.Split(string(gittest.Exec(t, cfg, "-C", repoPath, "for-each-ref")), "\n")
diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go
index 7a54fe38a..eeb4667a9 100644
--- a/internal/gitaly/service/repository/replicate.go
+++ b/internal/gitaly/service/repository/replicate.go
@@ -18,6 +18,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/safe"
"gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -141,6 +142,20 @@ func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryReq
}
func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error {
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ if err := s.createRepository(ctx, in.GetRepository(), func(repo *gitalypb.Repository) error {
+ if err := s.extractSnapshot(ctx, in.GetSource(), repo); err != nil {
+ return fmt.Errorf("extracting snapshot: %w", err)
+ }
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("creating repository: %w", err)
+ }
+
+ return nil
+ }
+
tempRepo, tempDir, err := tempdir.NewRepository(ctx, in.GetRepository().GetStorageName(), s.locator)
if err != nil {
return fmt.Errorf("create temporary directory: %w", err)
@@ -152,12 +167,33 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR
return fmt.Errorf("create repository: %w", err)
}
- repoClient, err := s.newRepoClient(ctx, in.GetSource().GetStorageName())
+ if err := s.extractSnapshot(ctx, in.GetSource(), tempRepo); err != nil {
+ return fmt.Errorf("extracting snapshot: %w", err)
+ }
+
+ targetPath, err := s.locator.GetPath(in.GetRepository())
+ if err != nil {
+ return fmt.Errorf("locate repository: %w", err)
+ }
+
+ if err = os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil {
+ return fmt.Errorf("create parent directories: %w", err)
+ }
+
+ if err := os.Rename(tempDir.Path(), targetPath); err != nil {
+ return fmt.Errorf("move temporary directory to target path: %w", err)
+ }
+
+ return nil
+}
+
+func (s *server) extractSnapshot(ctx context.Context, source, target *gitalypb.Repository) error {
+ repoClient, err := s.newRepoClient(ctx, source.GetStorageName())
if err != nil {
return fmt.Errorf("new client: %w", err)
}
- stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: in.GetSource()})
+ stream, err := repoClient.GetSnapshot(ctx, &gitalypb.GetSnapshotRequest{Repository: source})
if err != nil {
return fmt.Errorf("get snapshot: %w", err)
}
@@ -187,8 +223,13 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR
}),
)
+ targetPath, err := s.locator.GetPath(target)
+ if err != nil {
+ return fmt.Errorf("target path: %w", err)
+ }
+
stderr := &bytes.Buffer{}
- cmd, err := command.New(ctx, exec.Command("tar", "-C", tempDir.Path(), "-xvf", "-"), snapshotReader, nil, stderr)
+ cmd, err := command.New(ctx, exec.Command("tar", "-C", targetPath, "-xvf", "-"), snapshotReader, nil, stderr)
if err != nil {
return fmt.Errorf("create tar command: %w", err)
}
@@ -197,19 +238,6 @@ func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateR
return fmt.Errorf("wait for tar, stderr: %q, err: %w", stderr, err)
}
- targetPath, err := s.locator.GetPath(in.GetRepository())
- if err != nil {
- return fmt.Errorf("locate repository: %w", err)
- }
-
- if err = os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil {
- return fmt.Errorf("create parent directories: %w", err)
- }
-
- if err := os.Rename(tempDir.Path(), targetPath); err != nil {
- return fmt.Errorf("move temporary directory to target path: %w", err)
- }
-
return nil
}
diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go
index 43aaa23ca..090ec68bf 100644
--- a/internal/gitaly/service/repository/replicate_test.go
+++ b/internal/gitaly/service/repository/replicate_test.go
@@ -2,6 +2,7 @@ package repository
import (
"bytes"
+ "context"
"os"
"path/filepath"
"sync/atomic"
@@ -14,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
@@ -27,6 +29,10 @@ import (
func TestReplicateRepository(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepository)
+}
+
+func testReplicateRepository(t *testing.T, ctx context.Context) {
cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica"))
cfg := cfgBuilder.Build(t)
@@ -61,8 +67,6 @@ func TestReplicateRepository(t *testing.T) {
targetRepo := proto.Clone(repo).(*gitalypb.Repository)
targetRepo.StorageName = cfg.Storages[1].Name
- ctx, cancel := testhelper.Context()
- defer cancel()
ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
_, err = client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
@@ -101,9 +105,10 @@ func TestReplicateRepository(t *testing.T) {
func TestReplicateRepositoryTransactional(t *testing.T) {
t.Parallel()
- ctx, cancel := testhelper.Context()
- defer cancel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryTransactional)
+}
+func testReplicateRepositoryTransactional(t *testing.T, ctx context.Context) {
cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica"))
cfg := cfgBuilder.Build(t)
@@ -149,7 +154,12 @@ func TestReplicateRepositoryTransactional(t *testing.T) {
Source: sourceRepo,
})
require.NoError(t, err)
- require.EqualValues(t, 5, atomic.LoadInt32(&votes))
+
+ expectedVotes := 5
+ if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
+ expectedVotes++
+ }
+ require.EqualValues(t, expectedVotes, atomic.LoadInt32(&votes))
// We're now changing a reference in the source repository such that we can observe changes
// in the target repo.
@@ -168,6 +178,10 @@ func TestReplicateRepositoryTransactional(t *testing.T) {
func TestReplicateRepositoryInvalidArguments(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryInvalidArguments)
+}
+
+func testReplicateRepositoryInvalidArguments(t *testing.T, ctx context.Context) {
testCases := []struct {
description string
input *gitalypb.ReplicateRepositoryRequest
@@ -241,9 +255,6 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) {
_, client := setupRepositoryServiceWithoutRepo(t)
- ctx, cancel := testhelper.Context()
- defer cancel()
-
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
_, err := client.ReplicateRepository(ctx, tc.input)
@@ -254,6 +265,10 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) {
func TestReplicateRepository_BadRepository(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryBadRepository)
+}
+
+func testReplicateRepositoryBadRepository(t *testing.T, ctx context.Context) {
for _, tc := range []struct {
desc string
invalidSource bool
@@ -316,8 +331,6 @@ func TestReplicateRepository_BadRepository(t *testing.T) {
}
}
- ctx, cancel := testhelper.Context()
- defer cancel()
ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
_, err := client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
@@ -338,6 +351,10 @@ func TestReplicateRepository_BadRepository(t *testing.T) {
func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryFailedFetchInternalRemote)
+}
+
+func testReplicateRepositoryFailedFetchInternalRemote(t *testing.T, ctx context.Context) {
cfg := testcfg.Build(t, testcfg.WithStorages("default", "replica"))
testcfg.BuildGitalyHooks(t, cfg)
testcfg.BuildGitalySSH(t, cfg)
@@ -361,9 +378,6 @@ func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) {
gittest.Exec(t, cfg, "init", "--bare", sourceRepoPath)
require.NoError(t, os.WriteFile(filepath.Join(sourceRepoPath, "HEAD"), []byte("garbage"), 0o666))
- ctx, cancel := testhelper.Context()
- defer cancel()
-
ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg))
repoClient := newRepositoryClient(t, cfg, cfg.SocketPath)
diff --git a/internal/gitaly/service/repository/util.go b/internal/gitaly/service/repository/util.go
index 64c8e6656..0712b4e20 100644
--- a/internal/gitaly/service/repository/util.go
+++ b/internal/gitaly/service/repository/util.go
@@ -1,10 +1,21 @@
package repository
import (
+ "bytes"
"context"
"fmt"
+ "io"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/safe"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -19,3 +30,162 @@ func (s *server) removeOriginInRepo(ctx context.Context, repository *gitalypb.Re
return nil
}
+
+// createRepository will create a new repository in a race-free way with proper transactional
+// semantics. The repository will only be created if it doesn't yet exist and if nodes which take
+// part in the transaction reach quorum. Otherwise, the target path of the new repository will not
+// be modified. The repository can optionally be seeded with contents
+func (s *server) createRepository(
+ ctx context.Context,
+ repository *gitalypb.Repository,
+ seedRepository func(repository *gitalypb.Repository) error,
+) error {
+ targetPath, err := s.locator.GetPath(repository)
+ if err != nil {
+ return helper.ErrInvalidArgumentf("locate repository: %w", err)
+ }
+
+ // The repository must not exist on disk already, or otherwise we won't be able to
+ // create it with atomic semantics.
+ if _, err := os.Stat(targetPath); !os.IsNotExist(err) {
+ return helper.ErrAlreadyExistsf("repository exists already")
+ }
+
+ // Create the parent directory in case it doesn't exist yet.
+ if err := os.MkdirAll(filepath.Dir(targetPath), 0o770); err != nil {
+ return helper.ErrInternalf("create directories: %w", err)
+ }
+
+ newRepo, newRepoDir, err := tempdir.NewRepository(ctx, repository.GetStorageName(), s.locator)
+ if err != nil {
+ return fmt.Errorf("creating temporary repository: %w", err)
+ }
+ defer func() {
+ // We don't really care about whether this succeeds or not. It will either get
+ // cleaned up after the context is done, or eventually by the tempdir walker when
+ // it's old enough.
+ _ = os.RemoveAll(newRepoDir.Path())
+ }()
+
+ // Note that we do not create the repository directly in its target location, but
+ // instead create it in a temporary directory, first. This is done such that we can
+ // guarantee atomicity and roll back the change easily in case an error happens.
+ stderr := &bytes.Buffer{}
+ cmd, err := s.gitCmdFactory.NewWithoutRepo(ctx, git.SubCmd{
+ Name: "init",
+ Flags: []git.Option{
+ git.Flag{Name: "--bare"},
+ git.Flag{Name: "--quiet"},
+ },
+ Args: []string{newRepoDir.Path()},
+ }, git.WithStderr(stderr))
+ if err != nil {
+ return fmt.Errorf("spawning git-init: %w", err)
+ }
+ if err := cmd.Wait(); err != nil {
+ return fmt.Errorf("creating repository: %w, stderr: %q", err, stderr.String())
+ }
+
+ if err := seedRepository(newRepo); err != nil {
+ // Return the error returned by the callback function as-is so we don't clobber any
+ // potential returned gRPC error codes.
+ return err
+ }
+
+ // In order to guarantee that the repository is going to be the same across all
+ // Gitalies in case we're behind Praefect, we walk the repository and hash all of
+ // its files.
+ voteHash := voting.NewVoteHash()
+ if err := filepath.WalkDir(newRepoDir.Path(), func(path string, entry fs.DirEntry, err error) error {
+ if err != nil {
+ return err
+ }
+
+ // The way packfiles are generated may not be deterministic, so we skip over the
+ // object database.
+ if path == filepath.Join(newRepoDir.Path(), "objects") {
+ return fs.SkipDir
+ }
+
+ // We do not care about directories.
+ if entry.IsDir() {
+ return nil
+ }
+
+ file, err := os.Open(path)
+ if err != nil {
+ return fmt.Errorf("opening %q: %w", entry.Name(), err)
+ }
+ defer file.Close()
+
+ if _, err := io.Copy(voteHash, file); err != nil {
+ return fmt.Errorf("hashing %q: %w", entry.Name(), err)
+ }
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("walking repository: %w", err)
+ }
+
+ vote, err := voteHash.Vote()
+ if err != nil {
+ return fmt.Errorf("computing vote: %w", err)
+ }
+
+ // We're somewhat abusing this file writer given that we simply want to assert that
+ // the target directory doesn't exist and isn't created while we want to move the
+ // new repository into place. We thus only use the locking semantics of the writer,
+ // but will never commit it.
+ locker, err := safe.NewLockingFileWriter(targetPath)
+ if err != nil {
+ return fmt.Errorf("creating locker: %w", err)
+ }
+ defer func() {
+ if err := locker.Close(); err != nil {
+ ctxlogrus.Extract(ctx).Error("closing repository locker: %w", err)
+ }
+ }()
+
+ // We're now entering the critical section where we want to have exclusive access
+ // over creation of the repository. So we:
+ //
+ // 1. Lock the repository path such that no other process can create it at the same
+ // time.
+ // 2. Vote on the new repository's state.
+ // 3. Move the repository into place.
+ // 4. Do another confirmatory vote to signal that we performed the change.
+ // 5. Unlock the repository again.
+ //
+ // This sequence guarantees that the change is atomic and can trivially be rolled
+ // back in case we fail to either lock the repository or reach quorum in the initial
+ // vote.
+ if err := locker.Lock(); err != nil {
+ return fmt.Errorf("locking repository: %w", err)
+ }
+
+ // Now that the repository is locked, we must assert that it _still_ doesn't exist.
+ // Otherwise, it could have happened that a concurrent RPC call created it while we created
+ // and seeded our temporary repository. While we would notice this at the point of moving
+ // the repository into place, we want to be as sure as possible that the action will succeed
+ // previous to the first transactional vote.
+ if _, err := os.Stat(targetPath); !os.IsNotExist(err) {
+ return helper.ErrAlreadyExistsf("repository exists already")
+ }
+
+ if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil {
+ return helper.ErrFailedPreconditionf("preparatory vote: %w", err)
+ }
+
+ // Now that we have locked the repository and all Gitalies have agreed that they
+ // want to do the same change we can move the repository into place.
+ if err := os.Rename(newRepoDir.Path(), targetPath); err != nil {
+ return fmt.Errorf("moving repository into place: %w", err)
+ }
+
+ if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil {
+ return helper.ErrFailedPreconditionf("committing vote: %w", err)
+ }
+
+ // We unlock the repository implicitly via the deferred `Close()` call.
+ return nil
+}
diff --git a/internal/gitaly/service/repository/util_test.go b/internal/gitaly/service/repository/util_test.go
new file mode 100644
index 000000000..2e4602a63
--- /dev/null
+++ b/internal/gitaly/service/repository/util_test.go
@@ -0,0 +1,264 @@
+package repository
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/safe"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc/peer"
+)
+
+func TestCreateRepository(t *testing.T) {
+ t.Parallel()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ cfg := testcfg.Build(t)
+
+ txManager := &transaction.MockManager{}
+ locator := config.NewLocator(cfg)
+ gitCmdFactory := git.NewExecCommandFactory(cfg)
+
+ server := &server{
+ cfg: cfg,
+ locator: locator,
+ txManager: txManager,
+ gitCmdFactory: gitCmdFactory,
+ }
+
+ votes := 0
+
+ for _, tc := range []struct {
+ desc string
+ setup func(t *testing.T, repo *gitalypb.Repository, repoPath string)
+ seed func(t *testing.T, repo *gitalypb.Repository, repoPath string) error
+ verify func(
+ t *testing.T,
+ tempRepo *gitalypb.Repository,
+ tempRepoPath string,
+ realRepo *gitalypb.Repository,
+ realRepoPath string,
+ )
+ transactional bool
+ expectedErr error
+ }{
+ {
+ desc: "no seeding",
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ // Assert that the temporary repository does not exist anymore.
+ require.NoDirExists(t, tempRepoPath)
+
+ // But the new repository must exist.
+ isBareRepo := gittest.Exec(t, cfg, "-C", realRepoPath, "rev-parse", "--is-bare-repository")
+ require.Equal(t, "true", text.ChompBytes(isBareRepo))
+ },
+ },
+ {
+ desc: "seeding",
+ seed: func(t *testing.T, repo *gitalypb.Repository, _ string) error {
+ // We're using the command factory on purpose here to assert that we
+ // can execute regular Git commands on the temporary repository.
+ cmd, err := gitCmdFactory.New(ctx, repo, git.SubCmd{
+ Name: "config",
+ Args: []string{"custom.key", "value"},
+ })
+ require.NoError(t, err)
+ require.NoError(t, cmd.Wait())
+ return nil
+ },
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ value := gittest.Exec(t, cfg, "-C", realRepoPath, "config", "custom.key")
+ require.Equal(t, "value", text.ChompBytes(value))
+ },
+ },
+ {
+ desc: "error while seeding",
+ seed: func(t *testing.T, repo *gitalypb.Repository, _ string) error {
+ return errors.New("some error")
+ },
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ require.NoDirExists(t, realRepoPath)
+ require.NoDirExists(t, tempRepoPath)
+ },
+ expectedErr: errors.New("some error"),
+ },
+ {
+ desc: "preexisting directory",
+ setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
+ require.NoError(t, os.MkdirAll(repoPath, 0o777))
+ },
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ require.NoDirExists(t, tempRepoPath)
+
+ require.DirExists(t, realRepoPath)
+ dirEntries, err := os.ReadDir(realRepoPath)
+ require.NoError(t, err)
+ require.Empty(t, dirEntries, "directory should not have been modified")
+ },
+ expectedErr: helper.ErrAlreadyExistsf("repository exists already"),
+ },
+ {
+ desc: "locked",
+ setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
+ require.NoError(t, os.MkdirAll(filepath.Dir(repoPath), 0o777))
+
+ // Lock the target repository such that we must fail.
+ lock, err := os.Create(repoPath + ".lock")
+ require.NoError(t, err)
+ require.NoError(t, lock.Close())
+ },
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ require.NoDirExists(t, tempRepoPath)
+ require.NoDirExists(t, realRepoPath)
+ require.FileExists(t, realRepoPath+".lock")
+ },
+ expectedErr: fmt.Errorf("locking repository: %w", safe.ErrFileAlreadyLocked),
+ },
+ {
+ desc: "successful transaction",
+ transactional: true,
+ setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
+ votes = 0
+ txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error {
+ votes++
+ return nil
+ }
+ },
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ require.Equal(t, 2, votes)
+ },
+ },
+ {
+ desc: "failing preparatory vote",
+ transactional: true,
+ setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
+ txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error {
+ return errors.New("vote failed")
+ }
+ },
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ require.NoDirExists(t, tempRepoPath)
+ require.NoDirExists(t, realRepoPath)
+ },
+ expectedErr: helper.ErrFailedPreconditionf("preparatory vote: %w", errors.New("vote failed")),
+ },
+ {
+ desc: "failing post-commit vote",
+ transactional: true,
+ setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
+ votes = 0
+ txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error {
+ votes++
+ if votes == 1 {
+ return nil
+ }
+ return errors.New("vote failed")
+ }
+ },
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ require.NoDirExists(t, tempRepoPath)
+
+ // The second vote is only a confirming vote that the node did the
+ // change. So if the second vote fails, then the change must have
+ // been performed and thus we'd see the repository.
+ require.DirExists(t, realRepoPath)
+ },
+ expectedErr: helper.ErrFailedPreconditionf("committing vote: %w", errors.New("vote failed")),
+ },
+ {
+ desc: "voting happens after lock",
+ transactional: true,
+ setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
+ // We both set up transactions and create the lock. Given that we
+ // should try locking the repository before casting any votes, we do
+ // not expect to see a voting error.
+
+ require.NoError(t, os.MkdirAll(filepath.Dir(repoPath), 0o777))
+ lock, err := os.Create(repoPath + ".lock")
+ require.NoError(t, err)
+ require.NoError(t, lock.Close())
+
+ txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error {
+ require.FailNow(t, "no votes should have happened")
+ return nil
+ }
+ },
+ verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
+ require.NoDirExists(t, tempRepoPath)
+ require.NoDirExists(t, realRepoPath)
+ },
+ expectedErr: fmt.Errorf("locking repository: %w", errors.New("file already locked")),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ repo := &gitalypb.Repository{
+ StorageName: cfg.Storages[0].Name,
+ RelativePath: gittest.NewRepositoryName(t, true),
+ }
+
+ if tc.transactional {
+ var err error
+ ctx, err = txinfo.InjectTransaction(ctx, 1, "node", true)
+ require.NoError(t, err)
+ ctx = peer.NewContext(ctx, &peer.Peer{})
+ }
+
+ repoPath, err := locator.GetPath(repo)
+ require.NoError(t, err)
+
+ if tc.setup != nil {
+ tc.setup(t, repo, repoPath)
+ }
+
+ var tempRepo *gitalypb.Repository
+ require.Equal(t, tc.expectedErr, server.createRepository(ctx, repo, func(tr *gitalypb.Repository) error {
+ tempRepo = tr
+
+ // The temporary repository must have been created in Gitaly's
+ // temporary storage path.
+ require.Equal(t, repo.StorageName, tempRepo.StorageName)
+ require.True(t, strings.HasPrefix(tempRepo.RelativePath, "+gitaly/tmp/repo"))
+
+ // Verify that the temporary repository exists and is a real Git
+ // repository.
+ tempRepoPath, err := locator.GetRepoPath(tempRepo)
+ require.NoError(t, err)
+ isBareRepo := gittest.Exec(t, cfg, "-C", tempRepoPath, "rev-parse", "--is-bare-repository")
+ require.Equal(t, "true", text.ChompBytes(isBareRepo))
+
+ if tc.seed != nil {
+ return tc.seed(t, tempRepo, tempRepoPath)
+ }
+
+ return nil
+ }))
+
+ var tempRepoPath string
+ if tempRepo != nil {
+ tempRepoPath, err = locator.GetPath(tempRepo)
+ require.NoError(t, err)
+ }
+
+ require.NotNil(t, tc.verify, "test must verify results")
+ tc.verify(t, tempRepo, tempRepoPath, repo, repoPath)
+ })
+ }
+}
diff --git a/internal/helper/error.go b/internal/helper/error.go
index 6075866ae..81f25ddcc 100644
--- a/internal/helper/error.go
+++ b/internal/helper/error.go
@@ -93,6 +93,12 @@ func ErrPermissionDeniedf(format string, a ...interface{}) error {
return formatError(codes.PermissionDenied, format, a...)
}
+// ErrAlreadyExistsf wraps a formatted error with codes.AlreadyExists, unless the formatted error is
+// a wrapped gRPC error.
+func ErrAlreadyExistsf(format string, a ...interface{}) error {
+ return formatError(codes.AlreadyExists, format, a...)
+}
+
// formatError will create a new error from the given format string. If the error string contains a
// %w verb and its corresponding error has a gRPC error code, then the returned error will keep this
// gRPC error code instead of using the one provided as an argument.
diff --git a/internal/metadata/featureflag/ff_tx_atomic_repository_creation.go b/internal/metadata/featureflag/ff_tx_atomic_repository_creation.go
new file mode 100644
index 000000000..456f7da0e
--- /dev/null
+++ b/internal/metadata/featureflag/ff_tx_atomic_repository_creation.go
@@ -0,0 +1,8 @@
+package featureflag
+
+// TxAtomicRepositoryCreation will switch CreateRepository, CreateRepositoryFromBundle,
+// CreateRepositoryFromSnapshot and CreateRepositoryFromURL to have proper transactional guarantees.
+// This changes behaviour such the target repository must not exist previous to the call, creation
+// and seeding of the repository is done in a temporary staging area and then moved into place only
+// if no other RPC call created it concurrently.
+var TxAtomicRepositoryCreation = NewFeatureFlag("tx_atomic_repository_creation", false)
diff --git a/internal/safe/locking_file_writer.go b/internal/safe/locking_file_writer.go
index b7e8405c2..c43720cbe 100644
--- a/internal/safe/locking_file_writer.go
+++ b/internal/safe/locking_file_writer.go
@@ -1,6 +1,7 @@
package safe
import (
+ "errors"
"fmt"
"io"
"os"
@@ -14,6 +15,10 @@ const (
lockingFileWriterStateClosed
)
+// ErrFileAlreadyLocked is returned when trying to lock a file which has already been locked by
+// another concurrent process.
+var ErrFileAlreadyLocked = errors.New("file already locked")
+
// LockingFileWriter is a FileWriter which locks the target file on commit and checks whether it
// has been modified since the LockingFileWriter has been created. The user must first create a new
// LockingFileWriter via `NewLockingFileWriter()`, at which point it is open for writes. The writer
@@ -134,7 +139,7 @@ func (fw *LockingFileWriter) Lock() error {
lock, err := os.OpenFile(fw.lockPath(), os.O_CREATE|os.O_EXCL|os.O_RDONLY, 0o400)
if err != nil {
if os.IsExist(err) {
- return fmt.Errorf("file already locked")
+ return ErrFileAlreadyLocked
}
return fmt.Errorf("creating lock file: %w", err)
diff --git a/internal/safe/locking_file_writer_test.go b/internal/safe/locking_file_writer_test.go
index 152c63d94..7632a9d69 100644
--- a/internal/safe/locking_file_writer_test.go
+++ b/internal/safe/locking_file_writer_test.go
@@ -261,7 +261,7 @@ func TestLockingFileWriter_concurrentLocking(t *testing.T) {
require.NoError(t, err)
require.NoError(t, first.Lock())
- require.Equal(t, fmt.Errorf("file already locked"), second.Lock())
+ require.Equal(t, safe.ErrFileAlreadyLocked, second.Lock())
require.NoError(t, first.Commit())
require.Equal(t, []byte("first"), testhelper.MustReadFile(t, file))
@@ -279,7 +279,7 @@ func TestLockingFileWriter_locked(t *testing.T) {
// Concurrently lock the file.
require.NoError(t, os.WriteFile(target+".lock", nil, 0o644))
- require.Equal(t, fmt.Errorf("file already locked"), writer.Lock())
+ require.Equal(t, safe.ErrFileAlreadyLocked, writer.Lock())
require.Equal(t, []byte("base"), testhelper.MustReadFile(t, target))
}