Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-05-15 18:14:27 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-05-20 20:48:16 +0300
commitf58dd1af2f547ef959fe9dffa2f99e622f836936 (patch)
tree90ee8203336fededa41a8998f94900e5ff4fb18d
parentf92a1cb8ef5ba0b9c1bab4a539a85764dcb63d42 (diff)
Cancel a vote associated with a node that stops waiting for a quorum
When a Gitaly has cast its vote for a transactions, it waits until the transaction reaches a quorum. If the Gitaly stops waiting for the quorum, it would not commit the changes if the vote is successful. As such, we should not consider its vote in the quorum anymore as it's not going to persist the changes. This commit cancels a Gitaly's vote if it stops waiting for a quorum. Changelog: changed
-rw-r--r--internal/praefect/transactions/subtransaction.go33
-rw-r--r--internal/praefect/transactions/subtransaction_test.go123
2 files changed, 152 insertions, 4 deletions
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 981746ad7..2ce44bff2 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -2,6 +2,7 @@ package transactions
import (
"context"
+ "errors"
"fmt"
"sync"
@@ -239,22 +240,46 @@ func (t *subtransaction) mustSignalVoters() bool {
return true
}
+// cancelVote cancels a node's vote if the subtransaction is still ongoing. This
+// has to be called with the lock acquired as collectVotes does.
+func (t *subtransaction) cancelVote(voter *Voter) error {
+ if t.isDone() {
+ // If the transaction is already done, it's too late to cancel our vote.
+ // Other nodes may have committed their changes already.
+ return errors.New("subtransaction was already finished")
+ }
+
+ // Remove the voter's support for the vote so it's not counted towards the
+ // majority. The node is not going to commit the subtransaction anyway.
+ t.voteCounts[*voter.vote] -= voter.Votes
+ voter.result = VoteCanceled
+ return nil
+}
+
func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
select {
case <-ctx.Done():
- return ctx.Err()
case <-t.doneCh:
- break
}
- t.lock.RLock()
- defer t.lock.RUnlock()
+ t.lock.Lock()
+ defer t.lock.Unlock()
voter, ok := t.votersByNode[node]
if !ok {
return fmt.Errorf("invalid node for transaction: %q", node)
}
+ // If the waiting stopped due to the context being canceled, we need to cancel
+ // this voter's votes.
+ if err := ctx.Err(); err != nil {
+ if err := t.cancelVote(voter); err != nil {
+ return fmt.Errorf("cancel vote: %w", err)
+ }
+
+ return ctx.Err()
+ }
+
switch voter.result {
case VoteCommitted:
// Happy case, we are part of the quorum.
diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go
index 5ddf5eeec..e3d60e3eb 100644
--- a/internal/praefect/transactions/subtransaction_test.go
+++ b/internal/praefect/transactions/subtransaction_test.go
@@ -1,12 +1,15 @@
package transactions
import (
+ "context"
"crypto/sha1"
"errors"
"fmt"
"sync"
"testing"
+ "time"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/transaction/voting"
@@ -410,6 +413,126 @@ func TestSubtransaction_mustSignalVoters(t *testing.T) {
}
}
+func TestSubtransaction_voterStopsWaiting(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ agreeingVote := newVote(t, "agreeing")
+ disagreeingVote := newVote(t, "disagreeing")
+
+ errorMessageForVote := func(agreeingVotes uint, threshold uint, vote voting.Vote) string {
+ return fmt.Sprintf("transaction did not reach quorum: got %d/%d votes for %s", agreeingVotes, threshold, vote)
+ }
+
+ type outcomes []struct {
+ drops bool
+ vote voting.Vote
+ weight uint
+ errorMessage string
+ result VoteResult
+ }
+
+ for _, tc := range []struct {
+ desc string
+ outcomes outcomes
+ }{
+ {
+ desc: "quorum not reached",
+ outcomes: outcomes{
+ {weight: 1, vote: agreeingVote, drops: true, errorMessage: context.Canceled.Error(), result: VoteCanceled},
+ {weight: 1, vote: agreeingVote, errorMessage: errorMessageForVote(1, 2, agreeingVote), result: VoteFailed},
+ {weight: 1, vote: disagreeingVote, errorMessage: errorMessageForVote(1, 2, disagreeingVote), result: VoteFailed},
+ },
+ },
+ {
+ desc: "quorum reached",
+ outcomes: outcomes{
+ {weight: 1, vote: agreeingVote, drops: true, errorMessage: context.Canceled.Error(), result: VoteCanceled},
+ {weight: 1, vote: agreeingVote, result: VoteCommitted},
+ {weight: 1, vote: agreeingVote, result: VoteCommitted},
+ },
+ },
+ {
+ desc: "can't cancel a finished transaction",
+ outcomes: outcomes{
+ {weight: 1, vote: agreeingVote, result: VoteCommitted},
+ {weight: 1, vote: agreeingVote, result: VoteCommitted},
+ {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: subtransaction was already finished"},
+ },
+ },
+ {
+ desc: "primary cancels its vote before transaction is finished",
+ outcomes: outcomes{
+ {weight: 2, vote: agreeingVote, drops: true, result: VoteCanceled, errorMessage: context.Canceled.Error()},
+ {weight: 1, vote: agreeingVote, result: VoteFailed, errorMessage: errorMessageForVote(2, 3, agreeingVote)},
+ {weight: 1, vote: agreeingVote, result: VoteFailed, errorMessage: errorMessageForVote(2, 3, agreeingVote)},
+ },
+ },
+ {
+ desc: "secondary cancels its vote after crossing the threshold",
+ outcomes: outcomes{
+ {weight: 2, vote: agreeingVote, result: VoteCommitted},
+ {weight: 1, vote: agreeingVote, drops: true, result: VoteCommitted, errorMessage: "cancel vote: subtransaction was already finished"},
+ {weight: 1, vote: disagreeingVote, result: VoteFailed, errorMessage: errorMessageForVote(1, 3, disagreeingVote)},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := context.WithTimeout(ctx, 45*time.Second)
+ defer cancel()
+
+ var totalWeight uint
+ var voters []Voter
+ for i, outcome := range tc.outcomes {
+ totalWeight += outcome.weight
+ voters = append(voters, Voter{Name: fmt.Sprintf("voter-%d", i), Votes: outcome.weight})
+ }
+
+ s, err := newSubtransaction(voters, totalWeight/2+1)
+ require.NoError(t, err)
+
+ results := make([]chan error, len(tc.outcomes))
+ for i, outcome := range tc.outcomes {
+ voterName := voters[i].Name
+ resultCh := make(chan error, 1)
+ results[i] = resultCh
+
+ collectVotes := func(ctx context.Context) { resultCh <- s.collectVotes(ctx, voterName) }
+
+ require.NoError(t, s.vote(voterName, outcome.vote))
+
+ if outcome.drops {
+ ctx, dropVoter := context.WithCancel(ctx)
+ dropVoter()
+
+ // Run the dropping nodes's collectVotes in sync just to ensure
+ // we get the correct error back. If we ran all of the collectVotes
+ // async, the agreeing nodes could finish the transaction and
+ // we would not get a context.Canceled when the vote is successfully
+ // canceled.
+ collectVotes(ctx)
+ continue
+ }
+
+ go collectVotes(ctx)
+ }
+
+ for i, outcome := range tc.outcomes {
+ voterName := voters[i].Name
+ assert.Equal(t, outcome.result, s.state()[voterName], "Node: %q", voterName)
+
+ err := <-results[i]
+ if outcome.errorMessage != "" {
+ assert.EqualError(t, err, outcome.errorMessage)
+ continue
+ }
+
+ assert.NoError(t, err)
+ }
+ })
+ }
+}
+
func TestSubtransaction_race(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()