diff options
author | John Cai <jcai@gitlab.com> | 2022-04-15 20:58:21 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-04-21 05:42:13 +0300 |
commit | e66d0d5107a5e3a56259e3bdfc2617d24a920a94 (patch) | |
tree | f3a50a0d72d442855fb66356a02407c8102af1b6 | |
parent | 1a7197e3b2171626a95fbab2b28ecbe43ea79aea (diff) |
repository: RestoreCustomHooks to do transaction votingjc-restore-custom-hooks-vote
RestoreCustomHooks changes data in a repository, so it should do voting.
Otherwise, each time we create a replication job for it, which is
useless because we don't replicate hooks.
Add transactional voting to the RestoreCustomHooks RPC by integrating
with the new LockingDirectory.
Changelog: changed
4 files changed, 223 insertions, 7 deletions
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index cd577b6a3..d47221f6a 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -1,6 +1,7 @@ package backup import ( + "context" "fmt" "os" "path/filepath" @@ -12,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -264,7 +266,11 @@ func TestManager_Create_incremental(t *testing.T) { func TestManager_Restore(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks). + Run(t, testManagerRestore) +} + +func testManagerRestore(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) testcfg.BuildGitalyHooks(t, cfg) diff --git a/internal/gitaly/service/repository/restore_custom_hooks.go b/internal/gitaly/service/repository/restore_custom_hooks.go index 0b6971717..bb6865049 100644 --- a/internal/gitaly/service/repository/restore_custom_hooks.go +++ b/internal/gitaly/service/repository/restore_custom_hooks.go @@ -1,9 +1,21 @@ package repository import ( + "context" + "errors" + "fmt" + "os" "os/exec" + "path/filepath" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v14/internal/command" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/v14/internal/safe" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" "google.golang.org/grpc/codes" @@ -11,6 +23,10 @@ import ( ) func (s *server) RestoreCustomHooks(stream gitalypb.RepositoryService_RestoreCustomHooksServer) error { + if featureflag.TransactionalRestoreCustomHooks.IsEnabled(stream.Context()) { + return s.restoreCustomHooksWithVoting(stream) + } + firstRequest, err := stream.Recv() if err != nil { return status.Errorf(codes.Internal, "RestoreCustomHooks: first request failed %v", err) @@ -57,3 +73,123 @@ func (s *server) RestoreCustomHooks(stream gitalypb.RepositoryService_RestoreCus return stream.SendAndClose(&gitalypb.RestoreCustomHooksResponse{}) } + +func (s *server) restoreCustomHooksWithVoting(stream gitalypb.RepositoryService_RestoreCustomHooksServer) error { + firstRequest, err := stream.Recv() + if err != nil { + return helper.ErrInternalf("RestoreCustomHooks: first request failed %w", err) + } + + ctx := stream.Context() + + repo := firstRequest.GetRepository() + if repo == nil { + return helper.ErrInvalidArgumentf("RestoreCustomHooks: empty Repository") + } + + v := voting.NewVoteHash() + + repoPath, err := s.locator.GetRepoPath(repo) + if err != nil { + return helper.ErrInternalf("RestoreCustomHooks: getting repo path failed %w", err) + } + + customHooksPath := filepath.Join(repoPath, customHooksDir) + + if err = os.MkdirAll(customHooksPath, os.ModePerm); err != nil { + return helper.ErrInternalf("RestoreCustomHooks: making custom hooks directory %w", err) + } + + lockDir, err := safe.NewLockingDirectory(customHooksPath) + if err != nil { + return helper.ErrInternalf("RestoreCustomHooks: creating locking directory: %w", err) + } + + if err := lockDir.Lock(); err != nil { + return helper.ErrInternalf("RestoreCustomHooks: locking directory failed: %w", err) + } + + defer func() { + if !lockDir.IsLocked() { + return + } + + if err := lockDir.Unlock(); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Warn("could not unlock directory") + } + }() + + if err := voteCustomHooks(ctx, s.txManager, &v, voting.Prepared); err != nil { + return err + } + + reader := streamio.NewReader(func() ([]byte, error) { + var data []byte + defer func() { + _, _ = v.Write(data) + }() + + if firstRequest != nil { + data = firstRequest.GetData() + firstRequest = nil + return data, nil + } + + request, err := stream.Recv() + + data = request.GetData() + return data, err + }) + + cmdArgs := []string{ + "-xf", + "-", + "-C", + repoPath, + customHooksDir, + } + + cmd, err := command.New(ctx, exec.Command("tar", cmdArgs...), reader, nil, nil) + if err != nil { + return helper.ErrInternalf("RestoreCustomHooks: Could not untar custom hooks tar %w", err) + } + + if err := cmd.Wait(); err != nil { + return helper.ErrInternalf("RestoreCustomHooks: cmd wait failed: %w", err) + } + + if err := voteCustomHooks(ctx, s.txManager, &v, voting.Committed); err != nil { + return err + } + + if err := lockDir.Unlock(); err != nil { + return helper.ErrInternalf("RestoreCustomHooks: committing lock dir %w", err) + } + + return stream.SendAndClose(&gitalypb.RestoreCustomHooksResponse{}) +} + +func voteCustomHooks( + ctx context.Context, + txManager transaction.Manager, + v *voting.VoteHash, + phase voting.Phase, +) error { + tx, err := txinfo.TransactionFromContext(ctx) + if errors.Is(err, txinfo.ErrTransactionNotFound) { + return nil + } else if err != nil { + return err + } + + vote, err := v.Vote() + if err != nil { + return err + } + + if err := txManager.Vote(ctx, tx, vote, phase); err != nil { + return fmt.Errorf("vote failed: %w", err) + } + + return nil +} diff --git a/internal/gitaly/service/repository/restore_custom_hooks_test.go b/internal/gitaly/service/repository/restore_custom_hooks_test.go index 294b51c45..8c02e0b4c 100644 --- a/internal/gitaly/service/repository/restore_custom_hooks_test.go +++ b/internal/gitaly/service/repository/restore_custom_hooks_test.go @@ -1,29 +1,77 @@ package repository import ( + "context" "io" "os" "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" + "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" + "google.golang.org/grpc" "google.golang.org/grpc/codes" ) -func TestSuccessfullRestoreCustomHooksRequest(t *testing.T) { +func TestSuccessfulRestoreCustomHooksRequest(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks). + Run(t, testSuccessfulRestoreCustomHooksRequest) +} + +func testSuccessfulRestoreCustomHooksRequest(t *testing.T, ctx context.Context) { + t.Parallel() + + cfg := testcfg.Build(t) + testcfg.BuildGitalyHooks(t, cfg) + txManager := transaction.NewTrackingManager() + + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, NewServer( + deps.GetCfg(), + deps.GetRubyServer(), + deps.GetLocator(), + deps.GetTxManager(), + deps.GetGitCmdFactory(), + deps.GetCatfileCache(), + deps.GetConnsPool(), + deps.GetGit2goExecutor(), + deps.GetHousekeepingManager(), + )) + }, testserver.WithTransactionManager(txManager)) + cfg.SocketPath = addr + + client := newRepositoryClient(t, cfg, addr) + + ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true) + require.NoError(t, err) - ctx := testhelper.Context(t) - _, repo, repoPath, client := setupRepositoryService(ctx, t) + ctx = metadata.IncomingToOutgoing(ctx) + repo, repoPath := gittest.CreateRepository(ctx, t, cfg) + // reset the txManager since CreateRepository would have done + // voting + txManager.Reset() stream, err := client.RestoreCustomHooks(ctx) require.NoError(t, err) request := &gitalypb.RestoreCustomHooksRequest{Repository: repo} + voteHash := voting.NewVoteHash() + writer := streamio.NewWriter(func(p []byte) error { + voteHash.Write(p) request.Data = p if err := stream.Send(request); err != nil { return err @@ -35,20 +83,38 @@ func TestSuccessfullRestoreCustomHooksRequest(t *testing.T) { file, err := os.Open("testdata/custom_hooks.tar") require.NoError(t, err) - defer file.Close() _, err = io.Copy(writer, file) require.NoError(t, err) _, err = stream.CloseAndRecv() require.NoError(t, err) + testhelper.MustClose(t, file) + + expectedVote, err := voteHash.Vote() + require.NoError(t, err) + require.FileExists(t, filepath.Join(repoPath, "custom_hooks", "pre-push.sample")) + + if featureflag.TransactionalRestoreCustomHooks.IsEnabled(ctx) { + require.Equal(t, 2, len(txManager.Votes())) + assert.Equal(t, voting.Prepared, txManager.Votes()[0].Phase) + assert.Equal(t, expectedVote, txManager.Votes()[1].Vote) + assert.Equal(t, voting.Committed, txManager.Votes()[1].Phase) + } else { + require.Equal(t, 0, len(txManager.Votes())) + } } func TestFailedRestoreCustomHooksDueToValidations(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks). + Run(t, testFailedRestoreCustomHooksDueToValidations) +} + +func testFailedRestoreCustomHooksDueToValidations(t *testing.T, ctx context.Context) { + t.Parallel() _, client := setupRepositoryServiceWithoutRepo(t) - ctx := testhelper.Context(t) stream, err := client.RestoreCustomHooks(ctx) require.NoError(t, err) @@ -61,8 +127,11 @@ func TestFailedRestoreCustomHooksDueToValidations(t *testing.T) { func TestFailedRestoreCustomHooksDueToBadTar(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks). + Run(t, testFailedRestoreCustomHooksDueToBadTar) +} - ctx := testhelper.Context(t) +func testFailedRestoreCustomHooksDueToBadTar(t *testing.T, ctx context.Context) { _, repo, _, client := setupRepositoryService(ctx, t) stream, err := client.RestoreCustomHooks(ctx) diff --git a/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go b/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go new file mode 100644 index 000000000..8f3e2337d --- /dev/null +++ b/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go @@ -0,0 +1,5 @@ +package featureflag + +// TransactionalRestoreCustomHooks will use transactional voting in the +// RestoreCustomHooks RPC +var TransactionalRestoreCustomHooks = NewFeatureFlag("tx_restore_custom_hooks", false) |