diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-16 07:50:33 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-17 13:22:04 +0300 |
commit | a65add3bc5b6a0595c6406651965a9093312eb9f (patch) | |
tree | d45b30e133f208b107d14a3ad01c012da8fec8a9 | |
parent | d927c552ad4de29e14003a2b797f07750c80c76d (diff) |
repository: Convert `ReplicateRepository()` to write files transactionally
When replicating repositories, we need to both synchronize the gitconfig
and and the gitattributes files. Both are updated in non-transactional
way and can thus easily race with concurrent updates of the same
file.
Convert both sites to use locking file writes and transactional votes.
Changelog: fixed
-rw-r--r-- | internal/gitaly/service/repository/replicate.go | 58 | ||||
-rw-r--r-- | internal/gitaly/service/repository/replicate_test.go | 21 |
2 files changed, 59 insertions, 20 deletions
diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index b4bcaf676..3892d2203 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -15,7 +15,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/command" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/remote" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/safe" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -226,7 +228,7 @@ func (s *server) syncGitconfig(ctx context.Context, in *gitalypb.ReplicateReposi } configPath := filepath.Join(repoPath, "config") - if err := writeFile(configPath, 0o644, streamio.NewReader(func() ([]byte, error) { + if err := s.writeFile(ctx, configPath, 0o644, streamio.NewReader(func() ([]byte, error) { resp, err := stream.Recv() return resp.GetData(), err })); err != nil { @@ -260,7 +262,7 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR } attributesPath := filepath.Join(repoPath, "info", "attributes") - if err := writeFile(attributesPath, attributesFileMode, streamio.NewReader(func() ([]byte, error) { + if err := s.writeFile(ctx, attributesPath, attributesFileMode, streamio.NewReader(func() ([]byte, error) { resp, err := stream.Recv() return resp.GetAttributes(), err })); err != nil { @@ -270,28 +272,52 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR return nil } -func writeFile(path string, mode os.FileMode, reader io.Reader) error { +func (s *server) writeFile(ctx context.Context, path string, mode os.FileMode, reader io.Reader) (returnedErr error) { parentDir := filepath.Dir(path) if err := os.MkdirAll(parentDir, 0o755); err != nil { return err } - fw, err := safe.NewFileWriter(path) - if err != nil { - return err - } - defer fw.Close() + if featureflag.TxExtendedFileLocking.IsEnabled(ctx) { + lockedFile, err := safe.NewLockingFileWriter(path, safe.LockingFileWriterConfig{ + FileWriterConfig: safe.FileWriterConfig{ + FileMode: mode, + }, + }) + if err != nil { + return fmt.Errorf("creating file writer: %w", err) + } + defer func() { + if err := lockedFile.Close(); err != nil && returnedErr == nil { + returnedErr = fmt.Errorf("closing file writer: %w", err) + } + }() - if _, err := io.Copy(fw, reader); err != nil { - return err - } + if _, err := io.Copy(lockedFile, reader); err != nil { + return fmt.Errorf("writing contents: %w", err) + } - if err = fw.Commit(); err != nil { - return err - } + if err := transaction.CommitLockedFile(ctx, s.txManager, lockedFile); err != nil { + return fmt.Errorf("committing file: %w", err) + } + } else { + fw, err := safe.NewFileWriter(path) + if err != nil { + return err + } + defer fw.Close() - if err := os.Chmod(path, mode); err != nil { - return err + if _, err := io.Copy(fw, reader); err != nil { + return err + } + + if err = fw.Commit(); err != nil { + return err + } + + if err := os.Chmod(path, mode); err != nil { + return err + } } return nil diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 382b214aa..742dcf753 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -98,6 +99,12 @@ func TestReplicateRepository(t *testing.T) { } func TestReplicateRepositoryTransactional(t *testing.T) { + testhelper.NewFeatureSets([]featureflag.FeatureFlag{ + featureflag.TxExtendedFileLocking, + }).Run(t, testReplicateRepositoryTransactional) +} + +func testReplicateRepositoryTransactional(t *testing.T, ctx context.Context) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica")) cfg := cfgBuilder.Build(t) @@ -122,8 +129,6 @@ func TestReplicateRepositoryTransactional(t *testing.T) { }, } - ctx, cancel := testhelper.Context() - defer cancel() ctx, err := txinfo.InjectTransaction(ctx, 1, "primary", true) require.NoError(t, err) ctx = helper.IncomingToOutgoing(ctx) @@ -146,7 +151,11 @@ func TestReplicateRepositoryTransactional(t *testing.T) { }) require.NoError(t, err) - require.Equal(t, 1, votes) + if featureflag.TxExtendedFileLocking.IsEnabled(ctx) { + require.Equal(t, 5, votes) + } else { + require.Equal(t, 1, votes) + } // We're now changing a reference in the source repository such that we can observe changes // in the target repo. @@ -161,7 +170,11 @@ func TestReplicateRepositoryTransactional(t *testing.T) { }) require.NoError(t, err) - require.Equal(t, 2, votes) + if featureflag.TxExtendedFileLocking.IsEnabled(ctx) { + require.Equal(t, 6, votes) + } else { + require.Equal(t, 2, votes) + } } func TestReplicateRepositoryInvalidArguments(t *testing.T) { |