Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-12-10 17:59:06 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-12-13 09:47:19 +0300
commitcfd0fd048b46f4d783880f0774ffbdb7abcc8cb4 (patch)
treee06302d78fc93f3065e50fbc2e7c4a1f34004089
parent0c08541cf496d76c5611ee1e7bcfc49724dbcc21 (diff)
transaction: Wire up transactional voting phases
Convert all callsites which perform transactional voting to specify the phase the vote is being cast in. This information is not yet used by Praefect yet, but serves as the baseline to make more informed replication decisions in Praefect's at some point. Ideally, all sites would either use "prepared" or "committed" as their phase. But we have some callsites which don't use proper two-phase voting, which thus cast their votes with "unknown" phase. These will need to get converted eventually.
-rw-r--r--internal/gitaly/hook/referencetransaction.go21
-rw-r--r--internal/gitaly/hook/transactions.go9
-rw-r--r--internal/gitaly/hook/transactions_test.go2
-rw-r--r--internal/gitaly/service/ref/delete_refs.go11
-rw-r--r--internal/gitaly/service/repository/apply_gitattributes.go10
-rw-r--r--internal/gitaly/service/repository/create_repository.go2
-rw-r--r--internal/gitaly/service/repository/create_repository_from_bundle_test.go40
-rw-r--r--internal/gitaly/service/repository/create_repository_test.go8
-rw-r--r--internal/gitaly/service/repository/fetch_remote.go2
-rw-r--r--internal/gitaly/service/repository/remove.go27
-rw-r--r--internal/gitaly/service/repository/util.go4
-rw-r--r--internal/gitaly/service/repository/util_test.go23
-rw-r--r--internal/gitaly/service/ssh/receive_pack.go2
-rw-r--r--internal/gitaly/transaction/manager.go10
-rw-r--r--internal/gitaly/transaction/manager_test.go49
-rw-r--r--internal/gitaly/transaction/mock.go22
-rw-r--r--internal/gitaly/transaction/voting.go8
-rw-r--r--internal/gitaly/transaction/voting_test.go33
-rw-r--r--internal/transaction/voting/phase.go36
19 files changed, 214 insertions, 105 deletions
diff --git a/internal/gitaly/hook/referencetransaction.go b/internal/gitaly/hook/referencetransaction.go
index f391876e0..bba8eaabe 100644
--- a/internal/gitaly/hook/referencetransaction.go
+++ b/internal/gitaly/hook/referencetransaction.go
@@ -9,6 +9,7 @@ import (
"io"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
)
// forceDeletionPrefix is the prefix of a queued reference transaction which deletes a
@@ -27,13 +28,19 @@ func (m *GitLabHookManager) ReferenceTransactionHook(ctx context.Context, state
return fmt.Errorf("reading stdin from request: %w", err)
}
+ var phase voting.Phase
+ switch state {
// We're voting in prepared state as this is the only stage in Git's reference transaction
- // which allows us to abort the transaction. We're also voting in committed state to tell
- // Praefect we've actually persisted the changes. This is necessary as some RPCs fail return
- // errors in the response body rather than as an error code. Praefect can't tell if these RPCs
- // have failed. Voting on committed ensure Praefect sees either a missing vote or that the RPC did
- // commit the changes.
- if state != ReferenceTransactionPrepared && state != ReferenceTransactionCommitted {
+ // which allows us to abort the transaction.
+ case ReferenceTransactionPrepared:
+ phase = voting.Prepared
+ // We're also voting in committed state to tell Praefect we've actually persisted the
+ // changes. This is necessary as some RPCs fail return errors in the response body rather
+ // than as an error code. Praefect can't tell if these RPCs have failed. Voting on committed
+ // ensure Praefect sees either a missing vote or that the RPC did commit the changes.
+ case ReferenceTransactionCommitted:
+ phase = voting.Committed
+ default:
return nil
}
@@ -61,7 +68,7 @@ func (m *GitLabHookManager) ReferenceTransactionHook(ctx context.Context, state
hash := sha1.Sum(changes)
- if err := m.voteOnTransaction(ctx, hash, payload); err != nil {
+ if err := m.voteOnTransaction(ctx, hash, phase, payload); err != nil {
return fmt.Errorf("error voting on transaction: %w", err)
}
diff --git a/internal/gitaly/hook/transactions.go b/internal/gitaly/hook/transactions.go
index 78cd82c4c..33097b551 100644
--- a/internal/gitaly/hook/transactions.go
+++ b/internal/gitaly/hook/transactions.go
@@ -32,9 +32,14 @@ func (m *GitLabHookManager) runWithTransaction(ctx context.Context, payload git.
return nil
}
-func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, vote voting.Vote, payload git.HooksPayload) error {
+func (m *GitLabHookManager) voteOnTransaction(
+ ctx context.Context,
+ vote voting.Vote,
+ phase voting.Phase,
+ payload git.HooksPayload,
+) error {
return m.runWithTransaction(ctx, payload, func(ctx context.Context, tx txinfo.Transaction) error {
- return m.txManager.Vote(ctx, tx, vote)
+ return m.txManager.Vote(ctx, tx, vote, phase)
})
}
diff --git a/internal/gitaly/hook/transactions_test.go b/internal/gitaly/hook/transactions_test.go
index 0f1759b95..de9f7d7cb 100644
--- a/internal/gitaly/hook/transactions_test.go
+++ b/internal/gitaly/hook/transactions_test.go
@@ -124,7 +124,7 @@ func TestHookManager_contextCancellationCancelsVote(t *testing.T) {
cfg, repo, _ := testcfg.BuildWithRepo(t)
mockTxMgr := transaction.MockManager{
- VoteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote) error {
+ VoteFn: func(ctx context.Context, _ txinfo.Transaction, _ voting.Vote, _ voting.Phase) error {
<-ctx.Done()
return fmt.Errorf("mock error: %s", ctx.Err())
},
diff --git a/internal/gitaly/service/ref/delete_refs.go b/internal/gitaly/service/ref/delete_refs.go
index 26e6d2929..222432ed0 100644
--- a/internal/gitaly/service/ref/delete_refs.go
+++ b/internal/gitaly/service/ref/delete_refs.go
@@ -60,11 +60,18 @@ func (s *server) DeleteRefs(ctx context.Context, in *gitalypb.DeleteRefsRequest)
return nil, helper.ErrInternalf("could not compute vote: %v", err)
}
+ // We don't use proper two-phase voting when the feature flag is disabled, so we use
+ // `UnknownPhase` in that case.
+ preparedPhase := voting.UnknownPhase
+ if featureflag.TxTwoPhaseDeleteRefs.IsEnabled(ctx) {
+ preparedPhase = voting.Prepared
+ }
+
// All deletes we're doing in this RPC are force deletions. Because we're required to filter
// out transactions which only consist of force deletions, we never do any voting via the
// reference-transaction hook here. Instead, we need to resort to a manual vote which is
// simply the concatenation of all reference we're about to delete.
- if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil {
+ if err := transaction.VoteOnContext(ctx, s.txManager, vote, preparedPhase); err != nil {
return nil, helper.ErrInternalf("preparatory vote: %w", err)
}
@@ -77,7 +84,7 @@ func (s *server) DeleteRefs(ctx context.Context, in *gitalypb.DeleteRefsRequest)
}
if featureflag.TxTwoPhaseDeleteRefs.IsEnabled(ctx) {
- if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil {
+ if err := transaction.VoteOnContext(ctx, s.txManager, vote, voting.Committed); err != nil {
return nil, helper.ErrInternalf("committing vote: %w", err)
}
}
diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go
index 82a08d1a1..8f188183a 100644
--- a/internal/gitaly/service/repository/apply_gitattributes.go
+++ b/internal/gitaly/service/repository/apply_gitattributes.go
@@ -66,7 +66,7 @@ func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, o
// We use the zero OID as placeholder to vote on removal of the
// gitattributes file.
- if err := s.vote(ctx, git.ZeroOID); err != nil {
+ if err := s.vote(ctx, git.ZeroOID, voting.Prepared); err != nil {
return fmt.Errorf("preimage vote: %w", err)
}
@@ -74,7 +74,7 @@ func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, o
return err
}
- if err := s.vote(ctx, git.ZeroOID); err != nil {
+ if err := s.vote(ctx, git.ZeroOID, voting.Committed); err != nil {
return fmt.Errorf("postimage vote: %w", err)
}
@@ -83,7 +83,7 @@ func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, o
// If there is no gitattributes file, we simply use the ZeroOID
// as a placeholder to vote on the removal.
- if err := s.vote(ctx, git.ZeroOID); err != nil {
+ if err := s.vote(ctx, git.ZeroOID, voting.UnknownPhase); err != nil {
return fmt.Errorf("could not remove gitattributes: %w", err)
}
@@ -114,7 +114,7 @@ func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, o
return nil
}
-func (s *server) vote(ctx context.Context, oid git.ObjectID) error {
+func (s *server) vote(ctx context.Context, oid git.ObjectID, phase voting.Phase) error {
tx, err := txinfo.TransactionFromContext(ctx)
if errors.Is(err, txinfo.ErrTransactionNotFound) {
return nil
@@ -130,7 +130,7 @@ func (s *server) vote(ctx context.Context, oid git.ObjectID) error {
return fmt.Errorf("cannot convert OID to vote: %w", err)
}
- if err := s.txManager.Vote(ctx, tx, vote); err != nil {
+ if err := s.txManager.Vote(ctx, tx, vote, phase); err != nil {
return fmt.Errorf("vote failed: %w", err)
}
diff --git a/internal/gitaly/service/repository/create_repository.go b/internal/gitaly/service/repository/create_repository.go
index 334d8b79b..7d3243eb9 100644
--- a/internal/gitaly/service/repository/create_repository.go
+++ b/internal/gitaly/service/repository/create_repository.go
@@ -84,7 +84,7 @@ func (s *server) CreateRepository(ctx context.Context, req *gitalypb.CreateRepos
return err
}
- if err := s.txManager.Vote(ctx, tx, vote); err != nil {
+ if err := s.txManager.Vote(ctx, tx, vote, voting.UnknownPhase); err != nil {
return fmt.Errorf("casting vote: %w", err)
}
diff --git a/internal/gitaly/service/repository/create_repository_from_bundle_test.go b/internal/gitaly/service/repository/create_repository_from_bundle_test.go
index 0787122c4..668b135f5 100644
--- a/internal/gitaly/service/repository/create_repository_from_bundle_test.go
+++ b/internal/gitaly/service/repository/create_repository_from_bundle_test.go
@@ -148,51 +148,43 @@ func testCreateRepositoryFromBundleTransactional(t *testing.T, ctx context.Conte
require.NoError(t, err)
if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
- createVote := func(hash string) voting.Vote {
+ createVote := func(hash string, phase voting.Phase) transaction.PhasedVote {
vote, err := voting.VoteFromString(hash)
require.NoError(t, err)
- return vote
+ return transaction.PhasedVote{Vote: vote, Phase: phase}
}
// While the following votes are opaque to us, this doesn't really matter. All we do
// care about is that they're stable.
- require.Equal(t, []voting.Vote{
+ require.Equal(t, []transaction.PhasedVote{
// These are the votes created by git-fetch(1).
- createVote("47553c06f575f757ad56ef3216c59804b72aa4a6"),
- createVote("47553c06f575f757ad56ef3216c59804b72aa4a6"),
+ createVote("47553c06f575f757ad56ef3216c59804b72aa4a6", voting.Prepared),
+ createVote("47553c06f575f757ad56ef3216c59804b72aa4a6", voting.Committed),
// And this is the manual votes we compute by walking the repository.
- createVote("da39a3ee5e6b4b0d3255bfef95601890afd80709"),
- createVote("da39a3ee5e6b4b0d3255bfef95601890afd80709"),
+ createVote("da39a3ee5e6b4b0d3255bfef95601890afd80709", voting.Prepared),
+ createVote("da39a3ee5e6b4b0d3255bfef95601890afd80709", voting.Committed),
}, txManager.Votes())
return
}
- var votingInput []string
-
// This accounts for the first two votes via git-clone(1). Given that git-clone(1) creates
// all references in a single reference transaction, the hook is executed twice for all
// fetched references (once for "prepare", once for "commit").
- votingInput = append(votingInput,
- fmt.Sprintf("%s %s refs/heads/feature\n%s %s refs/heads/master\n", git.ZeroOID, featureOID, git.ZeroOID, masterOID),
- fmt.Sprintf("%s %s refs/heads/feature\n%s %s refs/heads/master\n", git.ZeroOID, featureOID, git.ZeroOID, masterOID),
- )
+ voteFromCloning := []byte(fmt.Sprintf("%s %s refs/heads/feature\n%s %s refs/heads/master\n", git.ZeroOID, featureOID, git.ZeroOID, masterOID))
// Keep-around references are not fetched via git-clone(1) because non-mirror clones only
// fetch branches and tags. These additional references are thus obtained via git-fetch(1).
// Given that we use the `--atomic` flag for git-fetch(1), all reference updates will be in
// a single transaction and we thus expect exactly two votes (once for "prepare", once for
// "commit").
- votingInput = append(votingInput,
- fmt.Sprintf("%s %s refs/keep-around/2\n%s %s refs/keep-around/1\n", git.ZeroOID, masterOID, git.ZeroOID, masterOID),
- fmt.Sprintf("%s %s refs/keep-around/2\n%s %s refs/keep-around/1\n", git.ZeroOID, masterOID, git.ZeroOID, masterOID),
- )
-
- var expectedVotes []voting.Vote
- for _, expectedVote := range votingInput {
- expectedVotes = append(expectedVotes, voting.VoteFromData([]byte(expectedVote)))
- }
-
- require.Equal(t, expectedVotes, txManager.Votes())
+ voteFromFetching := []byte(fmt.Sprintf("%s %s refs/keep-around/2\n%s %s refs/keep-around/1\n", git.ZeroOID, masterOID, git.ZeroOID, masterOID))
+
+ require.Equal(t, []transaction.PhasedVote{
+ {Vote: voting.VoteFromData(voteFromCloning), Phase: voting.Prepared},
+ {Vote: voting.VoteFromData(voteFromCloning), Phase: voting.Committed},
+ {Vote: voting.VoteFromData(voteFromFetching), Phase: voting.Prepared},
+ {Vote: voting.VoteFromData(voteFromFetching), Phase: voting.Committed},
+ }, txManager.Votes())
}
func TestCreateRepositoryFromBundle_invalidBundle(t *testing.T) {
diff --git a/internal/gitaly/service/repository/create_repository_test.go b/internal/gitaly/service/repository/create_repository_test.go
index 5fca7246c..6e2a6df7c 100644
--- a/internal/gitaly/service/repository/create_repository_test.go
+++ b/internal/gitaly/service/repository/create_repository_test.go
@@ -172,8 +172,8 @@ func testCreateRepositoryTransactional(t *testing.T, ctx context.Context) {
if featureflag.TxAtomicRepositoryCreation.IsEnabled(ctx) {
require.Equal(t, 2, len(txManager.Votes()), "expected transactional vote")
} else {
- require.Equal(t, []voting.Vote{
- voting.VoteFromData([]byte{}),
+ require.Equal(t, []transaction.PhasedVote{
+ {Vote: voting.VoteFromData([]byte{}), Phase: voting.UnknownPhase},
}, txManager.Votes())
}
})
@@ -202,7 +202,9 @@ func testCreateRepositoryTransactional(t *testing.T, ctx context.Context) {
refs := gittest.Exec(t, cfg, "-C", repoPath, "for-each-ref")
require.NotEmpty(t, refs)
- require.Equal(t, []voting.Vote{voting.VoteFromData(refs)}, txManager.Votes())
+ require.Equal(t, []transaction.PhasedVote{
+ {Vote: voting.VoteFromData(refs), Phase: voting.UnknownPhase},
+ }, txManager.Votes())
})
}
diff --git a/internal/gitaly/service/repository/fetch_remote.go b/internal/gitaly/service/repository/fetch_remote.go
index 81845a363..f7849d838 100644
--- a/internal/gitaly/service/repository/fetch_remote.go
+++ b/internal/gitaly/service/repository/fetch_remote.go
@@ -114,7 +114,7 @@ func (s *server) FetchRemote(ctx context.Context, req *gitalypb.FetchRemoteReque
return err
}
- return s.txManager.Vote(ctx, tx, vote)
+ return s.txManager.Vote(ctx, tx, vote, voting.UnknownPhase)
}); err != nil {
return nil, status.Errorf(codes.Aborted, "failed vote on refs: %v", err)
}
diff --git a/internal/gitaly/service/repository/remove.go b/internal/gitaly/service/repository/remove.go
index c7befa398..e8a00efd7 100644
--- a/internal/gitaly/service/repository/remove.go
+++ b/internal/gitaly/service/repository/remove.go
@@ -84,7 +84,7 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi
return nil, helper.ErrInternalf("re-statting repository: %w", err)
}
- if err := s.voteOnAction(ctx, repo, preRemove); err != nil {
+ if err := s.voteOnAction(ctx, repo, voting.Prepared); err != nil {
return nil, helper.ErrInternalf("vote on rename: %v", err)
}
@@ -99,11 +99,11 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi
return nil, helper.ErrInternalf("removing repository: %w", err)
}
- if err := s.voteOnAction(ctx, repo, postRemove); err != nil {
+ if err := s.voteOnAction(ctx, repo, voting.Committed); err != nil {
return nil, helper.ErrInternalf("vote on finalizing: %v", err)
}
} else {
- if err := s.voteOnAction(ctx, repo, preRemove); err != nil {
+ if err := s.voteOnAction(ctx, repo, voting.Prepared); err != nil {
return nil, helper.ErrInternalf("vote on rename: %v", err)
}
@@ -121,7 +121,7 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi
return nil, helper.ErrInternal(err)
}
- if err := s.voteOnAction(ctx, repo, postRemove); err != nil {
+ if err := s.voteOnAction(ctx, repo, voting.Committed); err != nil {
return nil, helper.ErrInternalf("vote on finalizing: %v", err)
}
}
@@ -129,26 +129,19 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi
return &gitalypb.RemoveRepositoryResponse{}, nil
}
-type removalStep int
-
-const (
- preRemove = removalStep(iota)
- postRemove
-)
-
-func (s *server) voteOnAction(ctx context.Context, repo *gitalypb.Repository, step removalStep) error {
+func (s *server) voteOnAction(ctx context.Context, repo *gitalypb.Repository, phase voting.Phase) error {
return transaction.RunOnContext(ctx, func(tx txinfo.Transaction) error {
var voteStep string
- switch step {
- case preRemove:
+ switch phase {
+ case voting.Prepared:
voteStep = "pre-remove"
- case postRemove:
+ case voting.Committed:
voteStep = "post-remove"
default:
- return fmt.Errorf("invalid removal step: %d", step)
+ return fmt.Errorf("invalid removal step: %d", phase)
}
vote := fmt.Sprintf("%s %s", voteStep, repo.GetRelativePath())
- return s.txManager.Vote(ctx, tx, voting.VoteFromData([]byte(vote)))
+ return s.txManager.Vote(ctx, tx, voting.VoteFromData([]byte(vote)), phase)
})
}
diff --git a/internal/gitaly/service/repository/util.go b/internal/gitaly/service/repository/util.go
index 50a1182b3..dda214eb1 100644
--- a/internal/gitaly/service/repository/util.go
+++ b/internal/gitaly/service/repository/util.go
@@ -177,7 +177,7 @@ func (s *server) createRepository(
return helper.ErrAlreadyExistsf("repository exists already")
}
- if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil {
+ if err := transaction.VoteOnContext(ctx, s.txManager, vote, voting.Prepared); err != nil {
return helper.ErrFailedPreconditionf("preparatory vote: %w", err)
}
@@ -187,7 +187,7 @@ func (s *server) createRepository(
return fmt.Errorf("moving repository into place: %w", err)
}
- if err := transaction.VoteOnContext(ctx, s.txManager, vote); err != nil {
+ if err := transaction.VoteOnContext(ctx, s.txManager, vote, voting.Committed); err != nil {
return helper.ErrFailedPreconditionf("committing vote: %w", err)
}
diff --git a/internal/gitaly/service/repository/util_test.go b/internal/gitaly/service/repository/util_test.go
index 2e4602a63..48ccebd15 100644
--- a/internal/gitaly/service/repository/util_test.go
+++ b/internal/gitaly/service/repository/util_test.go
@@ -44,7 +44,7 @@ func TestCreateRepository(t *testing.T) {
gitCmdFactory: gitCmdFactory,
}
- votes := 0
+ var votesByPhase map[voting.Phase]int
for _, tc := range []struct {
desc string
@@ -136,21 +136,24 @@ func TestCreateRepository(t *testing.T) {
desc: "successful transaction",
transactional: true,
setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
- votes = 0
- txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error {
- votes++
+ votesByPhase = map[voting.Phase]int{}
+ txManager.VoteFn = func(_ context.Context, _ txinfo.Transaction, _ voting.Vote, phase voting.Phase) error {
+ votesByPhase[phase]++
return nil
}
},
verify: func(t *testing.T, tempRepo *gitalypb.Repository, tempRepoPath string, realRepo *gitalypb.Repository, realRepoPath string) {
- require.Equal(t, 2, votes)
+ require.Equal(t, map[voting.Phase]int{
+ voting.Prepared: 1,
+ voting.Committed: 1,
+ }, votesByPhase)
},
},
{
desc: "failing preparatory vote",
transactional: true,
setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
- txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error {
+ txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error {
return errors.New("vote failed")
}
},
@@ -164,10 +167,8 @@ func TestCreateRepository(t *testing.T) {
desc: "failing post-commit vote",
transactional: true,
setup: func(t *testing.T, repo *gitalypb.Repository, repoPath string) {
- votes = 0
- txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error {
- votes++
- if votes == 1 {
+ txManager.VoteFn = func(_ context.Context, _ txinfo.Transaction, _ voting.Vote, phase voting.Phase) error {
+ if phase == voting.Prepared {
return nil
}
return errors.New("vote failed")
@@ -196,7 +197,7 @@ func TestCreateRepository(t *testing.T) {
require.NoError(t, err)
require.NoError(t, lock.Close())
- txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote) error {
+ txManager.VoteFn = func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error {
require.FailNow(t, "no votes should have happened")
return nil
}
diff --git a/internal/gitaly/service/ssh/receive_pack.go b/internal/gitaly/service/ssh/receive_pack.go
index 4e82eddc4..80c359e03 100644
--- a/internal/gitaly/service/ssh/receive_pack.go
+++ b/internal/gitaly/service/ssh/receive_pack.go
@@ -100,7 +100,7 @@ func (s *server) sshReceivePack(stream gitalypb.SSHService_SSHReceivePackServer,
// ensure there's always at least one vote. In case there was diverging behaviour in
// git-receive-pack(1) which led to a different outcome across voters, then this final vote
// would fail because the sequence of votes would be different.
- if err := transaction.VoteOnContext(ctx, s.txManager, voting.Vote{}); err != nil {
+ if err := transaction.VoteOnContext(ctx, s.txManager, voting.Vote{}, voting.Committed); err != nil {
return status.Errorf(codes.Aborted, "final transactional vote: %v", err)
}
diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go
index e53088115..d7e2a0993 100644
--- a/internal/gitaly/transaction/manager.go
+++ b/internal/gitaly/transaction/manager.go
@@ -47,7 +47,7 @@ var (
type Manager interface {
// Vote casts a vote on the given transaction which is hosted by the
// given Praefect server.
- Vote(context.Context, txinfo.Transaction, voting.Vote) error
+ Vote(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error
// Stop gracefully stops the given transaction which is hosted by the
// given Praefect server.
@@ -97,7 +97,12 @@ func (m *PoolManager) getTransactionClient(ctx context.Context, tx txinfo.Transa
}
// Vote connects to the given server and casts vote as a vote for the transaction identified by tx.
-func (m *PoolManager) Vote(ctx context.Context, tx txinfo.Transaction, vote voting.Vote) error {
+func (m *PoolManager) Vote(
+ ctx context.Context,
+ tx txinfo.Transaction,
+ vote voting.Vote,
+ phase voting.Phase,
+) error {
client, err := m.getTransactionClient(ctx, tx)
if err != nil {
return err
@@ -118,6 +123,7 @@ func (m *PoolManager) Vote(ctx context.Context, tx txinfo.Transaction, vote voti
TransactionId: tx.ID,
Node: tx.Node,
ReferenceUpdatesHash: vote.Bytes(),
+ Phase: phase.ToProto(),
})
if err != nil {
// Add some additional context to cancellation errors so that
diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go
index 6239f0ab2..eb67aa769 100644
--- a/internal/gitaly/transaction/manager_test.go
+++ b/internal/gitaly/transaction/manager_test.go
@@ -63,21 +63,64 @@ func TestPoolManager_Vote(t *testing.T) {
desc string
transaction txinfo.Transaction
vote voting.Vote
+ phase voting.Phase
voteFn func(*testing.T, *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error)
expectedErr error
}{
{
- desc: "successful vote",
+ desc: "successful unknown vote",
transaction: txinfo.Transaction{
BackchannelID: backchannelID,
ID: 1,
Node: "node",
},
- vote: voting.VoteFromData([]byte("foobar")),
+ vote: voting.VoteFromData([]byte("foobar")),
+ phase: voting.UnknownPhase,
voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
require.Equal(t, uint64(1), request.TransactionId)
require.Equal(t, "node", request.Node)
require.Equal(t, request.ReferenceUpdatesHash, voting.VoteFromData([]byte("foobar")).Bytes())
+ require.Equal(t, gitalypb.VoteTransactionRequest_UNKNOWN_PHASE, request.Phase)
+
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_COMMIT,
+ }, nil
+ },
+ },
+ {
+ desc: "successful prepared vote",
+ transaction: txinfo.Transaction{
+ BackchannelID: backchannelID,
+ ID: 1,
+ Node: "node",
+ },
+ vote: voting.VoteFromData([]byte("foobar")),
+ phase: voting.Prepared,
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ require.Equal(t, uint64(1), request.TransactionId)
+ require.Equal(t, "node", request.Node)
+ require.Equal(t, request.ReferenceUpdatesHash, voting.VoteFromData([]byte("foobar")).Bytes())
+ require.Equal(t, gitalypb.VoteTransactionRequest_PREPARED_PHASE, request.Phase)
+
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_COMMIT,
+ }, nil
+ },
+ },
+ {
+ desc: "successful committed vote",
+ transaction: txinfo.Transaction{
+ BackchannelID: backchannelID,
+ ID: 1,
+ Node: "node",
+ },
+ vote: voting.VoteFromData([]byte("foobar")),
+ phase: voting.Committed,
+ voteFn: func(t *testing.T, request *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+ require.Equal(t, uint64(1), request.TransactionId)
+ require.Equal(t, "node", request.Node)
+ require.Equal(t, request.ReferenceUpdatesHash, voting.VoteFromData([]byte("foobar")).Bytes())
+ require.Equal(t, gitalypb.VoteTransactionRequest_COMMITTED_PHASE, request.Phase)
return &gitalypb.VoteTransactionResponse{
State: gitalypb.VoteTransactionResponse_COMMIT,
@@ -124,7 +167,7 @@ func TestPoolManager_Vote(t *testing.T) {
return tc.voteFn(t, request)
}
- err := manager.Vote(ctx, tc.transaction, tc.vote)
+ err := manager.Vote(ctx, tc.transaction, tc.vote, tc.phase)
testhelper.RequireGrpcError(t, tc.expectedErr, err)
})
}
diff --git a/internal/gitaly/transaction/mock.go b/internal/gitaly/transaction/mock.go
index 0d12b9f91..10ad6efdf 100644
--- a/internal/gitaly/transaction/mock.go
+++ b/internal/gitaly/transaction/mock.go
@@ -11,16 +11,16 @@ import (
// MockManager is a mock Manager for use in tests.
type MockManager struct {
- VoteFn func(context.Context, txinfo.Transaction, voting.Vote) error
+ VoteFn func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error
StopFn func(context.Context, txinfo.Transaction) error
}
// Vote calls the MockManager's Vote function, if set. Otherwise, it returns an error.
-func (m *MockManager) Vote(ctx context.Context, tx txinfo.Transaction, vote voting.Vote) error {
+func (m *MockManager) Vote(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
if m.VoteFn == nil {
return errors.New("mock does not implement Vote function")
}
- return m.VoteFn(ctx, tx, vote)
+ return m.VoteFn(ctx, tx, vote, phase)
}
// Stop calls the MockManager's Stop function, if set. Otherwise, it returns an error.
@@ -31,23 +31,29 @@ func (m *MockManager) Stop(ctx context.Context, tx txinfo.Transaction) error {
return m.StopFn(ctx, tx)
}
+// PhasedVote is used to keep track of votes and the phase they were cast in.
+type PhasedVote struct {
+ Vote voting.Vote
+ Phase voting.Phase
+}
+
// TrackingManager is a transaction manager which tracks all votes. Voting functions never return
// an error.
type TrackingManager struct {
MockManager
votesLock sync.Mutex
- votes []voting.Vote
+ votes []PhasedVote
}
// NewTrackingManager creates a new TrackingManager which is ready for use.
func NewTrackingManager() *TrackingManager {
manager := &TrackingManager{}
- manager.VoteFn = func(_ context.Context, _ txinfo.Transaction, vote voting.Vote) error {
+ manager.VoteFn = func(_ context.Context, _ txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
manager.votesLock.Lock()
defer manager.votesLock.Unlock()
- manager.votes = append(manager.votes, vote)
+ manager.votes = append(manager.votes, PhasedVote{Vote: vote, Phase: phase})
return nil
}
@@ -55,11 +61,11 @@ func NewTrackingManager() *TrackingManager {
}
// Votes returns a copy of all votes which have been cast.
-func (m *TrackingManager) Votes() []voting.Vote {
+func (m *TrackingManager) Votes() []PhasedVote {
m.votesLock.Lock()
defer m.votesLock.Unlock()
- votes := make([]voting.Vote, len(m.votes))
+ votes := make([]PhasedVote, len(m.votes))
copy(votes, m.votes)
return votes
diff --git a/internal/gitaly/transaction/voting.go b/internal/gitaly/transaction/voting.go
index 071d944a8..d5ed0683f 100644
--- a/internal/gitaly/transaction/voting.go
+++ b/internal/gitaly/transaction/voting.go
@@ -25,9 +25,9 @@ func RunOnContext(ctx context.Context, fn func(txinfo.Transaction) error) error
}
// VoteOnContext casts the vote on a transaction identified by the context, if there is any.
-func VoteOnContext(ctx context.Context, m Manager, vote voting.Vote) error {
+func VoteOnContext(ctx context.Context, m Manager, vote voting.Vote, phase voting.Phase) error {
return RunOnContext(ctx, func(transaction txinfo.Transaction) error {
- return m.Vote(ctx, transaction, vote)
+ return m.Vote(ctx, transaction, vote, phase)
})
}
@@ -56,7 +56,7 @@ func CommitLockedFile(ctx context.Context, m Manager, writer *safe.LockingFileWr
return fmt.Errorf("computing vote for locked file: %w", err)
}
- if err := m.Vote(ctx, tx, vote); err != nil {
+ if err := m.Vote(ctx, tx, vote, voting.Prepared); err != nil {
return fmt.Errorf("preimage vote: %w", err)
}
@@ -69,7 +69,7 @@ func CommitLockedFile(ctx context.Context, m Manager, writer *safe.LockingFileWr
return fmt.Errorf("committing file: %w", err)
}
- if err := VoteOnContext(ctx, m, vote); err != nil {
+ if err := VoteOnContext(ctx, m, vote, voting.Committed); err != nil {
return fmt.Errorf("postimage vote: %w", err)
}
diff --git a/internal/gitaly/transaction/voting_test.go b/internal/gitaly/transaction/voting_test.go
index 434c9374e..dc195c9f4 100644
--- a/internal/gitaly/transaction/voting_test.go
+++ b/internal/gitaly/transaction/voting_test.go
@@ -76,10 +76,10 @@ func TestVoteOnContext(t *testing.T) {
AuthInfo: backchannel.WithID(nil, 1234),
}
- vote := voting.VoteFromData([]byte("1"))
+ expectedVote := voting.VoteFromData([]byte("1"))
t.Run("without transaction", func(t *testing.T) {
- require.NoError(t, VoteOnContext(ctx, &MockManager{}, voting.Vote{}))
+ require.NoError(t, VoteOnContext(ctx, &MockManager{}, voting.Vote{}, voting.Prepared))
})
t.Run("successful vote", func(t *testing.T) {
@@ -89,7 +89,7 @@ func TestVoteOnContext(t *testing.T) {
callbackExecuted := false
require.NoError(t, VoteOnContext(ctx, &MockManager{
- VoteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote) error {
+ VoteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
require.Equal(t, txinfo.Transaction{
ID: 5678,
Node: "node",
@@ -97,9 +97,11 @@ func TestVoteOnContext(t *testing.T) {
BackchannelID: 1234,
}, tx)
callbackExecuted = true
+ require.Equal(t, expectedVote, vote)
+ require.Equal(t, voting.Prepared, phase)
return nil
},
- }, vote))
+ }, expectedVote, voting.Prepared))
require.True(t, callbackExecuted, "callback should have been executed")
})
@@ -110,10 +112,10 @@ func TestVoteOnContext(t *testing.T) {
expectedErr := fmt.Errorf("any error")
require.Equal(t, expectedErr, VoteOnContext(ctx, &MockManager{
- VoteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote) error {
+ VoteFn: func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error {
return expectedErr
},
- }, vote))
+ }, expectedVote, voting.Prepared))
})
}
@@ -151,7 +153,7 @@ func TestCommitLockedFile(t *testing.T) {
calls := 0
require.NoError(t, CommitLockedFile(ctx, &MockManager{
- VoteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote) error {
+ VoteFn: func(ctx context.Context, tx txinfo.Transaction, vote voting.Vote, phase voting.Phase) error {
require.Equal(t, txinfo.Transaction{
ID: 5678,
Node: "node",
@@ -160,6 +162,16 @@ func TestCommitLockedFile(t *testing.T) {
}, tx)
require.Equal(t, voting.VoteFromData([]byte("contents")), vote)
calls++
+
+ switch calls {
+ case 1:
+ require.Equal(t, voting.Prepared, phase)
+ case 2:
+ require.Equal(t, voting.Committed, phase)
+ default:
+ require.FailNow(t, "unexpected voting phase %q", phase)
+ }
+
return nil
},
}, writer))
@@ -177,7 +189,7 @@ func TestCommitLockedFile(t *testing.T) {
require.NoError(t, err)
err = CommitLockedFile(ctx, &MockManager{
- VoteFn: func(context.Context, txinfo.Transaction, voting.Vote) error {
+ VoteFn: func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error {
return fmt.Errorf("some error")
},
}, writer)
@@ -195,11 +207,10 @@ func TestCommitLockedFile(t *testing.T) {
require.NoError(t, err)
err = CommitLockedFile(ctx, &MockManager{
- VoteFn: func(context.Context, txinfo.Transaction, voting.Vote) error {
+ VoteFn: func(context.Context, txinfo.Transaction, voting.Vote, voting.Phase) error {
// This shouldn't typically happen given that the file is locked,
// but we concurrently update the file after our first vote.
- require.NoError(t, os.WriteFile(file, []byte("something"),
- 0o666))
+ require.NoError(t, os.WriteFile(file, []byte("something"), 0o666))
return nil
},
}, writer)
diff --git a/internal/transaction/voting/phase.go b/internal/transaction/voting/phase.go
new file mode 100644
index 000000000..fb3922a6c
--- /dev/null
+++ b/internal/transaction/voting/phase.go
@@ -0,0 +1,36 @@
+package voting
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// Phase is the transactional phase a given vote can be cast on.
+type Phase int
+
+const (
+ // UnknownPhase is the default value. It should not be used.
+ UnknownPhase = Phase(iota)
+ // Prepared is the prepratory phase. The data that is about to change is locked for
+ // concurrent modification, but changes have not yet been written to disk.
+ Prepared
+ // Committed is the committing phase. Data has been committed to disk and will be visible
+ // in all subsequent requests.
+ Committed
+)
+
+// ToProto converts the phase into its Protobuf enum. This function panics if called with an
+// invalid phase.
+func (p Phase) ToProto() gitalypb.VoteTransactionRequest_Phase {
+ switch p {
+ case UnknownPhase:
+ return gitalypb.VoteTransactionRequest_UNKNOWN_PHASE
+ case Prepared:
+ return gitalypb.VoteTransactionRequest_PREPARED_PHASE
+ case Committed:
+ return gitalypb.VoteTransactionRequest_COMMITTED_PHASE
+ default:
+ panic(fmt.Sprintf("unknown phase %q", p))
+ }
+}