Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2022-04-25 13:09:55 +0300
committerToon Claes <toon@gitlab.com>2022-04-25 13:09:55 +0300
commit953a7b80beb340565b30dce713747120552c23e5 (patch)
tree3ca5c2f119cd088b884cc627a573d265e5f256a9
parentf57a981311cf48840ffdc14dce1743e7e3cf0768 (diff)
parente66d0d5107a5e3a56259e3bdfc2617d24a920a94 (diff)
Merge branch 'jc-restore-custom-hooks-vote' into 'master'
repository: RestoreCustomHooks to do transaction voting Closes #4081 See merge request gitlab-org/gitaly!4481
-rw-r--r--internal/backup/backup_test.go8
-rw-r--r--internal/gitaly/service/repository/restore_custom_hooks.go136
-rw-r--r--internal/gitaly/service/repository/restore_custom_hooks_test.go81
-rw-r--r--internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go5
-rw-r--r--internal/safe/locking_directory.go95
-rw-r--r--internal/safe/locking_directory_test.go78
6 files changed, 396 insertions, 7 deletions
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index cd577b6a3..d47221f6a 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -1,6 +1,7 @@
package backup
import (
+ "context"
"fmt"
"os"
"path/filepath"
@@ -12,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
@@ -264,7 +266,11 @@ func TestManager_Create_incremental(t *testing.T) {
func TestManager_Restore(t *testing.T) {
t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks).
+ Run(t, testManagerRestore)
+}
+
+func testManagerRestore(t *testing.T, ctx context.Context) {
cfg := testcfg.Build(t)
testcfg.BuildGitalyHooks(t, cfg)
diff --git a/internal/gitaly/service/repository/restore_custom_hooks.go b/internal/gitaly/service/repository/restore_custom_hooks.go
index 0b6971717..bb6865049 100644
--- a/internal/gitaly/service/repository/restore_custom_hooks.go
+++ b/internal/gitaly/service/repository/restore_custom_hooks.go
@@ -1,9 +1,21 @@
package repository
import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
"os/exec"
+ "path/filepath"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/command"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/safe"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
"google.golang.org/grpc/codes"
@@ -11,6 +23,10 @@ import (
)
func (s *server) RestoreCustomHooks(stream gitalypb.RepositoryService_RestoreCustomHooksServer) error {
+ if featureflag.TransactionalRestoreCustomHooks.IsEnabled(stream.Context()) {
+ return s.restoreCustomHooksWithVoting(stream)
+ }
+
firstRequest, err := stream.Recv()
if err != nil {
return status.Errorf(codes.Internal, "RestoreCustomHooks: first request failed %v", err)
@@ -57,3 +73,123 @@ func (s *server) RestoreCustomHooks(stream gitalypb.RepositoryService_RestoreCus
return stream.SendAndClose(&gitalypb.RestoreCustomHooksResponse{})
}
+
+func (s *server) restoreCustomHooksWithVoting(stream gitalypb.RepositoryService_RestoreCustomHooksServer) error {
+ firstRequest, err := stream.Recv()
+ if err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: first request failed %w", err)
+ }
+
+ ctx := stream.Context()
+
+ repo := firstRequest.GetRepository()
+ if repo == nil {
+ return helper.ErrInvalidArgumentf("RestoreCustomHooks: empty Repository")
+ }
+
+ v := voting.NewVoteHash()
+
+ repoPath, err := s.locator.GetRepoPath(repo)
+ if err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: getting repo path failed %w", err)
+ }
+
+ customHooksPath := filepath.Join(repoPath, customHooksDir)
+
+ if err = os.MkdirAll(customHooksPath, os.ModePerm); err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: making custom hooks directory %w", err)
+ }
+
+ lockDir, err := safe.NewLockingDirectory(customHooksPath)
+ if err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: creating locking directory: %w", err)
+ }
+
+ if err := lockDir.Lock(); err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: locking directory failed: %w", err)
+ }
+
+ defer func() {
+ if !lockDir.IsLocked() {
+ return
+ }
+
+ if err := lockDir.Unlock(); err != nil {
+ ctxlogrus.Extract(ctx).WithError(err).Warn("could not unlock directory")
+ }
+ }()
+
+ if err := voteCustomHooks(ctx, s.txManager, &v, voting.Prepared); err != nil {
+ return err
+ }
+
+ reader := streamio.NewReader(func() ([]byte, error) {
+ var data []byte
+ defer func() {
+ _, _ = v.Write(data)
+ }()
+
+ if firstRequest != nil {
+ data = firstRequest.GetData()
+ firstRequest = nil
+ return data, nil
+ }
+
+ request, err := stream.Recv()
+
+ data = request.GetData()
+ return data, err
+ })
+
+ cmdArgs := []string{
+ "-xf",
+ "-",
+ "-C",
+ repoPath,
+ customHooksDir,
+ }
+
+ cmd, err := command.New(ctx, exec.Command("tar", cmdArgs...), reader, nil, nil)
+ if err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: Could not untar custom hooks tar %w", err)
+ }
+
+ if err := cmd.Wait(); err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: cmd wait failed: %w", err)
+ }
+
+ if err := voteCustomHooks(ctx, s.txManager, &v, voting.Committed); err != nil {
+ return err
+ }
+
+ if err := lockDir.Unlock(); err != nil {
+ return helper.ErrInternalf("RestoreCustomHooks: committing lock dir %w", err)
+ }
+
+ return stream.SendAndClose(&gitalypb.RestoreCustomHooksResponse{})
+}
+
+func voteCustomHooks(
+ ctx context.Context,
+ txManager transaction.Manager,
+ v *voting.VoteHash,
+ phase voting.Phase,
+) error {
+ tx, err := txinfo.TransactionFromContext(ctx)
+ if errors.Is(err, txinfo.ErrTransactionNotFound) {
+ return nil
+ } else if err != nil {
+ return err
+ }
+
+ vote, err := v.Vote()
+ if err != nil {
+ return err
+ }
+
+ if err := txManager.Vote(ctx, tx, vote, phase); err != nil {
+ return fmt.Errorf("vote failed: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/gitaly/service/repository/restore_custom_hooks_test.go b/internal/gitaly/service/repository/restore_custom_hooks_test.go
index 294b51c45..8c02e0b4c 100644
--- a/internal/gitaly/service/repository/restore_custom_hooks_test.go
+++ b/internal/gitaly/service/repository/restore_custom_hooks_test.go
@@ -1,29 +1,77 @@
package repository
import (
+ "context"
"io"
"os"
"path/filepath"
"testing"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
-func TestSuccessfullRestoreCustomHooksRequest(t *testing.T) {
+func TestSuccessfulRestoreCustomHooksRequest(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks).
+ Run(t, testSuccessfulRestoreCustomHooksRequest)
+}
+
+func testSuccessfulRestoreCustomHooksRequest(t *testing.T, ctx context.Context) {
+ t.Parallel()
+
+ cfg := testcfg.Build(t)
+ testcfg.BuildGitalyHooks(t, cfg)
+ txManager := transaction.NewTrackingManager()
+
+ addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRepositoryServiceServer(srv, NewServer(
+ deps.GetCfg(),
+ deps.GetRubyServer(),
+ deps.GetLocator(),
+ deps.GetTxManager(),
+ deps.GetGitCmdFactory(),
+ deps.GetCatfileCache(),
+ deps.GetConnsPool(),
+ deps.GetGit2goExecutor(),
+ deps.GetHousekeepingManager(),
+ ))
+ }, testserver.WithTransactionManager(txManager))
+ cfg.SocketPath = addr
+
+ client := newRepositoryClient(t, cfg, addr)
+
+ ctx, err := txinfo.InjectTransaction(ctx, 1, "node", true)
+ require.NoError(t, err)
- ctx := testhelper.Context(t)
- _, repo, repoPath, client := setupRepositoryService(ctx, t)
+ ctx = metadata.IncomingToOutgoing(ctx)
+ repo, repoPath := gittest.CreateRepository(ctx, t, cfg)
+ // reset the txManager since CreateRepository would have done
+ // voting
+ txManager.Reset()
stream, err := client.RestoreCustomHooks(ctx)
require.NoError(t, err)
request := &gitalypb.RestoreCustomHooksRequest{Repository: repo}
+ voteHash := voting.NewVoteHash()
+
writer := streamio.NewWriter(func(p []byte) error {
+ voteHash.Write(p)
request.Data = p
if err := stream.Send(request); err != nil {
return err
@@ -35,20 +83,38 @@ func TestSuccessfullRestoreCustomHooksRequest(t *testing.T) {
file, err := os.Open("testdata/custom_hooks.tar")
require.NoError(t, err)
- defer file.Close()
_, err = io.Copy(writer, file)
require.NoError(t, err)
_, err = stream.CloseAndRecv()
require.NoError(t, err)
+ testhelper.MustClose(t, file)
+
+ expectedVote, err := voteHash.Vote()
+ require.NoError(t, err)
+
require.FileExists(t, filepath.Join(repoPath, "custom_hooks", "pre-push.sample"))
+
+ if featureflag.TransactionalRestoreCustomHooks.IsEnabled(ctx) {
+ require.Equal(t, 2, len(txManager.Votes()))
+ assert.Equal(t, voting.Prepared, txManager.Votes()[0].Phase)
+ assert.Equal(t, expectedVote, txManager.Votes()[1].Vote)
+ assert.Equal(t, voting.Committed, txManager.Votes()[1].Phase)
+ } else {
+ require.Equal(t, 0, len(txManager.Votes()))
+ }
}
func TestFailedRestoreCustomHooksDueToValidations(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks).
+ Run(t, testFailedRestoreCustomHooksDueToValidations)
+}
+
+func testFailedRestoreCustomHooksDueToValidations(t *testing.T, ctx context.Context) {
+ t.Parallel()
_, client := setupRepositoryServiceWithoutRepo(t)
- ctx := testhelper.Context(t)
stream, err := client.RestoreCustomHooks(ctx)
require.NoError(t, err)
@@ -61,8 +127,11 @@ func TestFailedRestoreCustomHooksDueToValidations(t *testing.T) {
func TestFailedRestoreCustomHooksDueToBadTar(t *testing.T) {
t.Parallel()
+ testhelper.NewFeatureSets(featureflag.TransactionalRestoreCustomHooks).
+ Run(t, testFailedRestoreCustomHooksDueToBadTar)
+}
- ctx := testhelper.Context(t)
+func testFailedRestoreCustomHooksDueToBadTar(t *testing.T, ctx context.Context) {
_, repo, _, client := setupRepositoryService(ctx, t)
stream, err := client.RestoreCustomHooks(ctx)
diff --git a/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go b/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go
new file mode 100644
index 000000000..8f3e2337d
--- /dev/null
+++ b/internal/metadata/featureflag/ff_transactional_restore_custom_hooks.go
@@ -0,0 +1,5 @@
+package featureflag
+
+// TransactionalRestoreCustomHooks will use transactional voting in the
+// RestoreCustomHooks RPC
+var TransactionalRestoreCustomHooks = NewFeatureFlag("tx_restore_custom_hooks", false)
diff --git a/internal/safe/locking_directory.go b/internal/safe/locking_directory.go
new file mode 100644
index 000000000..b15483bce
--- /dev/null
+++ b/internal/safe/locking_directory.go
@@ -0,0 +1,95 @@
+package safe
+
+import (
+ "errors"
+ "fmt"
+ "io/fs"
+ "os"
+ "path/filepath"
+)
+
+type lockingDirectoryState int
+
+const (
+ lockingDirectoryStateUnlocked lockingDirectoryState = iota
+ lockingDirectoryStateLocked
+)
+
+// LockingDirectory allows locking and unlocking a directory for safe access and
+// modification.
+type LockingDirectory struct {
+ state lockingDirectoryState
+ path string
+}
+
+// NewLockingDirectory creates a new LockingDirectory.
+func NewLockingDirectory(path string) (*LockingDirectory, error) {
+ fi, err := os.Stat(path)
+ if err != nil {
+ return nil, fmt.Errorf("creating new locking directory: %w", err)
+ }
+
+ if !fi.IsDir() {
+ return nil, errors.New("not a directory")
+ }
+
+ ld := &LockingDirectory{
+ state: lockingDirectoryStateUnlocked,
+ path: path,
+ }
+
+ return ld, nil
+}
+
+// Lock locks the directory and prevents a second process with a
+// LockingDirectory from also locking the directory.
+func (ld *LockingDirectory) Lock() error {
+ if ld.state != lockingDirectoryStateUnlocked {
+ return errors.New("locking directory not lockable")
+ }
+
+ lock, err := os.OpenFile(ld.lockPath(), os.O_CREATE|os.O_EXCL|os.O_RDONLY, 0o400)
+ if err != nil {
+ if os.IsExist(err) {
+ return ErrFileAlreadyLocked
+ }
+
+ return fmt.Errorf("creating lock file: %w", err)
+ }
+ _ = lock.Close()
+
+ ld.state = lockingDirectoryStateLocked
+
+ return nil
+}
+
+// IsLocked returns whether or not the directory has been locked.
+func (ld *LockingDirectory) IsLocked() bool {
+ return ld.state == lockingDirectoryStateLocked
+}
+
+// Unlock unlocks the directory.
+func (ld *LockingDirectory) Unlock() error {
+ if ld.state != lockingDirectoryStateLocked {
+ return errors.New("locking directory not locked")
+ }
+
+ if err := os.Remove(ld.lockPath()); err != nil {
+ // A previous call might have returned an error
+ // but still removed the file.
+ if errors.Is(err, fs.ErrNotExist) {
+ ld.state = lockingDirectoryStateUnlocked
+ return nil
+ }
+
+ return fmt.Errorf("unlocking directory: %w", err)
+ }
+
+ ld.state = lockingDirectoryStateUnlocked
+
+ return nil
+}
+
+func (ld *LockingDirectory) lockPath() string {
+ return filepath.Join(ld.path, ".lock")
+}
diff --git a/internal/safe/locking_directory_test.go b/internal/safe/locking_directory_test.go
new file mode 100644
index 000000000..e9280c3dd
--- /dev/null
+++ b/internal/safe/locking_directory_test.go
@@ -0,0 +1,78 @@
+package safe_test
+
+import (
+ "errors"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/safe"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestLockingDirectory(t *testing.T) {
+ t.Parallel()
+
+ t.Run("normal lifecycle", func(t *testing.T) {
+ path := testhelper.TempDir(t)
+ lockingDir, err := safe.NewLockingDirectory(path)
+ require.NoError(t, err)
+ require.NoError(t, lockingDir.Lock())
+ secondLockingDir, err := safe.NewLockingDirectory(path)
+ require.NoError(t, err)
+ require.NoError(t, os.WriteFile(
+ filepath.Join(path, "somefile"),
+ []byte("data"),
+ 0o644),
+ )
+ assert.ErrorIs(t, secondLockingDir.Lock(), safe.ErrFileAlreadyLocked)
+ require.NoError(t, lockingDir.Unlock())
+ })
+
+ t.Run("multiple locks fail", func(t *testing.T) {
+ path := testhelper.TempDir(t)
+ lockingDir, err := safe.NewLockingDirectory(path)
+ require.NoError(t, err)
+ require.NoError(t, lockingDir.Lock())
+ assert.Equal(
+ t,
+ errors.New("locking directory not lockable"),
+ lockingDir.Lock(),
+ )
+ })
+
+ t.Run("unlock without lock fails", func(t *testing.T) {
+ path := testhelper.TempDir(t)
+ lockingDir, err := safe.NewLockingDirectory(path)
+ require.NoError(t, err)
+ assert.Equal(
+ t,
+ errors.New("locking directory not locked"),
+ lockingDir.Unlock(),
+ )
+ })
+
+ t.Run("multiple unlocks fail", func(t *testing.T) {
+ path := testhelper.TempDir(t)
+ lockingDir, err := safe.NewLockingDirectory(path)
+ require.NoError(t, err)
+ require.NoError(t, lockingDir.Lock())
+ require.NoError(t, lockingDir.Unlock())
+ assert.Equal(
+ t,
+ errors.New("locking directory not locked"),
+ lockingDir.Unlock(),
+ )
+ })
+
+ t.Run("fails if directory is missing", func(t *testing.T) {
+ path := testhelper.TempDir(t)
+ require.NoError(t, os.RemoveAll(path))
+
+ _, err := safe.NewLockingDirectory(path)
+ assert.True(t, errors.Is(err, fs.ErrNotExist))
+ })
+}