Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-16 07:50:33 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-17 13:22:04 +0300
commita65add3bc5b6a0595c6406651965a9093312eb9f (patch)
treed45b30e133f208b107d14a3ad01c012da8fec8a9
parentd927c552ad4de29e14003a2b797f07750c80c76d (diff)
repository: Convert `ReplicateRepository()` to write files transactionally
When replicating repositories, we need to both synchronize the gitconfig and and the gitattributes files. Both are updated in non-transactional way and can thus easily race with concurrent updates of the same file. Convert both sites to use locking file writes and transactional votes. Changelog: fixed
-rw-r--r--internal/gitaly/service/repository/replicate.go58
-rw-r--r--internal/gitaly/service/repository/replicate_test.go21
2 files changed, 59 insertions, 20 deletions
diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go
index b4bcaf676..3892d2203 100644
--- a/internal/gitaly/service/repository/replicate.go
+++ b/internal/gitaly/service/repository/replicate.go
@@ -15,7 +15,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/remote"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/safe"
"gitlab.com/gitlab-org/gitaly/v14/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -226,7 +228,7 @@ func (s *server) syncGitconfig(ctx context.Context, in *gitalypb.ReplicateReposi
}
configPath := filepath.Join(repoPath, "config")
- if err := writeFile(configPath, 0o644, streamio.NewReader(func() ([]byte, error) {
+ if err := s.writeFile(ctx, configPath, 0o644, streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetData(), err
})); err != nil {
@@ -260,7 +262,7 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR
}
attributesPath := filepath.Join(repoPath, "info", "attributes")
- if err := writeFile(attributesPath, attributesFileMode, streamio.NewReader(func() ([]byte, error) {
+ if err := s.writeFile(ctx, attributesPath, attributesFileMode, streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetAttributes(), err
})); err != nil {
@@ -270,28 +272,52 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR
return nil
}
-func writeFile(path string, mode os.FileMode, reader io.Reader) error {
+func (s *server) writeFile(ctx context.Context, path string, mode os.FileMode, reader io.Reader) (returnedErr error) {
parentDir := filepath.Dir(path)
if err := os.MkdirAll(parentDir, 0o755); err != nil {
return err
}
- fw, err := safe.NewFileWriter(path)
- if err != nil {
- return err
- }
- defer fw.Close()
+ if featureflag.TxExtendedFileLocking.IsEnabled(ctx) {
+ lockedFile, err := safe.NewLockingFileWriter(path, safe.LockingFileWriterConfig{
+ FileWriterConfig: safe.FileWriterConfig{
+ FileMode: mode,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("creating file writer: %w", err)
+ }
+ defer func() {
+ if err := lockedFile.Close(); err != nil && returnedErr == nil {
+ returnedErr = fmt.Errorf("closing file writer: %w", err)
+ }
+ }()
- if _, err := io.Copy(fw, reader); err != nil {
- return err
- }
+ if _, err := io.Copy(lockedFile, reader); err != nil {
+ return fmt.Errorf("writing contents: %w", err)
+ }
- if err = fw.Commit(); err != nil {
- return err
- }
+ if err := transaction.CommitLockedFile(ctx, s.txManager, lockedFile); err != nil {
+ return fmt.Errorf("committing file: %w", err)
+ }
+ } else {
+ fw, err := safe.NewFileWriter(path)
+ if err != nil {
+ return err
+ }
+ defer fw.Close()
- if err := os.Chmod(path, mode); err != nil {
- return err
+ if _, err := io.Copy(fw, reader); err != nil {
+ return err
+ }
+
+ if err = fw.Commit(); err != nil {
+ return err
+ }
+
+ if err := os.Chmod(path, mode); err != nil {
+ return err
+ }
}
return nil
diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go
index 382b214aa..742dcf753 100644
--- a/internal/gitaly/service/repository/replicate_test.go
+++ b/internal/gitaly/service/repository/replicate_test.go
@@ -15,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
@@ -98,6 +99,12 @@ func TestReplicateRepository(t *testing.T) {
}
func TestReplicateRepositoryTransactional(t *testing.T) {
+ testhelper.NewFeatureSets([]featureflag.FeatureFlag{
+ featureflag.TxExtendedFileLocking,
+ }).Run(t, testReplicateRepositoryTransactional)
+}
+
+func testReplicateRepositoryTransactional(t *testing.T, ctx context.Context) {
cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica"))
cfg := cfgBuilder.Build(t)
@@ -122,8 +129,6 @@ func TestReplicateRepositoryTransactional(t *testing.T) {
},
}
- ctx, cancel := testhelper.Context()
- defer cancel()
ctx, err := txinfo.InjectTransaction(ctx, 1, "primary", true)
require.NoError(t, err)
ctx = helper.IncomingToOutgoing(ctx)
@@ -146,7 +151,11 @@ func TestReplicateRepositoryTransactional(t *testing.T) {
})
require.NoError(t, err)
- require.Equal(t, 1, votes)
+ if featureflag.TxExtendedFileLocking.IsEnabled(ctx) {
+ require.Equal(t, 5, votes)
+ } else {
+ require.Equal(t, 1, votes)
+ }
// We're now changing a reference in the source repository such that we can observe changes
// in the target repo.
@@ -161,7 +170,11 @@ func TestReplicateRepositoryTransactional(t *testing.T) {
})
require.NoError(t, err)
- require.Equal(t, 2, votes)
+ if featureflag.TxExtendedFileLocking.IsEnabled(ctx) {
+ require.Equal(t, 6, votes)
+ } else {
+ require.Equal(t, 2, votes)
+ }
}
func TestReplicateRepositoryInvalidArguments(t *testing.T) {