Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2022-11-03 16:42:26 +0300
committerJustin Tobler <jtobler@gitlab.com>2022-11-04 00:59:12 +0300
commit8c634c2b917e0be735c385f9f2d53b67298dd1ba (patch)
tree5f9b0adf49e2e4a6cab92c2aa6b34ba56e543f0a
parentb55578ec476e8bc8ecd9775ee7e9960b52e0f6e0 (diff)
Praefect: Fix transaction voter state race
When creating a new subtransaction previous voter state needs to be propagated forward so the new subtransaction can be aware of previously canceled voters. While reading the voter result state of the previous subtransaction, the subtransaction could also have its voter state updated through a call to `updateVoterState()`. This change moves voter propagation logic to a new subtransaction method `getPropagatedVoters()`. This method locks its associated subtransaction to prevent concurrent writes to its voter result state.
-rw-r--r--internal/praefect/transactions/subtransaction.go36
-rw-r--r--internal/praefect/transactions/subtransaction_test.go102
-rw-r--r--internal/praefect/transactions/transaction.go29
3 files changed, 143 insertions, 24 deletions
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 19a43b667..b13ae6948 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -404,3 +404,39 @@ func (t *subtransaction) cancelNodeVoter(node string) error {
return nil
}
+
+// getPropagatedVoters returns provided voters with voter result state
+// matching canceled subtransaction voters. This is used because newly
+// created subtransactions need to propagate previously canceled voters.
+func (t *subtransaction) getPropagatedVoters(voters []Voter) ([]Voter, error) {
+ // Lock subtransaction to prevent concurrent writes to voter
+ // result state from `updateVoterState()`.
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ // Check subtransaction voters state and propagate canceled voters.
+ var propagatedVoters []Voter
+ for _, voter := range voters {
+ subVoter := t.votersByNode[voter.Name]
+ if subVoter == nil {
+ // This error should in theory never be reached. When a
+ // subtransaction is created it receives all voters from
+ // the parent transaction. The parent transaction voters
+ // are not mutated throughout the lifespan of the
+ // transaction meaning that all voters in a transaction
+ // should be present in a subtransaction.
+ return nil, errors.New("subtransaction missing voter")
+ }
+
+ // Only canceled voters need to be propagated since a node voter
+ // can be canceled and the transaction continue. Other terminal
+ // results are applied to voters and end the transaction.
+ if subVoter.result == VoteCanceled {
+ voter.result = VoteCanceled
+ }
+
+ propagatedVoters = append(propagatedVoters, voter)
+ }
+
+ return propagatedVoters, nil
+}
diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go
index 24b35cc54..7cb7fa61c 100644
--- a/internal/praefect/transactions/subtransaction_test.go
+++ b/internal/praefect/transactions/subtransaction_test.go
@@ -901,6 +901,108 @@ func TestSubtransaction_cancelNodeVoter(t *testing.T) {
}
}
+func TestSubtransaction_getPropagatedVoters(t *testing.T) {
+ t.Parallel()
+
+ threshold := uint(2)
+
+ for _, tc := range []struct {
+ desc string
+ voters []Voter
+ subVoters []Voter
+ expVoters []Voter
+ expErrMsg string
+ }{
+ {
+ desc: "No voters to propagate",
+ voters: []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ subVoters: []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ expVoters: []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ expErrMsg: "",
+ },
+ {
+ desc: "Canceled voter propagates",
+ voters: []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ subVoters: []Voter{
+ {Name: "1", Votes: 1, result: VoteCanceled},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ expVoters: []Voter{
+ {Name: "1", Votes: 1, result: VoteCanceled},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ expErrMsg: "",
+ },
+ {
+ desc: "Only canceled voters propagate",
+ voters: []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ subVoters: []Voter{
+ {Name: "1", Votes: 1, result: VoteFailed},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ expVoters: []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ expErrMsg: "",
+ },
+ {
+ desc: "Transaction/subtransaction voter mismatch",
+ voters: []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ {Name: "4", Votes: 1, result: VoteUndecided},
+ },
+ subVoters: []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ expVoters: nil,
+ expErrMsg: "subtransaction missing voter",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ subtransaction, err := newSubtransaction(tc.subVoters, threshold)
+ require.NoError(t, err)
+
+ propagatedVoters, err := subtransaction.getPropagatedVoters(tc.voters)
+ if err != nil {
+ require.EqualError(t, err, tc.expErrMsg)
+ require.Nil(t, propagatedVoters)
+ } else {
+ require.NoError(t, err)
+ require.Equal(t, tc.expVoters, propagatedVoters)
+ }
+ })
+ }
+}
+
func newVote(t *testing.T, s string) voting.Vote {
hash := sha1.Sum([]byte(s))
vote, err := voting.VoteFromHash(hash[:])
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index fed090e81..c875fcad2 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -293,35 +293,16 @@ func (t *transaction) getPendingNodeSubtransactions(node string) ([]*subtransact
// subtransactions do not wait for an impossible quorum due to canceled
// voters.
func (t *transaction) createSubtransaction() (*subtransaction, error) {
- // If there are no subtransactions propagation can be skipped
+ // If there are no subtransactions propagation can be skipped.
if len(t.subtransactions) == 0 {
return newSubtransaction(t.voters, t.threshold)
}
+ // Propagate canceled voters from previous subtransaction.
prevSub := t.subtransactions[len(t.subtransactions)-1]
-
- // Check previous voters state and propagate canceled voters.
- var propagatedVoters []Voter
- for _, voter := range t.voters {
- prevVoter := prevSub.votersByNode[voter.Name]
- if prevVoter == nil {
- // This error should in theory never be reached. When a
- // subtransaction is created it receives all voters from
- // the parent transaction. The parent transaction voters
- // are not mutated throughout the lifespan of the
- // transaction meaning that all voters in a transaction
- // should be present in a subtransaction.
- return nil, errors.New("subtransaction missing previous voter")
- }
-
- // Only canceled voters need to be propagated since a node voter
- // can be canceled and the transaction continue. Other terminal
- // results are applied to voters and end the transaction.
- if prevVoter.result == VoteCanceled {
- voter.result = VoteCanceled
- }
-
- propagatedVoters = append(propagatedVoters, voter)
+ propagatedVoters, err := prevSub.getPropagatedVoters(t.voters)
+ if err != nil {
+ return nil, err
}
return newSubtransaction(propagatedVoters, t.threshold)