diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-31 13:09:43 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-31 13:09:43 +0300 |
commit | 7bcfd0b9498c0226f4f21476920fd8cc98137876 (patch) | |
tree | b04787a5a2d03df2570d13660aa3d80b8ab91e1c | |
parent | 0fb8a85dbd8139be69e8ff248386ae60d40290b7 (diff) | |
parent | 7584c40e29ea3c1f5d2835f7622838d7d1b2c82a (diff) |
Merge branch 'pks-transactions-impossible-majority' into 'master'
transactions: Fail early if the threshold cannot be reached anymore
See merge request gitlab-org/gitaly!3530
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 128 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction_test.go | 66 |
2 files changed, 118 insertions, 76 deletions
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 2ce44bff2..03d01b8a6 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -125,6 +125,21 @@ func (t *subtransaction) vote(node string, vote voting.Vote) error { return fmt.Errorf("node already cast a vote: %q", node) } + // Update voter state to reflect the new vote counts. Before quorum is reached, this + // function will check whether the threshold was reached and, if so, update all voters which + // have already cast a vote. After quorum was reached, it will only update the currently + // voting node. + if err := t.updateVoterState(voter, &vote); err != nil { + return fmt.Errorf("updating state of node %q: %w", node, err) + } + + return nil +} + +// updateVoterState updates undecided voters or cancels existing votes of decided voters if given a +// `nil` vote. Voters are updated either as soon as quorum was reached or alternatively when all +// votes were cast. +func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error { switch voter.result { case VoteUndecided: // Happy case, we can still cast a vote. @@ -133,78 +148,98 @@ func (t *subtransaction) vote(node string, vote voting.Vote) error { return ErrTransactionCanceled case VoteStopped: return ErrTransactionStopped + case VoteCommitted: + return fmt.Errorf("cannot change committed vote") default: // Because we didn't vote yet, we know that the node cannot be // either in VoteCommitted or VoteFailed state. - return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node) + return fmt.Errorf("voter is in invalid state %d", voter.result) } - voter.vote = &vote - - t.voteCounts[vote] += voter.Votes + switch { + case vote != nil: + if voter.vote != nil { + return errors.New("changing current vote is not allowed") + } - // Update voter states to reflect the new vote counts.
Before quorum is - // reached, this function will check whether the threshold was reached - // and, if so, update all voters which have already cast a vote. After - // quorum was reached, it will only update the currently voting node. - t.updateVoterStates() + t.voteCounts[*vote] += voter.Votes + voter.vote = vote + case vote == nil: + if t.isDone() { + // If the transaction is already done, it's too late to cancel our vote. + // Other nodes may have committed their changes already. + return errors.New("subtransaction was already finished") + } - if t.mustSignalVoters() { - close(t.doneCh) + // Remove the voter's support for the vote so it's not counted towards the + // majority. The node is not going to commit the subtransaction anyway. + t.voteCounts[*voter.vote] -= voter.Votes + voter.result = VoteCanceled } - return nil -} + defer func() { + if t.mustSignalVoters() { + close(t.doneCh) + } + }() -// updateVoterStates updates undecided voters. Voters are updated either as -// soon as quorum was reached or alternatively when all votes were cast. -func (t *subtransaction) updateVoterStates() { var majorityVote *voting.Vote + var majorityVoteCount uint for v, voteCount := range t.voteCounts { - if voteCount >= t.threshold { + if majorityVoteCount < voteCount { v := v + majorityVoteCount = voteCount majorityVote = &v - break } } - allVotesCast := true + var outstandingVotes uint for _, voter := range t.votersByNode { if voter.vote == nil { - allVotesCast = false - break + outstandingVotes += voter.Votes } } - // We need to adjust voter states either when quorum was reached or - // when all votes were cast. If all votes were cast without reaching - // quorum, we set all voters into VoteFailed state. - if majorityVote == nil && !allVotesCast { - return + // When the majority vote didn't yet cross the threshold and the number of outstanding votes + // may still get us across that threshold, then we need to wait for more votes to come in. 
+ if majorityVoteCount < t.threshold && majorityVoteCount+outstandingVotes >= t.threshold { + return nil } - // Update all voters which have cast a vote and which are not - // undecided. We mustn't change any voters which did decide on an - // outcome already as they may have already committed or aborted their - // action. + // Update all voters which have cast a vote and which are not undecided. We mustn't change + // any voters which did decide on an outcome already as they may have already committed or + // aborted their action. for _, voter := range t.votersByNode { + // We cannot change the mind of nodes which have already settled on any outcome + // after the fact. if voter.result != VoteUndecided { continue } - if voter.vote == nil || majorityVote == nil { - if allVotesCast { - voter.result = VoteFailed - } + // We do not change the mind of any voter which didn't yet cast its vote. While it + // may be true that it can only fail anyway, it is easier to handle if we just wait + // for its incoming vote and set it to failed at that point in time. + if voter.vote == nil { + continue + } + + // If the majority vote count is smaller than the threshold at this point, then we + // know that we cannot ever reach it anymore even with the votes which are still + // outstanding. We can thus mark this node as failed. + if majorityVoteCount < t.threshold { + voter.result = VoteFailed continue } + // Otherwise, the result depends on whether the voter agrees on the quorum or not. if *voter.vote == *majorityVote { voter.result = VoteCommitted } else { voter.result = VoteFailed } } + + return nil } // mustSignalVoters determines whether we need to signal voters. Signalling may @@ -225,11 +260,10 @@ func (t *subtransaction) mustSignalVoters() bool { } } - // The threshold wasn't reached by any node yet. If there are missing - // votes, then we cannot notify yet as any remaining nodes may cause us - // to reach quorum. + // The threshold wasn't reached by any node yet. 
If there are undecided voters, then we + // cannot notify yet as any remaining nodes may cause us to reach quorum. for _, voter := range t.votersByNode { - if voter.vote == nil { + if voter.result == VoteUndecided { return false } } @@ -240,22 +274,6 @@ func (t *subtransaction) mustSignalVoters() bool { return true } -// cancelVote cancels a node's vote if the subtransaction is still ongoing. This -// has to be called with the lock acquired as collectVotes does. -func (t *subtransaction) cancelVote(voter *Voter) error { - if t.isDone() { - // If the transaction is already done, it's too late to cancel our vote. - // Other nodes may have committed their changes already. - return errors.New("subtransaction was already finished") - } - - // Remove the voter's support for the vote so it's not counted towards the - // majority. The node is not going to commit the subtransaction anyway. - t.voteCounts[*voter.vote] -= voter.Votes - voter.result = VoteCanceled - return nil -} - func (t *subtransaction) collectVotes(ctx context.Context, node string) error { select { case <-ctx.Done(): @@ -273,7 +291,7 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { // If the waiting stopped due to the context being canceled, we need to cancel // this voter's votes. 
if err := ctx.Err(); err != nil { - if err := t.cancelVote(voter); err != nil { + if err := t.updateVoterState(voter, nil); err != nil { return fmt.Errorf("cancel vote: %w", err) } diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go index e3d60e3eb..97269c05f 100644 --- a/internal/praefect/transactions/subtransaction_test.go +++ b/internal/praefect/transactions/subtransaction_test.go @@ -124,6 +124,7 @@ func TestSubtransaction_vote(t *testing.T) { var zeroVote voting.Vote voteA := newVote(t, "a") voteB := newVote(t, "b") + voteC := newVote(t, "c") for _, tc := range []struct { desc string @@ -193,7 +194,7 @@ func TestSubtransaction_vote(t *testing.T) { {Name: "1", Votes: 1, result: VoteCanceled}, }, expectedVoteCounts: map[voting.Vote]uint{}, - expectedErr: ErrTransactionCanceled, + expectedErr: fmt.Errorf("updating state of node \"1\": %w", ErrTransactionCanceled), }, { desc: "single voter trying to vote on stopped transaction", @@ -207,7 +208,7 @@ func TestSubtransaction_vote(t *testing.T) { {Name: "1", Votes: 1, result: VoteStopped}, }, expectedVoteCounts: map[voting.Vote]uint{}, - expectedErr: ErrTransactionStopped, + expectedErr: fmt.Errorf("updating state of node \"1\": %w", ErrTransactionStopped), }, { desc: "multiple voters doing final vote", @@ -287,6 +288,29 @@ func TestSubtransaction_vote(t *testing.T) { voteB: 1, }, }, + { + desc: "multiple disagreeing voters fail early", + voters: []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1, vote: &voteB}, + {Name: "3", Votes: 1, vote: &voteC}, + {Name: "4", Votes: 1}, + }, + threshold: 3, + voterName: "1", + vote: voteA, + expectedVoterState: []Voter{ + {Name: "1", Votes: 1, result: VoteFailed, vote: &voteA}, + {Name: "2", Votes: 1, result: VoteFailed, vote: &voteB}, + {Name: "3", Votes: 1, result: VoteFailed, vote: &voteC}, + {Name: "4", Votes: 1, result: VoteUndecided}, + }, + expectedVoteCounts: map[voting.Vote]uint{ + voteA: 1, + 
voteB: 1, + voteC: 1, + }, + }, } { t.Run(tc.desc, func(t *testing.T) { s, err := newSubtransaction(tc.voters, tc.threshold) @@ -328,7 +352,7 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) { { desc: "single voter with vote", voters: []Voter{ - {Name: "1", Votes: 1, vote: &voteA}, + {Name: "1", Votes: 1, vote: &voteA, result: VoteCommitted}, }, threshold: 1, mustSignal: true, @@ -336,7 +360,7 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) { { desc: "single voter with missing vote", voters: []Voter{ - {Name: "1", Votes: 1}, + {Name: "1", Votes: 1, result: VoteUndecided}, }, threshold: 1, mustSignal: false, @@ -344,9 +368,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) { { desc: "multiple agreeing voters", voters: []Voter{ - {Name: "1", Votes: 1, vote: &voteA}, - {Name: "2", Votes: 1, vote: &voteA}, - {Name: "3", Votes: 1, vote: &voteA}, + {Name: "1", Votes: 1, vote: &voteA, result: VoteCommitted}, + {Name: "2", Votes: 1, vote: &voteA, result: VoteCommitted}, + {Name: "3", Votes: 1, vote: &voteA, result: VoteCommitted}, }, threshold: 1, mustSignal: true, @@ -354,9 +378,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) { { desc: "multiple disagreeing voters not reaching threshold", voters: []Voter{ - {Name: "1", Votes: 1, vote: &voteA}, - {Name: "2", Votes: 1, vote: &voteB}, - {Name: "3", Votes: 1, vote: &voteC}, + {Name: "1", Votes: 1, vote: &voteA, result: VoteFailed}, + {Name: "2", Votes: 1, vote: &voteB, result: VoteFailed}, + {Name: "3", Votes: 1, vote: &voteC, result: VoteFailed}, }, threshold: 3, mustSignal: true, @@ -364,9 +388,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) { { desc: "multiple disagreeing voters reaching threshold", voters: []Voter{ - {Name: "1", Votes: 1, vote: &voteA}, - {Name: "2", Votes: 1, vote: &voteB}, - {Name: "3", Votes: 1, vote: &voteB}, + {Name: "1", Votes: 1, vote: &voteA, result: VoteFailed}, + {Name: "2", Votes: 1, vote: &voteB, result: VoteCommitted}, + {Name: "3", Votes: 
1, vote: &voteB, result: VoteCommitted}, }, threshold: 2, mustSignal: true, @@ -374,9 +398,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) { { desc: "multiple voters reach quorum with with missing votes", voters: []Voter{ - {Name: "1", Votes: 1}, - {Name: "2", Votes: 1, vote: &voteA}, - {Name: "3", Votes: 1, vote: &voteA}, + {Name: "1", Votes: 1, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteA, result: VoteCommitted}, + {Name: "3", Votes: 1, vote: &voteA, result: VoteCommitted}, }, threshold: 2, mustSignal: true, @@ -384,9 +408,9 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) { { desc: "multiple voters do not reach quorum with missing votes", voters: []Voter{ - {Name: "1", Votes: 1}, - {Name: "2", Votes: 1, vote: &voteB}, - {Name: "3", Votes: 1, vote: &voteB}, + {Name: "1", Votes: 1, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteB, result: VoteCommitted}, + {Name: "3", Votes: 1, vote: &voteB, result: VoteCommitted}, }, threshold: 3, mustSignal: false, @@ -457,7 +481,7 @@ func TestSubtransaction_voterStopsWaiting(t *testing.T) { outcomes: outcomes{ {weight: 1, vote: agreeingVote, result: VoteCommitted}, {weight: 1, vote: agreeingVote, result: VoteCommitted}, - {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: subtransaction was already finished"}, + {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: cannot change committed vote"}, }, }, { @@ -472,7 +496,7 @@ func TestSubtransaction_voterStopsWaiting(t *testing.T) { desc: "secondary cancels its vote after crossing the threshold", outcomes: outcomes{ {weight: 2, vote: agreeingVote, result: VoteCommitted}, - {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: subtransaction was already finished"}, + {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: cannot change committed vote"}, {weight: 1, 
vote: disagreeingVote, result: VoteFailed, errorMessage: errorMessageForVote(1, 3, disagreeingVote)}, }, }, |