diff options
author | Justin Tobler <jtobler@gitlab.com> | 2022-10-06 20:26:38 +0300 |
---|---|---|
committer | Justin Tobler <jtobler@gitlab.com> | 2022-10-13 02:01:34 +0300 |
commit | 9d8aa4b2197c70809a0f817983643c79a138f05c (patch) | |
tree | 6cfbf5c75f1749251a5993531139fe223d618870 | |
parent | fbac95b8a6a00a9a5b3b173ac9033c98c50317f5 (diff) |
Praefect: Get pending node subtransactions
Currently the logic to retrieve the pending subtransaction in a
transaction for a node is embedded in `getOrCreateSubtransaction()`.
With future changes to node RPC error handling this same functionality
will also be needed to cancel a voter associated with a failed node.
This change separates this logic into its own function so it can be
reused.
-rw-r--r-- | internal/praefect/transactions/transaction.go | 42 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction_test.go | 121 |
2 files changed, 150 insertions, 13 deletions
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index a1dd1f75a..37279c561 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -227,7 +227,31 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e return nil, errors.New("invalid transaction state") } - for _, subtransaction := range t.subtransactions { + // Check for pending subtransactions on the specified node. + if subtransactions, err := t.getPendingNodeSubtransactions(node); err != nil { + return nil, err + } else if len(subtransactions) != 0 { + // First pending subtransaction is the next in queue for processing. + return subtransactions[0], nil + } + + // If we arrive here, then we know that all the node has voted and + // reached quorum on all subtransactions. We can thus create a new one. + subtransaction, err := newSubtransaction(t.voters, t.threshold) + if err != nil { + return nil, err + } + + t.subtransactions = append(t.subtransactions, subtransaction) + + return subtransaction, nil +} + +// getPendingNodeSubtransactions returns all undecided subtransactions +// for the specified voter. `nil` is returned if there are no pending +// subtransactions for the node. +func (t *transaction) getPendingNodeSubtransactions(node string) ([]*subtransaction, error) { + for i, subtransaction := range t.subtransactions { result, err := subtransaction.getResult(node) if err != nil { return nil, err @@ -235,8 +259,9 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e switch result { case VoteUndecided: - // An undecided vote means we should vote on this one. - return subtransaction, nil + // Nodes after first undecided voter will also be undecided. + // Remaining subtransactions are returned. + return t.subtransactions[i:], nil case VoteCommitted: // If we have committed this subtransaction, we're good // to go. @@ -258,16 +283,7 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e } } - // If we arrive here, then we know that all the node has voted and - // reached quorum on all subtransactions. We can thus create a new one. - subtransaction, err := newSubtransaction(t.voters, t.threshold) - if err != nil { - return nil, err - } - - t.subtransactions = append(t.subtransactions, subtransaction) - - return subtransaction, nil + return nil, nil } func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) error { diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go index b3648ed62..616b3d945 100644 --- a/internal/praefect/transactions/transaction_test.go +++ b/internal/praefect/transactions/transaction_test.go @@ -51,3 +51,124 @@ func TestTransaction_DidVote(t *testing.T) { require.True(t, tx.DidVote("v1")) require.True(t, tx.DidVote("v2")) } + +func TestTransaction_getPendingNodeSubtransactions(t *testing.T) { + t.Parallel() + + var id uint64 + voters := []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + } + threshold := uint(2) + + sub1, err := newSubtransaction(voters, threshold) + require.NoError(t, err) + sub2, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCommitted}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCommitted}, + }, + threshold, + ) + require.NoError(t, err) + sub3, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCanceled}, + {Name: "2", Votes: 1, result: VoteFailed}, + {Name: "3", Votes: 1, result: VoteStopped}, + }, + threshold, + ) + require.NoError(t, err) + + for _, tc := range []struct { + desc string + subs []*subtransaction + node string + expSubs []*subtransaction + expErrMsg string + }{ + { + desc: "No subtransactions", + subs: []*subtransaction{}, + node: "1", + expSubs: nil, + expErrMsg: "", + }, + { + desc: "Single pending transaction", + subs: []*subtransaction{sub1}, + node: "1", + expSubs: []*subtransaction{sub1}, + expErrMsg: "", + }, + { + desc: "Single complete transaction", + subs: []*subtransaction{sub2}, + node: "1", + expSubs: nil, + expErrMsg: "", + }, + { + desc: "Two pending transactions", + subs: []*subtransaction{sub1, sub1}, + node: "1", + expSubs: []*subtransaction{sub1, sub1}, + expErrMsg: "", + }, + { + desc: "Two transactions, one pending", + subs: []*subtransaction{sub2, sub1}, + node: "1", + expSubs: []*subtransaction{sub1}, + expErrMsg: "", + }, + { + desc: "Missing node voter", + subs: []*subtransaction{sub1}, + node: "4", + expSubs: nil, + expErrMsg: "invalid node for transaction: \"4\"", + }, + { + desc: "Canceled node voter", + subs: []*subtransaction{sub3}, + node: "1", + expSubs: nil, + expErrMsg: "transaction has been canceled", + }, + { + desc: "Failed node voter", + subs: []*subtransaction{sub3}, + node: "2", + expSubs: nil, + expErrMsg: "transaction did not reach quorum", + }, + { + desc: "Stopped node voter", + subs: []*subtransaction{sub3}, + node: "3", + expSubs: nil, + expErrMsg: "transaction has been stopped", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transaction, err := newTransaction(id, voters, threshold) + require.NoError(t, err) + + transaction.subtransactions = tc.subs + + subtransactions, err := transaction.getPendingNodeSubtransactions(tc.node) + if tc.expErrMsg != "" { + require.EqualError(t, err, tc.expErrMsg) + } else { + require.NoError(t, err) + } + + require.Equal(t, tc.expSubs, subtransactions) + }) + } +} |