Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2022-10-06 20:26:38 +0300
committerJustin Tobler <jtobler@gitlab.com>2022-10-13 02:01:34 +0300
commit9d8aa4b2197c70809a0f817983643c79a138f05c (patch)
tree6cfbf5c75f1749251a5993531139fe223d618870
parentfbac95b8a6a00a9a5b3b173ac9033c98c50317f5 (diff)
Praefect: Get pending node subtransactions
Currently the logic to retrieve the pending subtransaction in a transaction for a node is embedded in `getOrCreateSubtransaction()`. With future changes to node RPC error handling this same functionality will also be needed to cancel a voter associated with a failed node. This change separates this logic into its own function so it can be reused.
-rw-r--r--internal/praefect/transactions/transaction.go42
-rw-r--r--internal/praefect/transactions/transaction_test.go121
2 files changed, 150 insertions, 13 deletions
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index a1dd1f75a..37279c561 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -227,7 +227,31 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
return nil, errors.New("invalid transaction state")
}
- for _, subtransaction := range t.subtransactions {
+ // Check for pending subtransactions on the specified node.
+ if subtransactions, err := t.getPendingNodeSubtransactions(node); err != nil {
+ return nil, err
+ } else if len(subtransactions) != 0 {
+ // First pending subtransaction is the next in queue for processing.
+ return subtransactions[0], nil
+ }
+
+ // If we arrive here, then we know that all the node has voted and
+ // reached quorum on all subtransactions. We can thus create a new one.
+ subtransaction, err := newSubtransaction(t.voters, t.threshold)
+ if err != nil {
+ return nil, err
+ }
+
+ t.subtransactions = append(t.subtransactions, subtransaction)
+
+ return subtransaction, nil
+}
+
+// getPendingNodeSubtransactions returns all undecided subtransactions
+// for the specified voter. `nil` is returned if there are no pending
+// subtransactions for the node.
+func (t *transaction) getPendingNodeSubtransactions(node string) ([]*subtransaction, error) {
+ for i, subtransaction := range t.subtransactions {
result, err := subtransaction.getResult(node)
if err != nil {
return nil, err
@@ -235,8 +259,9 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
switch result {
case VoteUndecided:
- // An undecided vote means we should vote on this one.
- return subtransaction, nil
+ // Nodes after first undecided voter will also be undecided.
+ // Remaining subtransactions are returned.
+ return t.subtransactions[i:], nil
case VoteCommitted:
// If we have committed this subtransaction, we're good
// to go.
@@ -258,16 +283,7 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
}
}
- // If we arrive here, then we know that all the node has voted and
- // reached quorum on all subtransactions. We can thus create a new one.
- subtransaction, err := newSubtransaction(t.voters, t.threshold)
- if err != nil {
- return nil, err
- }
-
- t.subtransactions = append(t.subtransactions, subtransaction)
-
- return subtransaction, nil
+ return nil, nil
}
func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) error {
diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go
index b3648ed62..616b3d945 100644
--- a/internal/praefect/transactions/transaction_test.go
+++ b/internal/praefect/transactions/transaction_test.go
@@ -51,3 +51,124 @@ func TestTransaction_DidVote(t *testing.T) {
require.True(t, tx.DidVote("v1"))
require.True(t, tx.DidVote("v2"))
}
+
+func TestTransaction_getPendingNodeSubtransactions(t *testing.T) {
+ t.Parallel()
+
+ var id uint64
+ voters := []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ }
+ threshold := uint(2)
+
+ sub1, err := newSubtransaction(voters, threshold)
+ require.NoError(t, err)
+ sub2, err := newSubtransaction(
+ []Voter{
+ {Name: "1", Votes: 1, result: VoteCommitted},
+ {Name: "2", Votes: 1, result: VoteCommitted},
+ {Name: "3", Votes: 1, result: VoteCommitted},
+ },
+ threshold,
+ )
+ require.NoError(t, err)
+ sub3, err := newSubtransaction(
+ []Voter{
+ {Name: "1", Votes: 1, result: VoteCanceled},
+ {Name: "2", Votes: 1, result: VoteFailed},
+ {Name: "3", Votes: 1, result: VoteStopped},
+ },
+ threshold,
+ )
+ require.NoError(t, err)
+
+ for _, tc := range []struct {
+ desc string
+ subs []*subtransaction
+ node string
+ expSubs []*subtransaction
+ expErrMsg string
+ }{
+ {
+ desc: "No subtransactions",
+ subs: []*subtransaction{},
+ node: "1",
+ expSubs: nil,
+ expErrMsg: "",
+ },
+ {
+ desc: "Single pending transaction",
+ subs: []*subtransaction{sub1},
+ node: "1",
+ expSubs: []*subtransaction{sub1},
+ expErrMsg: "",
+ },
+ {
+ desc: "Single complete transaction",
+ subs: []*subtransaction{sub2},
+ node: "1",
+ expSubs: nil,
+ expErrMsg: "",
+ },
+ {
+ desc: "Two pending transactions",
+ subs: []*subtransaction{sub1, sub1},
+ node: "1",
+ expSubs: []*subtransaction{sub1, sub1},
+ expErrMsg: "",
+ },
+ {
+ desc: "Two transactions, one pending",
+ subs: []*subtransaction{sub2, sub1},
+ node: "1",
+ expSubs: []*subtransaction{sub1},
+ expErrMsg: "",
+ },
+ {
+ desc: "Missing node voter",
+ subs: []*subtransaction{sub1},
+ node: "4",
+ expSubs: nil,
+ expErrMsg: "invalid node for transaction: \"4\"",
+ },
+ {
+ desc: "Canceled node voter",
+ subs: []*subtransaction{sub3},
+ node: "1",
+ expSubs: nil,
+ expErrMsg: "transaction has been canceled",
+ },
+ {
+ desc: "Failed node voter",
+ subs: []*subtransaction{sub3},
+ node: "2",
+ expSubs: nil,
+ expErrMsg: "transaction did not reach quorum",
+ },
+ {
+ desc: "Stopped node voter",
+ subs: []*subtransaction{sub3},
+ node: "3",
+ expSubs: nil,
+ expErrMsg: "transaction has been stopped",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ transaction, err := newTransaction(id, voters, threshold)
+ require.NoError(t, err)
+
+ transaction.subtransactions = tc.subs
+
+ subtransactions, err := transaction.getPendingNodeSubtransactions(tc.node)
+ if tc.expErrMsg != "" {
+ require.EqualError(t, err, tc.expErrMsg)
+ } else {
+ require.NoError(t, err)
+ }
+
+ require.Equal(t, tc.expSubs, subtransactions)
+ })
+ }
+}