diff options
author | Toon Claes <toon@gitlab.com> | 2022-04-25 13:09:55 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2022-04-25 13:09:55 +0300 |
commit | 953a7b80beb340565b30dce713747120552c23e5 (patch) | |
tree | 3ca5c2f119cd088b884cc627a573d265e5f256a9 | |
parent | f57a981311cf48840ffdc14dce1743e7e3cf0768 (diff) | |
parent | e66d0d5107a5e3a56259e3bdfc2617d24a920a94 (diff) |
Merge branch 'jc-restore-custom-hooks-vote' into 'master'
repository: RestoreCustomHooks to do transaction voting
Closes #4081
See merge request gitlab-org/gitaly!4481
-rw-r--r-- | internal/backup/backup_test.go | 8 | ||||
-rw-r--r-- | internal/gitaly/service/repository/restore_custom_hooks.go | 136 | ||||
-rw-r--r-- | internal/gitaly/service/repository/restore_custom_hooks_test.go | 81 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go | 5 | ||||
-rw-r--r-- | internal/safe/locking_directory.go | 95 | ||||
-rw-r--r-- | internal/safe/locking_directory_test.go | 78 |
6 files changed, 396 insertions, 7 deletions
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index cd577b6a3..d47221f6a 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -1,6 +1,7 @@ package backup import ( + "context" "fmt" "os" "path/filepath" @@ -12,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -264,7 +266,11 @@ func TestManager_Create_incremental(t *testing.T) { func TestManager_Restore(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks). + Run(t, testManagerRestore) +} + +func testManagerRestore(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) testcfg.BuildGitalyHooks(t, cfg) diff --git a/internal/gitaly/service/repository/restore_custom_hooks.go b/internal/gitaly/service/repository/restore_custom_hooks.go index 0b6971717..bb6865049 100644 --- a/internal/gitaly/service/repository/restore_custom_hooks.go +++ b/internal/gitaly/service/repository/restore_custom_hooks.go @@ -1,9 +1,21 @@ package repository import ( + "context" + "errors" + "fmt" + "os" "os/exec" + "path/filepath" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v14/internal/command" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/v14/internal/safe" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" "google.golang.org/grpc/codes" @@ -11,6 +23,10 @@ import ( ) func (s *server) RestoreCustomHooks(stream gitalypb.RepositoryService_RestoreCustomHooksServer) error { + if featureflag.TransactionalRestoreCustomHooks.IsEnabled(stream.Context()) { + return s.restoreCustomHooksWithVoting(stream) + } + firstRequest, err := stream.Recv() if err != nil { return status.Errorf(codes.Internal, "RestoreCustomHooks: first request failed %v", err) @@ -57,3 +73,123 @@ func (s *server) RestoreCustomHooks(stream gitalypb.RepositoryService_RestoreCus return stream.SendAndClose(&gitalypb.RestoreCustomHooksResponse{}) } + +func (s *server) restoreCustomHooksWithVoting(stream gitalypb.RepositoryService_RestoreCustomHooksServer) error { + firstRequest, err := stream.Recv() + if err != nil { + return helper.ErrInternalf("RestoreCustomHooks: first request failed %w", err) + } + + ctx := stream.Context() + + repo := firstRequest.GetRepository() + if repo == nil { + return helper.ErrInvalidArgumentf("RestoreCustomHooks: empty Repository") + } + + v := voting.NewVoteHash() + + repoPath, err := s.locator.GetRepoPath(repo) + if err != nil { + return helper.ErrInternalf("RestoreCustomHooks: getting repo path failed %w", err) + } + + customHooksPath := filepath.Join(repoPath, customHooksDir) + + if err = os.MkdirAll(customHooksPath, os.ModePerm); err != nil { + return helper.ErrInternalf("RestoreCustomHooks: making custom hooks directory %w", err) + } + + lockDir, err := safe.NewLockingDirectory(customHooksPath) + if err != nil { + return helper.ErrInternalf("RestoreCustomHooks: creating locking directory: %w", err) + } + + if err := lockDir.Lock(); err != nil { + return helper.ErrInternalf("RestoreCustomHooks: locking directory failed: %w", err) + } + + defer func() { + if !lockDir.IsLocked() { + return + } + + if err := lockDir.Unlock(); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Warn("could not unlock directory") + } + }() + + if err := voteCustomHooks(ctx, s.txManager, &v, voting.Prepared); err != nil { + return err + } + + reader := streamio.NewReader(func() ([]byte, error) { + var data []byte + defer func() { + _, _ = v.Write(data) + }() + + if firstRequest != nil { + data = firstRequest.GetData() + firstRequest = nil + return data, nil + } + + request, err := stream.Recv() + + data = request.GetData() + return data, err + }) + + cmdArgs := []string{ + "-xf", + "-", + "-C", + repoPath, + customHooksDir, + } + + cmd, err := command.New(ctx, exec.Command("tar", cmdArgs...), reader, nil, nil) + if err != nil { + return helper.ErrInternalf("RestoreCustomHooks: Could not untar custom hooks tar %w", err) + } + + if err := cmd.Wait(); err != nil { + return helper.ErrInternalf("RestoreCustomHooks: cmd wait failed: %w", err) + } + + if err := voteCustomHooks(ctx, s.txManager, &v, voting.Committed); err != nil { + return err + } + + if err := lockDir.Unlock(); err != nil { + return helper.ErrInternalf("RestoreCustomHooks: committing lock dir %w", err) + } + + return stream.SendAndClose(&gitalypb.RestoreCustomHooksResponse{}) +} + +func voteCustomHooks( + ctx context.Context, + txManager transaction.Manager, + v *voting.VoteHash, + phase voting.Phase, +) error { + tx, err := txinfo.TransactionFromContext(ctx) + if errors.Is(err, txinfo.ErrTransactionNotFound) { + return nil + } else if err != nil { + return err + } + + vote, err := v.Vote() + if err != nil { + return err + } + + if err := txManager.Vote(ctx, tx, vote, phase); err != nil { + return fmt.Errorf("vote failed: %w", err) + } + + return nil +} diff --git a/internal/gitaly/service/repository/restore_custom_hooks_test.go b/internal/gitaly/service/repository/restore_custom_hooks_test.go index 294b51c45..8c02e0b4c 100644 --- a/internal/gitaly/service/repository/restore_custom_hooks_test.go +++ b/internal/gitaly/service/repository/restore_custom_hooks_test.go @@ -1,29 +1,77 @@ package repository import ( + "context" "io" "os" "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" + "google.golang.org/grpc" "google.golang.org/grpc/codes" ) -func TestSuccessfullRestoreCustomHooksRequest(t *testing.T) { +func TestSuccessfulRestoreCustomHooksRequest(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks). + Run(t, testSuccessfulRestoreCustomHooksRequest) +} + +func testSuccessfulRestoreCustomHooksRequest(t *testing.T, ctx context.Context) { + t.Parallel() + + cfg := testcfg.Build(t) + testcfg.BuildGitalyHooks(t, cfg) + txManager := transaction.NewTrackingManager() + + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, NewServer( + deps.GetCfg(), + deps.GetRubyServer(), + deps.GetLocator(), + deps.GetTxManager(), + deps.GetGitCmdFactory(), + deps.GetCatfileCache(), + deps.GetConnsPool(), + deps.GetGit2goExecutor(), + deps.GetHousekeepingManager(), + )) + }, testserver.WithTransactionManager(txManager)) + cfg.SocketPath = addr + + client := newRepositoryClient(t, cfg, addr) + + ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true) + require.NoError(t, err) - ctx := testhelper.Context(t) - _, repo, repoPath, client := setupRepositoryService(ctx, t) + ctx = metadata.IncomingToOutgoing(ctx) + repo, repoPath := gittest.CreateRepository(ctx, t, cfg) + // reset the txManager since CreateRepository would have done + // voting + txManager.Reset() stream, err := client.RestoreCustomHooks(ctx) require.NoError(t, err) request := &gitalypb.RestoreCustomHooksRequest{Repository: repo} + voteHash := voting.NewVoteHash() + writer := streamio.NewWriter(func(p []byte) error { + voteHash.Write(p) request.Data = p if err := stream.Send(request); err != nil { return err @@ -35,20 +83,38 @@ func TestSuccessfullRestoreCustomHooksRequest(t *testing.T) { file, err := os.Open("testdata/custom_hooks.tar") require.NoError(t, err) - defer file.Close() _, err = io.Copy(writer, file) require.NoError(t, err) _, err = stream.CloseAndRecv() require.NoError(t, err) + testhelper.MustClose(t, file) + + expectedVote, err := voteHash.Vote() + require.NoError(t, err) + require.FileExists(t, filepath.Join(repoPath, "custom_hooks", "pre-push.sample")) + + if featureflag.TransactionalRestoreCustomHooks.IsEnabled(ctx) { + require.Equal(t, 2, len(txManager.Votes())) + assert.Equal(t, voting.Prepared, txManager.Votes()[0].Phase) + assert.Equal(t, expectedVote, txManager.Votes()[1].Vote) + assert.Equal(t, voting.Committed, txManager.Votes()[1].Phase) + } else { + require.Equal(t, 0, len(txManager.Votes())) + } } func TestFailedRestoreCustomHooksDueToValidations(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks). + Run(t, testFailedRestoreCustomHooksDueToValidations) +} + +func testFailedRestoreCustomHooksDueToValidations(t *testing.T, ctx context.Context) { + t.Parallel() _, client := setupRepositoryServiceWithoutRepo(t) - ctx := testhelper.Context(t) stream, err := client.RestoreCustomHooks(ctx) require.NoError(t, err) @@ -61,8 +127,11 @@ func TestFailedRestoreCustomHooksDueToValidations(t *testing.T) { func TestFailedRestoreCustomHooksDueToBadTar(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks). + Run(t, testFailedRestoreCustomHooksDueToBadTar) +} - ctx := testhelper.Context(t) +func testFailedRestoreCustomHooksDueToBadTar(t *testing.T, ctx context.Context) { _, repo, _, client := setupRepositoryService(ctx, t) stream, err := client.RestoreCustomHooks(ctx) diff --git a/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go b/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go new file mode 100644 index 000000000..8f3e2337d --- /dev/null +++ b/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go @@ -0,0 +1,5 @@ +package featureflag + +// TransactionalRestoreCustomHooks will use transactional voting in the +// RestoreCustomHooks RPC +var TransactionalRestoreCustomHooks = NewFeatureFlag("tx_restore_custom_hooks", false) diff --git a/internal/safe/locking_directory.go b/internal/safe/locking_directory.go new file mode 100644 index 000000000..b15483bce --- /dev/null +++ b/internal/safe/locking_directory.go @@ -0,0 +1,95 @@ +package safe + +import ( + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" +) + +type lockingDirectoryState int + +const ( + lockingDirectoryStateUnlocked lockingDirectoryState = iota + lockingDirectoryStateLocked +) + +// LockingDirectory allows locking and unlocking a directory for safe access and +// modification. +type LockingDirectory struct { + state lockingDirectoryState + path string +} + +// NewLockingDirectory creates a new LockingDirectory. +func NewLockingDirectory(path string) (*LockingDirectory, error) { + fi, err := os.Stat(path) + if err != nil { + return nil, fmt.Errorf("creating new locking directory: %w", err) + } + + if !fi.IsDir() { + return nil, errors.New("not a directory") + } + + ld := &LockingDirectory{ + state: lockingDirectoryStateUnlocked, + path: path, + } + + return ld, nil +} + +// Lock locks the directory and prevents a second process with a +// LockingDirectory from also locking the directory. +func (ld *LockingDirectory) Lock() error { + if ld.state != lockingDirectoryStateUnlocked { + return errors.New("locking directory not lockable") + } + + lock, err := os.OpenFile(ld.lockPath(), os.O_CREATE|os.O_EXCL|os.O_RDONLY, 0o400) + if err != nil { + if os.IsExist(err) { + return ErrFileAlreadyLocked + } + + return fmt.Errorf("creating lock file: %w", err) + } + _ = lock.Close() + + ld.state = lockingDirectoryStateLocked + + return nil +} + +// IsLocked returns whether or not the directory has been locked. +func (ld *LockingDirectory) IsLocked() bool { + return ld.state == lockingDirectoryStateLocked +} + +// Unlock unlocks the directory. +func (ld *LockingDirectory) Unlock() error { + if ld.state != lockingDirectoryStateLocked { + return errors.New("locking directory not locked") + } + + if err := os.Remove(ld.lockPath()); err != nil { + // A previous call might have returned an error + // but still removed the file. + if errors.Is(err, fs.ErrNotExist) { + ld.state = lockingDirectoryStateUnlocked + return nil + } + + return fmt.Errorf("unlocking directory: %w", err) + } + + ld.state = lockingDirectoryStateUnlocked + + return nil +} + +func (ld *LockingDirectory) lockPath() string { + return filepath.Join(ld.path, ".lock") +} diff --git a/internal/safe/locking_directory_test.go b/internal/safe/locking_directory_test.go new file mode 100644 index 000000000..e9280c3dd --- /dev/null +++ b/internal/safe/locking_directory_test.go @@ -0,0 +1,78 @@ +package safe_test + +import ( + "errors" + "io/fs" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/safe" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" +) + +func TestLockingDirectory(t *testing.T) { + t.Parallel() + + t.Run("normal lifecycle", func(t *testing.T) { + path := testhelper.TempDir(t) + lockingDir, err := safe.NewLockingDirectory(path) + require.NoError(t, err) + require.NoError(t, lockingDir.Lock()) + secondLockingDir, err := safe.NewLockingDirectory(path) + require.NoError(t, err) + require.NoError(t, os.WriteFile( + filepath.Join(path, "somefile"), + []byte("data"), + 0o644), + ) + assert.ErrorIs(t, secondLockingDir.Lock(), safe.ErrFileAlreadyLocked) + require.NoError(t, lockingDir.Unlock()) + }) + + t.Run("multiple locks fail", func(t *testing.T) { + path := testhelper.TempDir(t) + lockingDir, err := safe.NewLockingDirectory(path) + require.NoError(t, err) + require.NoError(t, lockingDir.Lock()) + assert.Equal( + t, + errors.New("locking directory not lockable"), + lockingDir.Lock(), + ) + }) + + t.Run("unlock without lock fails", func(t *testing.T) { + path := testhelper.TempDir(t) + lockingDir, err := safe.NewLockingDirectory(path) + require.NoError(t, err) + assert.Equal( + t, + errors.New("locking directory not locked"), + lockingDir.Unlock(), + ) + }) + + t.Run("multiple unlocks fail", func(t *testing.T) { + path := testhelper.TempDir(t) + lockingDir, err := safe.NewLockingDirectory(path) + require.NoError(t, err) + require.NoError(t, lockingDir.Lock()) + require.NoError(t, lockingDir.Unlock()) + assert.Equal( + t, + errors.New("locking directory not locked"), + lockingDir.Unlock(), + ) + }) + + t.Run("fails if directory is missing", func(t *testing.T) { + path := testhelper.TempDir(t) + require.NoError(t, os.RemoveAll(path)) + + _, err := safe.NewLockingDirectory(path) + assert.True(t, errors.Is(err, fs.ErrNotExist)) + }) +} |