diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-03 09:59:04 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-11-23 16:13:49 +0300 |
commit | f3ffcfca7916c856298441b7a77512e24d72d0b0 (patch) | |
tree | 01a6752599bf33993f7cd20c9b34a05d3a366995 | |
parent | b681abde2b0dfb945ecc0b02fa5ac9ff86d0b9ef (diff) |
repository: Convert `ReplicateRepository()` to atomically create repos
When the target repository for `ReplicateRepository()` doesn't yet
exist, then we will create it by first initializing an empty repository
and then fetching and extracting an archive of the source repository.
This currently happens in a non-atomic way, so if any other RPC call
concurrently modifies the target repository then the end state is
indeterminate.
Now that we have a central helper function which knows to create
repositories atomically, convert `ReplicateRepository()` to use this
new helper. This change makes the case where no repository exists yet
completely atomic: we either end up with the target repository in place
if the RPC call finished, or alternatively it will not have been created
at all.
Changelog: fixed
-rw-r--r-- | internal/gitaly/service/repository/replicate.go | 15 | ||||
-rw-r--r-- | internal/gitaly/service/repository/replicate_test.go | 40 |
2 files changed, 42 insertions, 13 deletions
diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index 2914dcc7d..eeb4667a9 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/safe" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -141,6 +142,20 @@ func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryReq } func (s *server) createFromSnapshot(ctx context.Context, in *gitalypb.ReplicateRepositoryRequest) error { + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + if err := s.createRepository(ctx, in.GetRepository(), func(repo *gitalypb.Repository) error { + if err := s.extractSnapshot(ctx, in.GetSource(), repo); err != nil { + return fmt.Errorf("extracting snapshot: %w", err) + } + + return nil + }); err != nil { + return fmt.Errorf("creating repository: %w", err) + } + + return nil + } + tempRepo, tempDir, err := tempdir.NewRepository(ctx, in.GetRepository().GetStorageName(), s.locator) if err != nil { return fmt.Errorf("create temporary directory: %w", err) diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 43aaa23ca..090ec68bf 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -2,6 +2,7 @@ package repository import ( "bytes" + "context" "os" "path/filepath" "sync/atomic" @@ -14,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" @@ -27,6 +29,10 @@ import ( func TestReplicateRepository(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepository) +} + +func testReplicateRepository(t *testing.T, ctx context.Context) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica")) cfg := cfgBuilder.Build(t) @@ -61,8 +67,6 @@ func TestReplicateRepository(t *testing.T) { targetRepo := proto.Clone(repo).(*gitalypb.Repository) targetRepo.StorageName = cfg.Storages[1].Name - ctx, cancel := testhelper.Context() - defer cancel() ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) _, err = client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ @@ -101,9 +105,10 @@ func TestReplicateRepository(t *testing.T) { func TestReplicateRepositoryTransactional(t *testing.T) { t.Parallel() - ctx, cancel := testhelper.Context() - defer cancel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryTransactional) +} +func testReplicateRepositoryTransactional(t *testing.T, ctx context.Context) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica")) cfg := cfgBuilder.Build(t) @@ -149,7 +154,12 @@ func TestReplicateRepositoryTransactional(t *testing.T) { Source: sourceRepo, }) require.NoError(t, err) - require.EqualValues(t, 5, atomic.LoadInt32(&votes)) + + expectedVotes := 5 + if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) { + expectedVotes++ + } + require.EqualValues(t, expectedVotes, atomic.LoadInt32(&votes)) // We're now changing a reference in the source repository such that we can observe changes // in the target repo. @@ -168,6 +178,10 @@ func TestReplicateRepositoryTransactional(t *testing.T) { func TestReplicateRepositoryInvalidArguments(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryInvalidArguments) +} + +func testReplicateRepositoryInvalidArguments(t *testing.T, ctx context.Context) { testCases := []struct { description string input *gitalypb.ReplicateRepositoryRequest @@ -241,9 +255,6 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) { _, client := setupRepositoryServiceWithoutRepo(t) - ctx, cancel := testhelper.Context() - defer cancel() - for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { _, err := client.ReplicateRepository(ctx, tc.input) @@ -254,6 +265,10 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) { func TestReplicateRepository_BadRepository(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryBadRepository) +} + +func testReplicateRepositoryBadRepository(t *testing.T, ctx context.Context) { for _, tc := range []struct { desc string invalidSource bool @@ -316,8 +331,6 @@ func TestReplicateRepository_BadRepository(t *testing.T) { } } - ctx, cancel := testhelper.Context() - defer cancel() ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) _, err := client.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ @@ -338,6 +351,10 @@ func TestReplicateRepository_BadRepository(t *testing.T) { func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TxAtomicRepositoryCreation).Run(t, testReplicateRepositoryFailedFetchInternalRemote) +} + +func testReplicateRepositoryFailedFetchInternalRemote(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t, testcfg.WithStorages("default", "replica")) testcfg.BuildGitalyHooks(t, cfg) testcfg.BuildGitalySSH(t, cfg) @@ -361,9 +378,6 @@ func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { gittest.Exec(t, cfg, "init", "--bare", sourceRepoPath) require.NoError(t, os.WriteFile(filepath.Join(sourceRepoPath, "HEAD"), []byte("garbage"), 0o666)) - ctx, cancel := testhelper.Context() - defer cancel() - ctx = testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) repoClient := newRepositoryClient(t, cfg, cfg.SocketPath) |