Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2022-10-06 20:42:44 +0300
committerJustin Tobler <jtobler@gitlab.com>2022-10-20 20:08:38 +0300
commit586cd751f65d5e5b1297a2018244d113cd3fede4 (patch)
treec7a6f81b07701694e8c38eaf2ffb13ab827c434f
parentaf239d6ed4f6d0de0da920c0b75d950978a844d4 (diff)
Praefect: Cancel transaction node voter
Currently there is no means to cancel a voter and recheck for quroum in a pending transaction. This change adds `CancelTransactionNodeVoter()` to the `transactions.Manager` API as a means to interact with a transaction and cancel the voter associated with the specified node.
-rw-r--r--internal/praefect/transactions/manager.go19
-rw-r--r--internal/praefect/transactions/manager_test.go65
-rw-r--r--internal/praefect/transactions/subtransaction.go24
-rw-r--r--internal/praefect/transactions/subtransaction_test.go95
-rw-r--r--internal/praefect/transactions/transaction.go36
-rw-r--r--internal/praefect/transactions/transaction_test.go107
6 files changed, 346 insertions, 0 deletions
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 57a81d7db..9231dbd19 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -230,3 +230,22 @@ func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) e
return nil
}
+
+// CancelTransactionNodeVoter cancels the voter associated with the specified transaction
+// and node. Voters are canceled when the node RPC fails and its votes can no longer count
+// towards quorum.
+func (mgr *Manager) CancelTransactionNodeVoter(transactionID uint64, node string) error {
+ mgr.lock.Lock()
+ transaction, ok := mgr.transactions[transactionID]
+ mgr.lock.Unlock()
+
+ if !ok {
+ return fmt.Errorf("%w: %d", ErrNotFound, transactionID)
+ }
+
+ if err := transaction.cancelNodeVoter(node); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/praefect/transactions/manager_test.go b/internal/praefect/transactions/manager_test.go
new file mode 100644
index 000000000..305257373
--- /dev/null
+++ b/internal/praefect/transactions/manager_test.go
@@ -0,0 +1,65 @@
+package transactions
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+)
+
+func TestManager_CancelTransactionNodeVoter(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ voters := []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ }
+ threshold := uint(2)
+
+ for _, tc := range []struct {
+ desc string
+ register bool
+ node string
+ expErrMsg string
+ }{
+ {
+ desc: "No transaction exists",
+ register: false,
+ node: "1",
+ expErrMsg: "transaction not found: 0",
+ },
+ {
+ desc: "Transaction exists",
+ register: true,
+ node: "1",
+ expErrMsg: "",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ manager := NewManager(config.Config{})
+
+ var id uint64
+ if tc.register {
+ transaction, cleanup, err := manager.RegisterTransaction(ctx, voters, threshold)
+ defer func() {
+ err := cleanup()
+ require.NoError(t, err)
+ }()
+ require.NoError(t, err)
+
+ id = transaction.ID()
+ }
+
+ err := manager.CancelTransactionNodeVoter(id, "1")
+ if tc.expErrMsg != "" {
+ require.Error(t, err)
+ require.Equal(t, tc.expErrMsg, err.Error())
+ } else {
+ require.NoError(t, err)
+ }
+ })
+ }
+}
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 92d370ea9..19a43b667 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -380,3 +380,27 @@ func (t *subtransaction) getVote(node string) (*voting.Vote, error) {
vote := *voter.vote
return &vote, nil
}
+
+// cancelNodeVoter updates a node's associated voter state to `VoteCanceled`.
+// All must voters wait until either quorum has been achieved or quorum
+// becomes impossible. A canceled voter's votes are not counted as a part of
+// the total outstanding votes which can cause a subtransaction to not have
+// enough votes to reach the required threshold. If this happens the vote
+// will be considered failed and the voters unblocked.
+func (t *subtransaction) cancelNodeVoter(node string) error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ voter, ok := t.votersByNode[node]
+ if !ok {
+ return fmt.Errorf("invalid node for subtransaction: %q", node)
+ }
+
+ // Updating voter state with a nil vote will result in the voter
+ // getting canceled.
+ if err := t.updateVoterState(voter, nil); err != nil {
+ return fmt.Errorf("cancel vote: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go
index 5ba4e8476..24b35cc54 100644
--- a/internal/praefect/transactions/subtransaction_test.go
+++ b/internal/praefect/transactions/subtransaction_test.go
@@ -806,6 +806,101 @@ func TestSubtransaction_quorumCheck(t *testing.T) {
}
}
+func TestSubtransaction_cancelNodeVoter(t *testing.T) {
+ voteA := newVote(t, "a")
+ voteB := newVote(t, "b")
+
+ threshold := uint(2)
+
+ for _, tc := range []struct {
+ desc string
+ voters []Voter
+ node string
+ result VoteResult
+ subDone bool
+ expErrMsg string
+ }{
+ {
+ desc: "Cancel undecided voter",
+ voters: []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ },
+ node: "1",
+ result: VoteCanceled,
+ subDone: false,
+ expErrMsg: "",
+ },
+ {
+ desc: "Cancel canceled voter",
+ voters: []Voter{
+ {Name: "1", Votes: 1, result: VoteCanceled},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ },
+ node: "1",
+ result: VoteCanceled,
+ subDone: false,
+ expErrMsg: "cancel vote: transaction has been canceled",
+ },
+ {
+ desc: "Cancel nonexistent voter",
+ voters: []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ },
+ node: "4",
+ result: VoteCanceled,
+ subDone: false,
+ expErrMsg: "invalid node for subtransaction: \"4\"",
+ },
+ {
+ desc: "Cancel last voter",
+ voters: []Voter{
+ {Name: "1", Votes: 1, vote: &voteA},
+ {Name: "2", Votes: 1, vote: &voteB},
+ {Name: "3", Votes: 1},
+ },
+ node: "3",
+ result: VoteCanceled,
+ subDone: true,
+ expErrMsg: "",
+ },
+ {
+ desc: "Cancel voter making quorum impossible",
+ voters: []Voter{
+ {Name: "1", Votes: 1, result: VoteCanceled},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ },
+ node: "2",
+ result: VoteCanceled,
+ subDone: true,
+ expErrMsg: "",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ subtransaction, err := newSubtransaction(tc.voters, threshold)
+ require.NoError(t, err)
+
+ if err := subtransaction.cancelNodeVoter(tc.node); tc.expErrMsg != "" {
+ require.EqualError(t, err, tc.expErrMsg)
+ } else {
+ require.NoError(t, err)
+ }
+
+ voter, ok := subtransaction.votersByNode[tc.node]
+ if ok {
+ require.Equal(t, tc.result, voter.result)
+ }
+
+ require.Equal(t, tc.subDone, subtransaction.isDone())
+ })
+ }
+}
+
func newVote(t *testing.T, s string) voting.Vote {
hash := sha1.Sum([]byte(s))
vote, err := voting.VoteFromHash(hash[:])
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 02325ed35..fed090e81 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -339,3 +339,39 @@ func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) e
return subtransaction.collectVotes(ctx, node)
}
+
+// cancelNodeVoter cancels the undecided voters associated with
+// the specified node for all pending subtransactions.
+func (t *transaction) cancelNodeVoter(node string) error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ // Get all undecided subtransactions for node.
+ pendingSubtransactions, err := t.getPendingNodeSubtransactions(node)
+ if err != nil {
+ return err
+ }
+
+ // If there are no pending subtransactions a new one should
+ // be created and added to the transaction so the failure
+ // can be tracked.
+ if len(pendingSubtransactions) == 0 {
+ sub, err := t.createSubtransaction()
+ if err != nil {
+ return err
+ }
+
+ t.subtransactions = append(t.subtransactions, sub)
+ pendingSubtransactions = []*subtransaction{sub}
+ }
+
+ // Cancel node voters in undecided subtransactions.
+ for _, subtransaction := range pendingSubtransactions {
+ err := subtransaction.cancelNodeVoter(node)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go
index 819d36a66..8d86c3d98 100644
--- a/internal/praefect/transactions/transaction_test.go
+++ b/internal/praefect/transactions/transaction_test.go
@@ -276,3 +276,110 @@ func TestTransaction_createSubtransaction(t *testing.T) {
})
}
}
+
+func TestTransaction_cancelNodeVoter(t *testing.T) {
+ t.Parallel()
+
+ var id uint64
+ voters := []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ }
+ threshold := uint(2)
+
+ committedSubtransaction, err := newSubtransaction(
+ []Voter{
+ {Name: "1", Votes: 1, result: VoteCommitted},
+ {Name: "2", Votes: 1, result: VoteCommitted},
+ {Name: "3", Votes: 1, result: VoteCommitted},
+ },
+ threshold,
+ )
+ require.NoError(t, err)
+ undecidedSubtransaction, err := newSubtransaction(
+ []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteUndecided},
+ },
+ threshold,
+ )
+ require.NoError(t, err)
+ decidedSubtransaction, err := newSubtransaction(
+ []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteCommitted},
+ {Name: "3", Votes: 1, result: VoteCommitted},
+ },
+ threshold,
+ )
+ require.NoError(t, err)
+ cancelingSubtransaction, err := newSubtransaction(
+ []Voter{
+ {Name: "1", Votes: 1, result: VoteUndecided},
+ {Name: "2", Votes: 1, result: VoteUndecided},
+ {Name: "3", Votes: 1, result: VoteCanceled},
+ },
+ threshold,
+ )
+ require.NoError(t, err)
+
+ for _, tc := range []struct {
+ desc string
+ subs []*subtransaction
+ node string
+ expErrMsg string
+ }{
+ {
+ desc: "No subtransactions",
+ subs: nil,
+ node: "1",
+ expErrMsg: "",
+ },
+ {
+ desc: "No pending subtransactions",
+ subs: []*subtransaction{committedSubtransaction},
+ node: "1",
+ expErrMsg: "",
+ },
+ {
+ desc: "One Pending subtransaction",
+ subs: []*subtransaction{undecidedSubtransaction},
+ node: "1",
+ expErrMsg: "",
+ },
+ {
+ desc: "Two Pending subtransactions",
+ subs: []*subtransaction{decidedSubtransaction, cancelingSubtransaction},
+ node: "1",
+ expErrMsg: "",
+ },
+ {
+ desc: "Invalid node",
+ subs: []*subtransaction{committedSubtransaction},
+ node: "4",
+ expErrMsg: "invalid node for transaction: \"4\"",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ transaction, err := newTransaction(id, voters, threshold)
+ require.NoError(t, err)
+
+ transaction.subtransactions = tc.subs
+
+ err = transaction.cancelNodeVoter(tc.node)
+ if tc.expErrMsg != "" {
+ require.Error(t, err)
+ require.Equal(t, tc.expErrMsg, err.Error())
+ } else {
+ require.NoError(t, err)
+
+ // Check the last subtransaction to make sure cancel propagation occurs.
+ sub := transaction.subtransactions[len(transaction.subtransactions)-1]
+ voter := sub.votersByNode[tc.node]
+ require.Equal(t, VoteCanceled, voter.result)
+ }
+ })
+ }
+}