diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-15 18:14:27 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-20 20:48:16 +0300 |
commit | f58dd1af2f547ef959fe9dffa2f99e622f836936 (patch) | |
tree | 90ee8203336fededa41a8998f94900e5ff4fb18d | |
parent | f92a1cb8ef5ba0b9c1bab4a539a85764dcb63d42 (diff) |
Cancel a vote associated with a node that stops waiting for a quorum
When a Gitaly has cast its vote for a transactions, it waits until
the transaction reaches a quorum. If the Gitaly stops waiting for the
quorum, it would not commit the changes if the vote is successful.
As such, we should not consider its vote in the quorum anymore as it's
not going to persist the changes. This commit cancels a Gitaly's vote
if it stops waiting for a quorum.
Changelog: changed
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 33 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction_test.go | 123 |
2 files changed, 152 insertions, 4 deletions
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 981746ad7..2ce44bff2 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -2,6 +2,7 @@ package transactions import ( "context" + "errors" "fmt" "sync" @@ -239,22 +240,46 @@ func (t *subtransaction) mustSignalVoters() bool { return true } +// cancelVote cancels a node's vote if the subtransaction is still ongoing. This +// has to be called with the lock acquired as collectVotes does. +func (t *subtransaction) cancelVote(voter *Voter) error { + if t.isDone() { + // If the transaction is already done, it's too late to cancel our vote. + // Other nodes may have committed their changes already. + return errors.New("subtransaction was already finished") + } + + // Remove the voter's support for the vote so it's not counted towards the + // majority. The node is not going to commit the subtransaction anyway. + t.voteCounts[*voter.vote] -= voter.Votes + voter.result = VoteCanceled + return nil +} + func (t *subtransaction) collectVotes(ctx context.Context, node string) error { select { case <-ctx.Done(): - return ctx.Err() case <-t.doneCh: - break } - t.lock.RLock() - defer t.lock.RUnlock() + t.lock.Lock() + defer t.lock.Unlock() voter, ok := t.votersByNode[node] if !ok { return fmt.Errorf("invalid node for transaction: %q", node) } + // If the waiting stopped due to the context being canceled, we need to cancel + // this voter's votes. + if err := ctx.Err(); err != nil { + if err := t.cancelVote(voter); err != nil { + return fmt.Errorf("cancel vote: %w", err) + } + + return ctx.Err() + } + switch voter.result { case VoteCommitted: // Happy case, we are part of the quorum. diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go index 5ddf5eeec..e3d60e3eb 100644 --- a/internal/praefect/transactions/subtransaction_test.go +++ b/internal/praefect/transactions/subtransaction_test.go @@ -1,12 +1,15 @@ package transactions import ( + "context" "crypto/sha1" "errors" "fmt" "sync" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/transaction/voting" @@ -410,6 +413,126 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) { } } +func TestSubtransaction_voterStopsWaiting(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + agreeingVote := newVote(t, "agreeing") + disagreeingVote := newVote(t, "disagreeing") + + errorMessageForVote := func(agreeingVotes uint, threshold uint, vote voting.Vote) string { + return fmt.Sprintf("transaction did not reach quorum: got %d/%d votes for %s", agreeingVotes, threshold, vote) + } + + type outcomes []struct { + drops bool + vote voting.Vote + weight uint + errorMessage string + result VoteResult + } + + for _, tc := range []struct { + desc string + outcomes outcomes + }{ + { + desc: "quorum not reached", + outcomes: outcomes{ + {weight: 1, vote: agreeingVote, drops: true, errorMessage: context.Canceled.Error(), result: VoteCanceled}, + {weight: 1, vote: agreeingVote, errorMessage: errorMessageForVote(1, 2, agreeingVote), result: VoteFailed}, + {weight: 1, vote: disagreeingVote, errorMessage: errorMessageForVote(1, 2, disagreeingVote), result: VoteFailed}, + }, + }, + { + desc: "quorum reached", + outcomes: outcomes{ + {weight: 1, vote: agreeingVote, drops: true, errorMessage: context.Canceled.Error(), result: VoteCanceled}, + {weight: 1, vote: agreeingVote, result: VoteCommitted}, + {weight: 1, vote: agreeingVote, result: VoteCommitted}, + }, + }, + { + desc: "can't cancel a finished transaction", + outcomes: outcomes{ + {weight: 1, vote: agreeingVote, result: VoteCommitted}, + {weight: 1, vote: agreeingVote, result: VoteCommitted}, + {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: subtransaction was already finished"}, + }, + }, + { + desc: "primary cancels its vote before transaction is finished", + outcomes: outcomes{ + {weight: 2, vote: agreeingVote, drops: true, result: VoteCanceled, errorMessage: context.Canceled.Error()}, + {weight: 1, vote: agreeingVote, result: VoteFailed, errorMessage: errorMessageForVote(2, 3, agreeingVote)}, + {weight: 1, vote: agreeingVote, result: VoteFailed, errorMessage: errorMessageForVote(2, 3, agreeingVote)}, + }, + }, + { + desc: "secondary cancels its vote after crossing the threshold", + outcomes: outcomes{ + {weight: 2, vote: agreeingVote, result: VoteCommitted}, + {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: subtransaction was already finished"}, + {weight: 1, vote: disagreeingVote, result: VoteFailed, errorMessage: errorMessageForVote(1, 3, disagreeingVote)}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, 45*time.Second) + defer cancel() + + var totalWeight uint + var voters []Voter + for i, outcome := range tc.outcomes { + totalWeight += outcome.weight + voters = append(voters, Voter{Name: fmt.Sprintf("voter-%d", i), Votes: outcome.weight}) + } + + s, err := newSubtransaction(voters, totalWeight/2+1) + require.NoError(t, err) + + results := make([]chan error, len(tc.outcomes)) + for i, outcome := range tc.outcomes { + voterName := voters[i].Name + resultCh := make(chan error, 1) + results[i] = resultCh + + collectVotes := func(ctx context.Context) { resultCh <- s.collectVotes(ctx, voterName) } + + require.NoError(t, s.vote(voterName, outcome.vote)) + + if outcome.drops { + ctx, dropVoter := context.WithCancel(ctx) + dropVoter() + + // Run the dropping nodes's collectVotes in sync just to ensure + // we get the correct error back. If we ran all of the collectVotes + // async, the agreeing nodes could finish the transaction and + // we would not get a context.Canceled when the vote is successfully + // canceled. + collectVotes(ctx) + continue + } + + go collectVotes(ctx) + } + + for i, outcome := range tc.outcomes { + voterName := voters[i].Name + assert.Equal(t, outcome.result, s.state()[voterName], "Node: %q", voterName) + + err := <-results[i] + if outcome.errorMessage != "" { + assert.EqualError(t, err, outcome.errorMessage) + continue + } + + assert.NoError(t, err) + } + }) + } +} + func TestSubtransaction_race(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() |