diff options
author | Justin Tobler <jtobler@gitlab.com> | 2022-10-06 19:40:01 +0300 |
---|---|---|
committer | Justin Tobler <jtobler@gitlab.com> | 2022-10-13 02:01:34 +0300 |
commit | d6797c8d5f2a0be009dda8c7c678c6fe5b9f1f9e (patch) | |
tree | 6eb067c009d9032a452088cb46bf8fea9ce2ec3d | |
parent | 1512768a0e319c08b486fa923a6cc5b14dc43c80 (diff) |
Praefect: Consolidate quorum check logic
When a node voter is updated a check is performed to determine whether
quorum has already been achieved or if quorum is still attainable. A
similar check is also performed to determine whether the blocked voters
in the pending subtransaction can be unblocked. Future changes to the
node RPC error handling will require that subtransactions signal voters
to unblock once quorum becomes impossible. This change consolidates the
quorum check logic into a single function that can be reused by
`updateVoterState()` and `mustSignalVoters()`.
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 81 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction_test.go | 97 |
2 files changed, 140 insertions, 38 deletions
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index ab21107b8..b0d80ff1c 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -183,26 +183,12 @@ func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error } }() - var majorityVote *voting.Vote - var majorityVoteCount uint - for v, voteCount := range t.voteCounts { - if majorityVoteCount < voteCount { - v := v - majorityVoteCount = voteCount - majorityVote = &v - } - } - - var outstandingVotes uint - for _, voter := range t.votersByNode { - if voter.vote == nil { - outstandingVotes += voter.Votes - } - } - - // When the majority vote didn't yet cross the threshold and the number of outstanding votes - // may still get us across that threshold, then we need to wait for more votes to come in. - if majorityVoteCount < t.threshold && majorityVoteCount+outstandingVotes >= t.threshold { + // Check if quorum has been reached or if quorum is still attainable for the subtransaction. + // If quorum has not been achieved the vote returned by the quorum check will be `nil`. + // As long as quorum has not been achieved and is still possible, the subtransaction + // will wait for additional voter's results to come in. + majorityVote, quorumPossible := t.quorumCheck() + if majorityVote == nil && quorumPossible { return nil } @@ -223,15 +209,15 @@ func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error continue } - // If the majority vote count is smaller than the threshold at this point, then we - // know that we cannot ever reach it anymore even with the votes which are still - // outstanding. We can thus mark this node as failed. - if majorityVoteCount < t.threshold { + // If quorum is not possible we know there are not enough outstanding votes to cross + // the threshold required by the subtransaction. We can thus mark this node as failed. + if !quorumPossible { voter.result = VoteFailed continue } - // Otherwise, the result depends on whether the voter agrees on the quorum or not. + // At this point we know quorum has been achieved and a majority vote is present. + // A check is done to see if the voter agrees with the quorum majority vote. if *voter.vote == *majorityVote { voter.result = VoteCommitted } else { @@ -252,26 +238,45 @@ func (t *subtransaction) mustSignalVoters() bool { return false } - // Check if any node has reached the threshold. If it did, then we need - // to signal voters. - for _, voteCount := range t.voteCounts { - if voteCount >= t.threshold { - return true + majorityVote, quorumPossible := t.quorumCheck() + + // If there is majority vote threshold has been met and voters can be signaled. + if majorityVote != nil { + return true + } + + // If quorum is still possible the voters should not be signaled, since + // remaining voters could cause us to reach quorum. If quorum is not + // possible voters should be unblocked to allow the transaction to fail. + return !quorumPossible +} + +// quorumCheck returns the majority vote if quorum has been achieved +// and if not `nil` is returned. It also returns whether quorum can +// still be achieved with the outstanding voters. +func (t *subtransaction) quorumCheck() (*voting.Vote, bool) { + var leader *voting.Vote + var majority uint + for v, voteCount := range t.voteCounts { + if majority < voteCount { + v := v + majority = voteCount + leader = &v } } - // The threshold wasn't reached by any node yet. If there are undecided voters, then we - // cannot notify yet as any remaining nodes may cause us to reach quorum. + if majority >= t.threshold { + return leader, true + } + + var outstanding uint for _, voter := range t.votersByNode { - if voter.result == VoteUndecided { - return false + if voter.vote == nil && voter.result == VoteUndecided { + outstanding += voter.Votes } } - // Otherwise we know that all votes are in and that no quorum was - // reached. We thus need to notify callers of the failed vote as the - // last node which has cast its vote. - return true + return nil, majority+outstanding >= t.threshold } func (t *subtransaction) collectVotes(ctx context.Context, node string) error { diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go index d7b2eee42..5a202ffdb 100644 --- a/internal/praefect/transactions/subtransaction_test.go +++ b/internal/praefect/transactions/subtransaction_test.go @@ -593,6 +593,103 @@ func TestSubtransaction_race(t *testing.T) { } } +func TestSubtransaction_quorumCheck(t *testing.T) { + voteA := newVote(t, "a") + voteB := newVote(t, "b") + voteC := newVote(t, "c") + + for _, tc := range []struct { + desc string + voters []Voter + threshold uint + expVote *voting.Vote + expQuorum bool + }{ + { + desc: "No votes yet", + voters: []Voter{ + {Name: "1", Votes: 1, vote: nil, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: nil, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: nil, result: VoteUndecided}, + }, + threshold: 2, + expVote: nil, + expQuorum: true, + }, + { + desc: "Two nodes failed", + voters: []Voter{ + {Name: "1", Votes: 1, vote: nil, result: VoteCanceled}, + {Name: "2", Votes: 1, vote: nil, result: VoteCanceled}, + {Name: "3", Votes: 1, vote: nil, result: VoteUndecided}, + }, + threshold: 2, + expVote: nil, + expQuorum: false, + }, + { + desc: "Two nodes vote differently", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteB, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: nil, result: VoteUndecided}, + }, + threshold: 2, + expVote: nil, + expQuorum: true, + }, + { + desc: "Two nodes vote same", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: nil, result: VoteUndecided}, + }, + threshold: 2, + expVote: &voteA, + expQuorum: true, + }, + { + desc: "One node votes different", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: &voteB, result: VoteUndecided}, + }, + threshold: 2, + expVote: &voteA, + expQuorum: true, + }, + { + desc: "All nodes votes different", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteB, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: &voteC, result: VoteUndecided}, + }, + threshold: 2, + expVote: nil, + expQuorum: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + subtransaction, err := newSubtransaction(tc.voters, tc.threshold) + require.NoError(t, err) + + // Vote counts usually updated when `updateVoterState()` is called + for _, voter := range tc.voters { + if voter.vote != nil { + subtransaction.voteCounts[*voter.vote] += voter.Votes + } + } + + majorityVote, quorumPossible := subtransaction.quorumCheck() + require.Equal(t, tc.expVote, majorityVote) + require.Equal(t, tc.expQuorum, quorumPossible) + }) + } +} + func newVote(t *testing.T, s string) voting.Vote { hash := sha1.Sum([]byte(s)) vote, err := voting.VoteFromHash(hash[:]) |