Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-04-15 20:58:21 +0300
committerJohn Cai <jcai@gitlab.com>2022-04-21 05:42:13 +0300
commite66d0d5107a5e3a56259e3bdfc2617d24a920a94 (patch)
treef3a50a0d72d442855fb66356a02407c8102af1b6
parent1a7197e3b2171626a95fbab2b28ecbe43ea79aea (diff)
repository: RestoreCustomHooks to do transaction votingjc-restore-custom-hooks-vote
RestoreCustomHooks changes data in a repository, so it should do voting. Otherwise, each time we create a replication job for it, which is useless because we don't replicate hooks. Add transactional voting to the RestoreCustomHooks RPC by integrating with the new LockingDirectory. Changelog: changed
-rw-r--r--internal/backup/backup_test.go8
-rw-r--r--internal/gitaly/service/repository/restore_custom_hooks.go136
-rw-r--r--internal/gitaly/service/repository/restore_custom_hooks_test.go81
-rw-r--r--internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go5
4 files changed, 223 insertions, 7 deletions
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index cd577b6a3..d47221f6a 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -1,6 +1,7 @@
package backup
import (
+ "context"
"fmt"
"os"
"path/filepath"
@@ -12,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
@@ -264,7 +266,11 @@ func TestManager_Create_incremental(t *testing.T) {
func TestManager_Restore(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks).
+ Run(t, testManagerRestore)
+}
+
+func testManagerRestore(t *testing.T, ctx context.Context) {
cfg := testcfg.Build(t)
testcfg.BuildGitalyHooks(t, cfg)
diff --git a/internal/gitaly/service/repository/restore_custom_hooks.go b/internal/gitaly/service/repository/restore_custom_hooks.go
index 0b6971717..bb6865049 100644
--- a/internal/gitaly/service/repository/restore_custom_hooks.go
+++ b/internal/gitaly/service/repository/restore_custom_hooks.go
@@ -1,9 +1,21 @@
package repository
import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
"os/exec"
+ "path/filepath"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/safe"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
"google.golang.org/grpc/codes"
@@ -11,6 +23,10 @@ import (
)
func (s *server) RestoreCustomHooks(stream gitalypb.RepositoryService_RestoreCustomHooksServer) error {
+ if featureflag.TransactionalRestoreCustomHooks.IsEnabled(stream.Context()) {
+ return s.restoreCustomHooksWithVoting(stream)
+ }
+
firstRequest, err := stream.Recv()
if err != nil {
return status.Errorf(codes.Internal, "RestoreCustomHooks: first request failed %v", err)
@@ -57,3 +73,123 @@ func (s *server) RestoreCustomHooks(stream gitalypb.RepositoryService_RestoreCus
return stream.SendAndClose(&gitalypb.RestoreCustomHooksResponse{})
}
+
+func (s *server) restoreCustomHooksWithVoting(stream gitalypb.RepositoryService_RestoreCustomHooksServer) error {
+ firstRequest, err := stream.Recv()
+ if err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: first request failed %w", err)
+ }
+
+ ctx := stream.Context()
+
+ repo := firstRequest.GetRepository()
+ if repo == nil {
+ return helper.ErrInvalidArgumentf("RestoreCustomHooks: empty Repository")
+ }
+
+ v := voting.NewVoteHash()
+
+ repoPath, err := s.locator.GetRepoPath(repo)
+ if err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: getting repo path failed %w", err)
+ }
+
+ customHooksPath := filepath.Join(repoPath, customHooksDir)
+
+ if err = os.MkdirAll(customHooksPath, os.ModePerm); err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: making custom hooks directory %w", err)
+ }
+
+ lockDir, err := safe.NewLockingDirectory(customHooksPath)
+ if err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: creating locking directory: %w", err)
+ }
+
+ if err := lockDir.Lock(); err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: locking directory failed: %w", err)
+ }
+
+ defer func() {
+ if !lockDir.IsLocked() {
+ return
+ }
+
+ if err := lockDir.Unlock(); err != nil {
+ ctxlogrus.Extract(ctx).WithError(err).Warn("could not unlock directory")
+ }
+ }()
+
+ if err := voteCustomHooks(ctx, s.txManager, &v, voting.Prepared); err != nil {
+ return err
+ }
+
+ reader := streamio.NewReader(func() ([]byte, error) {
+ var data []byte
+ defer func() {
+ _, _ = v.Write(data)
+ }()
+
+ if firstRequest != nil {
+ data = firstRequest.GetData()
+ firstRequest = nil
+ return data, nil
+ }
+
+ request, err := stream.Recv()
+
+ data = request.GetData()
+ return data, err
+ })
+
+ cmdArgs := []string{
+ "-xf",
+ "-",
+ "-C",
+ repoPath,
+ customHooksDir,
+ }
+
+ cmd, err := command.New(ctx, exec.Command("tar", cmdArgs...), reader, nil, nil)
+ if err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: Could not untar custom hooks tar %w", err)
+ }
+
+ if err := cmd.Wait(); err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: cmd wait failed: %w", err)
+ }
+
+ if err := voteCustomHooks(ctx, s.txManager, &v, voting.Committed); err != nil {
+ return err
+ }
+
+ if err := lockDir.Unlock(); err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: committing lock dir %w", err)
+ }
+
+ return stream.SendAndClose(&gitalypb.RestoreCustomHooksResponse{})
+}
+
+func voteCustomHooks(
+ ctx context.Context,
+ txManager transaction.Manager,
+ v *voting.VoteHash,
+ phase voting.Phase,
+) error {
+ tx, err := txinfo.TransactionFromContext(ctx)
+ if errors.Is(err, txinfo.ErrTransactionNotFound) {
+ return nil
+ } else if err != nil {
+ return err
+ }
+
+ vote, err := v.Vote()
+ if err != nil {
+ return err
+ }
+
+ if err := txManager.Vote(ctx, tx, vote, phase); err != nil {
+ return fmt.Errorf("vote failed: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/gitaly/service/repository/restore_custom_hooks_test.go b/internal/gitaly/service/repository/restore_custom_hooks_test.go
index 294b51c45..8c02e0b4c 100644
--- a/internal/gitaly/service/repository/restore_custom_hooks_test.go
+++ b/internal/gitaly/service/repository/restore_custom_hooks_test.go
@@ -1,29 +1,77 @@
package repository
import (
+ "context"
"io"
"os"
"path/filepath"
"testing"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
-func TestSuccessfullRestoreCustomHooksRequest(t *testing.T) {
+func TestSuccessfulRestoreCustomHooksRequest(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks).
+ Run(t, testSuccessfulRestoreCustomHooksRequest)
+}
+
+func testSuccessfulRestoreCustomHooksRequest(t *testing.T, ctx context.Context) {
+ t.Parallel()
+
+ cfg := testcfg.Build(t)
+ testcfg.BuildGitalyHooks(t, cfg)
+ txManager := transaction.NewTrackingManager()
+
+ addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRepositoryServiceServer(srv, NewServer(
+ deps.GetCfg(),
+ deps.GetRubyServer(),
+ deps.GetLocator(),
+ deps.GetTxManager(),
+ deps.GetGitCmdFactory(),
+ deps.GetCatfileCache(),
+ deps.GetConnsPool(),
+ deps.GetGit2goExecutor(),
+ deps.GetHousekeepingManager(),
+ ))
+ }, testserver.WithTransactionManager(txManager))
+ cfg.SocketPath = addr
+
+ client := newRepositoryClient(t, cfg, addr)
+
+ ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true)
+ require.NoError(t, err)
- ctx := testhelper.Context(t)
- _, repo, repoPath, client := setupRepositoryService(ctx, t)
+ ctx = metadata.IncomingToOutgoing(ctx)
+ repo, repoPath := gittest.CreateRepository(ctx, t, cfg)
+ // reset the txManager since CreateRepository would have done
+ // voting
+ txManager.Reset()
stream, err := client.RestoreCustomHooks(ctx)
require.NoError(t, err)
request := &gitalypb.RestoreCustomHooksRequest{Repository: repo}
+ voteHash := voting.NewVoteHash()
+
writer := streamio.NewWriter(func(p []byte) error {
+ voteHash.Write(p)
request.Data = p
if err := stream.Send(request); err != nil {
return err
@@ -35,20 +83,38 @@ func TestSuccessfullRestoreCustomHooksRequest(t *testing.T) {
file, err := os.Open("testdata/custom_hooks.tar")
require.NoError(t, err)
- defer file.Close()
_, err = io.Copy(writer, file)
require.NoError(t, err)
_, err = stream.CloseAndRecv()
require.NoError(t, err)
+ testhelper.MustClose(t, file)
+
+ expectedVote, err := voteHash.Vote()
+ require.NoError(t, err)
+
require.FileExists(t, filepath.Join(repoPath, "custom_hooks", "pre-push.sample"))
+
+ if featureflag.TransactionalRestoreCustomHooks.IsEnabled(ctx) {
+ require.Equal(t, 2, len(txManager.Votes()))
+ assert.Equal(t, voting.Prepared, txManager.Votes()[0].Phase)
+ assert.Equal(t, expectedVote, txManager.Votes()[1].Vote)
+ assert.Equal(t, voting.Committed, txManager.Votes()[1].Phase)
+ } else {
+ require.Equal(t, 0, len(txManager.Votes()))
+ }
}
func TestFailedRestoreCustomHooksDueToValidations(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks).
+ Run(t, testFailedRestoreCustomHooksDueToValidations)
+}
+
+func testFailedRestoreCustomHooksDueToValidations(t *testing.T, ctx context.Context) {
+ t.Parallel()
_, client := setupRepositoryServiceWithoutRepo(t)
- ctx := testhelper.Context(t)
stream, err := client.RestoreCustomHooks(ctx)
require.NoError(t, err)
@@ -61,8 +127,11 @@ func TestFailedRestoreCustomHooksDueToValidations(t *testing.T) {
func TestFailedRestoreCustomHooksDueToBadTar(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks).
+ Run(t, testFailedRestoreCustomHooksDueToBadTar)
+}
- ctx := testhelper.Context(t)
+func testFailedRestoreCustomHooksDueToBadTar(t *testing.T, ctx context.Context) {
_, repo, _, client := setupRepositoryService(ctx, t)
stream, err := client.RestoreCustomHooks(ctx)
diff --git a/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go b/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go
new file mode 100644
index 000000000..8f3e2337d
--- /dev/null
+++ b/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go
@@ -0,0 +1,5 @@
+package featureflag
+
+// TransactionalRestoreCustomHooks will use transactional voting in the
+// RestoreCustomHooks RPC
+var TransactionalRestoreCustomHooks = NewFeatureFlag("tx_restore_custom_hooks", false)