diff options
author | Justin Tobler <jtobler@gitlab.com> | 2022-10-06 20:42:44 +0300 |
---|---|---|
committer | Justin Tobler <jtobler@gitlab.com> | 2022-10-20 20:08:38 +0300 |
commit | 586cd751f65d5e5b1297a2018244d113cd3fede4 (patch) | |
tree | c7a6f81b07701694e8c38eaf2ffb13ab827c434f | |
parent | af239d6ed4f6d0de0da920c0b75d950978a844d4 (diff) |
Praefect: Cancel transaction node voter
Currently there is no means to cancel a voter and recheck for quroum in
a pending transaction. This change adds `CancelTransactionNodeVoter()`
to the `transactions.Manager` API as a means to interact with a
transaction and cancel the voter associated with the specified node.
-rw-r--r-- | internal/praefect/transactions/manager.go | 19 | ||||
-rw-r--r-- | internal/praefect/transactions/manager_test.go | 65 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 24 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction_test.go | 95 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 36 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction_test.go | 107 |
6 files changed, 346 insertions, 0 deletions
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 57a81d7db..9231dbd19 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -230,3 +230,22 @@ func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) e return nil } + +// CancelTransactionNodeVoter cancels the voter associated with the specified transaction +// and node. Voters are canceled when the node RPC fails and its votes can no longer count +// towards quorum. +func (mgr *Manager) CancelTransactionNodeVoter(transactionID uint64, node string) error { + mgr.lock.Lock() + transaction, ok := mgr.transactions[transactionID] + mgr.lock.Unlock() + + if !ok { + return fmt.Errorf("%w: %d", ErrNotFound, transactionID) + } + + if err := transaction.cancelNodeVoter(node); err != nil { + return err + } + + return nil +} diff --git a/internal/praefect/transactions/manager_test.go b/internal/praefect/transactions/manager_test.go new file mode 100644 index 000000000..305257373 --- /dev/null +++ b/internal/praefect/transactions/manager_test.go @@ -0,0 +1,65 @@ +package transactions + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func TestManager_CancelTransactionNodeVoter(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + voters := []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + } + threshold := uint(2) + + for _, tc := range []struct { + desc string + register bool + node string + expErrMsg string + }{ + { + desc: "No transaction exists", + register: false, + node: "1", + expErrMsg: "transaction not found: 0", + }, + { + desc: "Transaction exists", + register: true, + node: "1", + expErrMsg: "", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + manager := NewManager(config.Config{}) + + var id uint64 + if tc.register { + transaction, cleanup, err := manager.RegisterTransaction(ctx, voters, threshold) + defer func() { + err := cleanup() + require.NoError(t, err) + }() + require.NoError(t, err) + + id = transaction.ID() + } + + err := manager.CancelTransactionNodeVoter(id, "1") + if tc.expErrMsg != "" { + require.Error(t, err) + require.Equal(t, tc.expErrMsg, err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 92d370ea9..19a43b667 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -380,3 +380,27 @@ func (t *subtransaction) getVote(node string) (*voting.Vote, error) { vote := *voter.vote return &vote, nil } + +// cancelNodeVoter updates a node's associated voter state to `VoteCanceled`. +// All must voters wait until either quorum has been achieved or quorum +// becomes impossible. A canceled voter's votes are not counted as a part of +// the total outstanding votes which can cause a subtransaction to not have +// enough votes to reach the required threshold. If this happens the vote +// will be considered failed and the voters unblocked. +func (t *subtransaction) cancelNodeVoter(node string) error { + t.lock.Lock() + defer t.lock.Unlock() + + voter, ok := t.votersByNode[node] + if !ok { + return fmt.Errorf("invalid node for subtransaction: %q", node) + } + + // Updating voter state with a nil vote will result in the voter + // getting canceled. + if err := t.updateVoterState(voter, nil); err != nil { + return fmt.Errorf("cancel vote: %w", err) + } + + return nil +} diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go index 5ba4e8476..24b35cc54 100644 --- a/internal/praefect/transactions/subtransaction_test.go +++ b/internal/praefect/transactions/subtransaction_test.go @@ -806,6 +806,101 @@ func TestSubtransaction_quorumCheck(t *testing.T) { } } +func TestSubtransaction_cancelNodeVoter(t *testing.T) { + voteA := newVote(t, "a") + voteB := newVote(t, "b") + + threshold := uint(2) + + for _, tc := range []struct { + desc string + voters []Voter + node string + result VoteResult + subDone bool + expErrMsg string + }{ + { + desc: "Cancel undecided voter", + voters: []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + }, + node: "1", + result: VoteCanceled, + subDone: false, + expErrMsg: "", + }, + { + desc: "Cancel canceled voter", + voters: []Voter{ + {Name: "1", Votes: 1, result: VoteCanceled}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + }, + node: "1", + result: VoteCanceled, + subDone: false, + expErrMsg: "cancel vote: transaction has been canceled", + }, + { + desc: "Cancel nonexistent voter", + voters: []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + }, + node: "4", + result: VoteCanceled, + subDone: false, + expErrMsg: "invalid node for subtransaction: \"4\"", + }, + { + desc: "Cancel last voter", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA}, + {Name: "2", Votes: 1, vote: &voteB}, + {Name: "3", Votes: 1}, + }, + node: "3", + result: VoteCanceled, + subDone: true, + expErrMsg: "", + }, + { + desc: "Cancel voter making quorum impossible", + voters: []Voter{ + {Name: "1", Votes: 1, result: VoteCanceled}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + }, + node: "2", + result: VoteCanceled, + subDone: true, + expErrMsg: "", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + subtransaction, err := newSubtransaction(tc.voters, threshold) + require.NoError(t, err) + + if err := subtransaction.cancelNodeVoter(tc.node); tc.expErrMsg != "" { + require.EqualError(t, err, tc.expErrMsg) + } else { + require.NoError(t, err) + } + + voter, ok := subtransaction.votersByNode[tc.node] + if ok { + require.Equal(t, tc.result, voter.result) + } + + require.Equal(t, tc.subDone, subtransaction.isDone()) + }) + } +} + func newVote(t *testing.T, s string) voting.Vote { hash := sha1.Sum([]byte(s)) vote, err := voting.VoteFromHash(hash[:]) diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 02325ed35..fed090e81 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -339,3 +339,39 @@ func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) e return subtransaction.collectVotes(ctx, node) } + +// cancelNodeVoter cancels the undecided voters associated with +// the specified node for all pending subtransactions. +func (t *transaction) cancelNodeVoter(node string) error { + t.lock.Lock() + defer t.lock.Unlock() + + // Get all undecided subtransactions for node. + pendingSubtransactions, err := t.getPendingNodeSubtransactions(node) + if err != nil { + return err + } + + // If there are no pending subtransactions a new one should + // be created and added to the transaction so the failure + // can be tracked. + if len(pendingSubtransactions) == 0 { + sub, err := t.createSubtransaction() + if err != nil { + return err + } + + t.subtransactions = append(t.subtransactions, sub) + pendingSubtransactions = []*subtransaction{sub} + } + + // Cancel node voters in undecided subtransactions. + for _, subtransaction := range pendingSubtransactions { + err := subtransaction.cancelNodeVoter(node) + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go index 819d36a66..8d86c3d98 100644 --- a/internal/praefect/transactions/transaction_test.go +++ b/internal/praefect/transactions/transaction_test.go @@ -276,3 +276,110 @@ func TestTransaction_createSubtransaction(t *testing.T) { }) } } + +func TestTransaction_cancelNodeVoter(t *testing.T) { + t.Parallel() + + var id uint64 + voters := []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + } + threshold := uint(2) + + committedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCommitted}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCommitted}, + }, + threshold, + ) + require.NoError(t, err) + undecidedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteUndecided}, + {Name: "2", Votes: 1, result: VoteUndecided}, + {Name: "3", Votes: 1, result: VoteUndecided}, + }, + threshold, + ) + require.NoError(t, err) + decidedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteUndecided}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCommitted}, + }, + threshold, + ) + require.NoError(t, err) + cancelingSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteUndecided}, + {Name: "2", Votes: 1, result: VoteUndecided}, + {Name: "3", Votes: 1, result: VoteCanceled}, + }, + threshold, + ) + require.NoError(t, err) + + for _, tc := range []struct { + desc string + subs []*subtransaction + node string + expErrMsg string + }{ + { + desc: "No subtransactions", + subs: nil, + node: "1", + expErrMsg: "", + }, + { + desc: "No pending subtransactions", + subs: []*subtransaction{committedSubtransaction}, + node: "1", + expErrMsg: "", + }, + { + desc: "One Pending subtransaction", + subs: []*subtransaction{undecidedSubtransaction}, + node: "1", + expErrMsg: "", + }, + { + desc: "Two Pending subtransactions", + subs: []*subtransaction{decidedSubtransaction, cancelingSubtransaction}, + node: "1", + expErrMsg: "", + }, + { + desc: "Invalid node", + subs: []*subtransaction{committedSubtransaction}, + node: "4", + expErrMsg: "invalid node for transaction: \"4\"", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transaction, err := newTransaction(id, voters, threshold) + require.NoError(t, err) + + transaction.subtransactions = tc.subs + + err = transaction.cancelNodeVoter(tc.node) + if tc.expErrMsg != "" { + require.Error(t, err) + require.Equal(t, tc.expErrMsg, err.Error()) + } else { + require.NoError(t, err) + + // Check the last subtransaction to make sure cancel propagation occurs. + sub := transaction.subtransactions[len(transaction.subtransactions)-1] + voter := sub.votersByNode[tc.node] + require.Equal(t, VoteCanceled, voter.result) + } + }) + } +} |