Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-05-31 13:09:43 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-05-31 13:09:43 +0300
commit7bcfd0b9498c0226f4f21476920fd8cc98137876 (patch)
treeb04787a5a2d03df2570d13660aa3d80b8ab91e1c
parent0fb8a85dbd8139be69e8ff248386ae60d40290b7 (diff)
parent7584c40e29ea3c1f5d2835f7622838d7d1b2c82a (diff)
Merge branch 'pks-transactions-impossible-majority' into 'master'
transactions: Fail early if the threshold cannot be reached anymore See merge request gitlab-org/gitaly!3530
-rw-r--r--internal/praefect/transactions/subtransaction.go128
-rw-r--r--internal/praefect/transactions/subtransaction_test.go66
2 files changed, 118 insertions, 76 deletions
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 2ce44bff2..03d01b8a6 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -125,6 +125,21 @@ func (t *subtransaction) vote(node string, vote voting.Vote) error {
return fmt.Errorf("node already cast a vote: %q", node)
}
+ // Update voter state to reflect the new vote counts. Before quorum is reached, this
+ // function will check whether the threshold was reached and, if so, update all voters which
+ // have already cast a vote. After quorum was reached, it will only update the currently
+ // voting node.
+ if err := t.updateVoterState(voter, &vote); err != nil {
+ return fmt.Errorf("updating state of node %q: %w", node, err)
+ }
+
+ return nil
+}
+
+// updateVoterStates updates undecided voters or cancels existing votes of decided voters if given a
+// `nil` vote. Voters are updated either as soon as quorum was reached or alternatively when all
+// votes were cast.
+func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error {
switch voter.result {
case VoteUndecided:
// Happy case, we can still cast a vote.
@@ -133,78 +148,98 @@ func (t *subtransaction) vote(node string, vote voting.Vote) error {
return ErrTransactionCanceled
case VoteStopped:
return ErrTransactionStopped
+ case VoteCommitted:
+ return fmt.Errorf("cannot change committed vote")
default:
// Because we didn't vote yet, we know that the node cannot be
// either in VoteCommitted or VoteFailed state.
- return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node)
+ return fmt.Errorf("voter is in invalid state %d", voter.result)
}
- voter.vote = &vote
-
- t.voteCounts[vote] += voter.Votes
+ switch {
+ case vote != nil:
+ if voter.vote != nil {
+ return errors.New("changing current vote is not allowed")
+ }
- // Update voter states to reflect the new vote counts. Before quorum is
- // reached, this function will check whether the threshold was reached
- // and, if so, update all voters which have already cast a vote. After
- // quorum was reached, it will only update the currently voting node.
- t.updateVoterStates()
+ t.voteCounts[*vote] += voter.Votes
+ voter.vote = vote
+ case vote == nil:
+ if t.isDone() {
+ // If the transaction is already done, it's too late to cancel our vote.
+ // Other nodes may have committed their changes already.
+ return errors.New("subtransaction was already finished")
+ }
- if t.mustSignalVoters() {
- close(t.doneCh)
+ // Remove the voter's support for the vote so it's not counted towards the
+ // majority. The node is not going to commit the subtransaction anyway.
+ t.voteCounts[*voter.vote] -= voter.Votes
+ voter.result = VoteCanceled
}
- return nil
-}
+ defer func() {
+ if t.mustSignalVoters() {
+ close(t.doneCh)
+ }
+ }()
-// updateVoterStates updates undecided voters. Voters are updated either as
-// soon as quorum was reached or alternatively when all votes were cast.
-func (t *subtransaction) updateVoterStates() {
var majorityVote *voting.Vote
+ var majorityVoteCount uint
for v, voteCount := range t.voteCounts {
- if voteCount >= t.threshold {
+ if majorityVoteCount < voteCount {
v := v
+ majorityVoteCount = voteCount
majorityVote = &v
- break
}
}
- allVotesCast := true
+ var outstandingVotes uint
for _, voter := range t.votersByNode {
if voter.vote == nil {
- allVotesCast = false
- break
+ outstandingVotes += voter.Votes
}
}
- // We need to adjust voter states either when quorum was reached or
- // when all votes were cast. If all votes were cast without reaching
- // quorum, we set all voters into VoteFailed state.
- if majorityVote == nil && !allVotesCast {
- return
+ // When the majority vote didn't yet cross the threshold and the number of outstanding votes
+ // may still get us across that threshold, then we need to wait for more votes to come in.
+ if majorityVoteCount < t.threshold && majorityVoteCount+outstandingVotes >= t.threshold {
+ return nil
}
- // Update all voters which have cast a vote and which are not
- // undecided. We mustn't change any voters which did decide on an
- // outcome already as they may have already committed or aborted their
- // action.
+ // Update all voters which have cast a vote and which are not undecided. We mustn't change
+ // any voters which did decide on an outcome already as they may have already committed or
+ // aborted their action.
for _, voter := range t.votersByNode {
+ // We cannot change the mind of nodes which have already settled on any outcome
+ // after the fact.
if voter.result != VoteUndecided {
continue
}
- if voter.vote == nil || majorityVote == nil {
- if allVotesCast {
- voter.result = VoteFailed
- }
+ // We do not change the mind of any voter which didn't yet cast its vote. While it
+ // may be true that it can only fail anyway, it is easier to handle if we just wait
+ // for its incoming vote and set it to failed at that point in time.
+ if voter.vote == nil {
+ continue
+ }
+
+ // If the majority vote count is smaller than the threshold at this point, then we
+ // know that we cannot ever reach it anymore even with the votes which are still
+ // outstanding. We can thus mark this node as failed.
+ if majorityVoteCount < t.threshold {
+ voter.result = VoteFailed
continue
}
+ // Otherwise, the result depends on whether the voter agrees on the quorum or not.
if *voter.vote == *majorityVote {
voter.result = VoteCommitted
} else {
voter.result = VoteFailed
}
}
+
+ return nil
}
// mustSignalVoters determines whether we need to signal voters. Signalling may
@@ -225,11 +260,10 @@ func (t *subtransaction) mustSignalVoters() bool {
}
}
- // The threshold wasn't reached by any node yet. If there are missing
- // votes, then we cannot notify yet as any remaining nodes may cause us
- // to reach quorum.
+ // The threshold wasn't reached by any node yet. If there are undecided voters, then we
+ // cannot notify yet as any remaining nodes may cause us to reach quorum.
for _, voter := range t.votersByNode {
- if voter.vote == nil {
+ if voter.result == VoteUndecided {
return false
}
}
@@ -240,22 +274,6 @@ func (t *subtransaction) mustSignalVoters() bool {
return true
}
-// cancelVote cancels a node's vote if the subtransaction is still ongoing. This
-// has to be called with the lock acquired as collectVotes does.
-func (t *subtransaction) cancelVote(voter *Voter) error {
- if t.isDone() {
- // If the transaction is already done, it's too late to cancel our vote.
- // Other nodes may have committed their changes already.
- return errors.New("subtransaction was already finished")
- }
-
- // Remove the voter's support for the vote so it's not counted towards the
- // majority. The node is not going to commit the subtransaction anyway.
- t.voteCounts[*voter.vote] -= voter.Votes
- voter.result = VoteCanceled
- return nil
-}
-
func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
select {
case <-ctx.Done():
@@ -273,7 +291,7 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
// If the waiting stopped due to the context being canceled, we need to cancel
// this voter's votes.
if err := ctx.Err(); err != nil {
- if err := t.cancelVote(voter); err != nil {
+ if err := t.updateVoterState(voter, nil); err != nil {
return fmt.Errorf("cancel vote: %w", err)
}
diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go
index e3d60e3eb..97269c05f 100644
--- a/internal/praefect/transactions/subtransaction_test.go
+++ b/internal/praefect/transactions/subtransaction_test.go
@@ -124,6 +124,7 @@ func TestSubtransaction_vote(t *testing.T) {
var zeroVote voting.Vote
voteA := newVote(t, "a")
voteB := newVote(t, "b")
+ voteC := newVote(t, "c")
for _, tc := range []struct {
desc string
@@ -193,7 +194,7 @@ func TestSubtransaction_vote(t *testing.T) {
{Name: "1", Votes: 1, result: VoteCanceled},
},
expectedVoteCounts: map[voting.Vote]uint{},
- expectedErr: ErrTransactionCanceled,
+ expectedErr: fmt.Errorf("updating state of node \"1\": %w", ErrTransactionCanceled),
},
{
desc: "single voter trying to vote on stopped transaction",
@@ -207,7 +208,7 @@ func TestSubtransaction_vote(t *testing.T) {
{Name: "1", Votes: 1, result: VoteStopped},
},
expectedVoteCounts: map[voting.Vote]uint{},
- expectedErr: ErrTransactionStopped,
+ expectedErr: fmt.Errorf("updating state of node \"1\": %w", ErrTransactionStopped),
},
{
desc: "multiple voters doing final vote",
@@ -287,6 +288,29 @@ func TestSubtransaction_vote(t *testing.T) {
voteB: 1,
},
},
+ {
+ desc: "multiple disagreeing voters fail early",
+ voters: []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1, vote: &voteB},
+ {Name: "3", Votes: 1, vote: &voteC},
+ {Name: "4", Votes: 1},
+ },
+ threshold: 3,
+ voterName: "1",
+ vote: voteA,
+ expectedVoterState: []Voter{
+ {Name: "1", Votes: 1, result: VoteFailed, vote: &voteA},
+ {Name: "2", Votes: 1, result: VoteFailed, vote: &voteB},
+ {Name: "3", Votes: 1, result: VoteFailed, vote: &voteC},
+ {Name: "4", Votes: 1, result: VoteUndecided},
+ },
+ expectedVoteCounts: map[voting.Vote]uint{
+ voteA: 1,
+ voteB: 1,
+ voteC: 1,
+ },
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
s, err := newSubtransaction(tc.voters, tc.threshold)
@@ -328,7 +352,7 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) {
{
desc: "single voter with vote",
voters: []Voter{
- {Name: "1", Votes: 1, vote: &voteA},
+ {Name: "1", Votes: 1, vote: &voteA, result: VoteCommitted},
},
threshold: 1,
mustSignal: true,
@@ -336,7 +360,7 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) {
{
desc: "single voter with missing vote",
voters: []Voter{
- {Name: "1", Votes: 1},
+ {Name: "1", Votes: 1, result: VoteUndecided},
},
threshold: 1,
mustSignal: false,
@@ -344,9 +368,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) {
{
desc: "multiple agreeing voters",
voters: []Voter{
- {Name: "1", Votes: 1, vote: &voteA},
- {Name: "2", Votes: 1, vote: &voteA},
- {Name: "3", Votes: 1, vote: &voteA},
+ {Name: "1", Votes: 1, vote: &voteA, result: VoteCommitted},
+ {Name: "2", Votes: 1, vote: &voteA, result: VoteCommitted},
+ {Name: "3", Votes: 1, vote: &voteA, result: VoteCommitted},
},
threshold: 1,
mustSignal: true,
@@ -354,9 +378,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) {
{
desc: "multiple disagreeing voters not reaching threshold",
voters: []Voter{
- {Name: "1", Votes: 1, vote: &voteA},
- {Name: "2", Votes: 1, vote: &voteB},
- {Name: "3", Votes: 1, vote: &voteC},
+ {Name: "1", Votes: 1, vote: &voteA, result: VoteFailed},
+ {Name: "2", Votes: 1, vote: &voteB, result: VoteFailed},
+ {Name: "3", Votes: 1, vote: &voteC, result: VoteFailed},
},
threshold: 3,
mustSignal: true,
@@ -364,9 +388,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) {
{
desc: "multiple disagreeing voters reaching threshold",
voters: []Voter{
- {Name: "1", Votes: 1, vote: &voteA},
- {Name: "2", Votes: 1, vote: &voteB},
- {Name: "3", Votes: 1, vote: &voteB},
+ {Name: "1", Votes: 1, vote: &voteA, result: VoteFailed},
+ {Name: "2", Votes: 1, vote: &voteB, result: VoteCommitted},
+ {Name: "3", Votes: 1, vote: &voteB, result: VoteCommitted},
},
threshold: 2,
mustSignal: true,
@@ -374,9 +398,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) {
{
desc: "multiple voters reach quorum with with missing votes",
voters: []Voter{
- {Name: "1", Votes: 1},
- {Name: "2", Votes: 1, vote: &voteA},
- {Name: "3", Votes: 1, vote: &voteA},
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, vote: &voteA, result: VoteCommitted},
+ {Name: "3", Votes: 1, vote: &voteA, result: VoteCommitted},
},
threshold: 2,
mustSignal: true,
@@ -384,9 +408,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) {
{
desc: "multiple voters do not reach quorum with missing votes",
voters: []Voter{
- {Name: "1", Votes: 1},
- {Name: "2", Votes: 1, vote: &voteB},
- {Name: "3", Votes: 1, vote: &voteB},
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, vote: &voteB, result: VoteCommitted},
+ {Name: "3", Votes: 1, vote: &voteB, result: VoteCommitted},
},
threshold: 3,
mustSignal: false,
@@ -457,7 +481,7 @@ func TestSubtransaction_voterStopsWaiting(t *testing.T) {
outcomes: outcomes{
{weight: 1, vote: agreeingVote, result: VoteCommitted},
{weight: 1, vote: agreeingVote, result: VoteCommitted},
- {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: subtransaction was already finished"},
+ {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: cannot change committed vote"},
},
},
{
@@ -472,7 +496,7 @@ func TestSubtransaction_voterStopsWaiting(t *testing.T) {
desc: "secondary cancels its vote after crossing the threshold",
outcomes: outcomes{
{weight: 2, vote: agreeingVote, result: VoteCommitted},
- {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: subtransaction was already finished"},
+ {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: cannot change committed vote"},
{weight: 1, vote: disagreeingVote, result: VoteFailed, errorMessage: errorMessageForVote(1, 3, disagreeingVote)},
},
},