Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-10-24 08:55:44 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-10-24 08:55:44 +0300
commit9100bc1ba991757588906a818e24d52932ba665c (patch)
treeb018a47a7815fd459cbeb92ad22bebc764695186
parentf8914b6e0da033ee251ca2f79384043d25a01ec6 (diff)
parente7a07dca63658bc5fd15399522c7a359d002b413 (diff)
Merge branch 'jt-praefect-transaction-error-handler' into 'master'
Praefect: Update voter state on failed node RPC Closes #4088 See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/4880 Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com> Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com> Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r--internal/metadata/featureflag/ff_node_error_cancels_voter.go11
-rw-r--r--internal/praefect/coordinator.go17
-rw-r--r--internal/praefect/coordinator_test.go113
-rw-r--r--internal/praefect/transactions/manager.go19
-rw-r--r--internal/praefect/transactions/manager_test.go65
-rw-r--r--internal/praefect/transactions/subtransaction.go129
-rw-r--r--internal/praefect/transactions/subtransaction_test.go316
-rw-r--r--internal/praefect/transactions/transaction.go113
-rw-r--r--internal/praefect/transactions/transaction_test.go332
-rw-r--r--internal/testhelper/testhelper.go3
10 files changed, 1056 insertions, 62 deletions
diff --git a/internal/metadata/featureflag/ff_node_error_cancels_voter.go b/internal/metadata/featureflag/ff_node_error_cancels_voter.go
new file mode 100644
index 000000000..797c7c861
--- /dev/null
+++ b/internal/metadata/featureflag/ff_node_error_cancels_voter.go
@@ -0,0 +1,11 @@
+package featureflag
+
+// NodeErrorCancelsVoter enables cancellation of the voter associated
+// with a failed node RPC. By canceling voters that can no longer vote,
+// the transaction can fail faster if quorum becomes impossible.
+var NodeErrorCancelsVoter = NewFeatureFlag(
+ "node_error_cancels_voter",
+ "v15.6.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/4552",
+ false,
+)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index e000c24cf..e16591223 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -467,10 +467,19 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
ctxlogrus.Extract(ctx).WithError(err).
Error("proxying to secondary failed")
- // For now, any errors returned by secondaries are ignored.
- // This is mostly so that we do not abort transactions which
- // are ongoing and may succeed even with a subset of
- // secondaries bailing out.
+ if featureflag.NodeErrorCancelsVoter.IsEnabled(ctx) {
+ // Cancels failed node's voter in its current subtransaction.
+ // Also updates internal state of subtransaction to fail and
+ // release blocked voters if quorum becomes impossible.
+ if err := c.txMgr.CancelTransactionNodeVoter(transaction.ID(), secondary.Storage); err != nil {
+ ctxlogrus.Extract(ctx).WithError(err).
+ Error("canceling secondary voter failed")
+ }
+ }
+
+ // The error is ignored, so we do not abort transactions
+ // which are ongoing and may succeed even with a subset
+ // of secondaries bailing out.
return nil
},
})
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index fb9c9ff99..8b340e5a0 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -395,6 +395,119 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
require.NoError(t, err)
}
+func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) {
+ t.Parallel()
+ testhelper.NewFeatureSets(featureflag.NodeErrorCancelsVoter).Run(t, testStreamDirectorMutatorSecondaryErrorHandling)
+}
+
+func testStreamDirectorMutatorSecondaryErrorHandling(t *testing.T, ctx context.Context) {
+ ctx, ctxCancel := context.WithCancel(ctx)
+
+ socket := testhelper.GetTemporaryGitalySocketFileName(t)
+ testhelper.NewServerWithHealth(t, socket)
+
+ primaryNode := &config.Node{Address: "unix://" + socket, Storage: "praefect-internal-1"}
+ secondaryNode1 := &config.Node{Address: "unix://" + socket, Storage: "praefect-internal-2"}
+ secondaryNode2 := &config.Node{Address: "unix://" + socket, Storage: "praefect-internal-3"}
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{primaryNode, secondaryNode1, secondaryNode2},
+ },
+ },
+ }
+
+ repo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
+
+ nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ defer nodeMgr.Stop()
+
+ shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ for _, name := range []string{"praefect-internal-1", "praefect-internal-2", "praefect-internal-3"} {
+ node, err := shard.GetNode(name)
+ require.NoError(t, err)
+ waitNodeToChangeHealthStatus(t, ctx, node, true)
+ }
+
+ rs := datastore.MockRepositoryStore{
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return relativePath, map[string]struct{}{"praefect-internal-1": {}, "praefect-internal-2": {}, "praefect-internal-3": {}}, nil
+ },
+ }
+
+ txMgr := transactions.NewManager(conf)
+
+ coordinator := NewCoordinator(
+ datastore.NewPostgresReplicationEventQueue(testdb.New(t)),
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
+
+ fullMethod := "/gitaly.SmartHTTPService/PostReceivePack"
+
+ frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{
+ Repository: &repo,
+ })
+ require.NoError(t, err)
+ peeker := &mockPeeker{frame}
+
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+
+ txCtx := peer.NewContext(streamParams.Primary().Ctx, &peer.Peer{})
+ transaction, err := txinfo.TransactionFromContext(txCtx)
+ require.NoError(t, err)
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ vote := voting.VoteFromData([]byte("vote"))
+ err := txMgr.VoteTransaction(ctx, transaction.ID, "praefect-internal-1", vote)
+ if featureflag.NodeErrorCancelsVoter.IsEnabled(ctx) {
+ require.ErrorIs(t, err, transactions.ErrTransactionFailed)
+ } else {
+ require.EqualError(t, err, "context canceled")
+ }
+ }()
+
+ for _, secondary := range streamParams.Secondaries() {
+ wg.Add(1)
+ secondary := secondary
+ go func() {
+ err := secondary.ErrHandler(errors.New("node RPC failure"))
+ require.NoError(t, err)
+
+ wg.Done()
+ }()
+ }
+
+ // If the feature flag is not enabled the context must be canceled to unblock voters.
+ if !featureflag.NodeErrorCancelsVoter.IsEnabled(ctx) {
+ ctxCancel()
+ }
+
+ wg.Wait()
+
+ err = streamParams.RequestFinalizer()
+ require.NoError(t, err)
+ ctxCancel()
+}
+
func TestStreamDirector_maintenance(t *testing.T) {
t.Parallel()
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 57a81d7db..9231dbd19 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -230,3 +230,22 @@ func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) e
return nil
}
+
+// CancelTransactionNodeVoter cancels the voter associated with the specified transaction
+// and node. Voters are canceled when the node RPC fails and its votes can no longer count
+// towards quorum.
+func (mgr *Manager) CancelTransactionNodeVoter(transactionID uint64, node string) error {
+ mgr.lock.Lock()
+ transaction, ok := mgr.transactions[transactionID]
+ mgr.lock.Unlock()
+
+ if !ok {
+ return fmt.Errorf("%w: %d", ErrNotFound, transactionID)
+ }
+
+ if err := transaction.cancelNodeVoter(node); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/praefect/transactions/manager_test.go b/internal/praefect/transactions/manager_test.go
new file mode 100644
index 000000000..305257373
--- /dev/null
+++ b/internal/praefect/transactions/manager_test.go
@@ -0,0 +1,65 @@
+package transactions
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+)
+
+func TestManager_CancelTransactionNodeVoter(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ voters := []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ }
+ threshold := uint(2)
+
+ for _, tc := range []struct {
+ desc string
+ register bool
+ node string
+ expErrMsg string
+ }{
+ {
+ desc: "No transaction exists",
+ register: false,
+ node: "1",
+ expErrMsg: "transaction not found: 0",
+ },
+ {
+ desc: "Transaction exists",
+ register: true,
+ node: "1",
+ expErrMsg: "",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ manager := NewManager(config.Config{})
+
+ var id uint64
+ if tc.register {
+ transaction, cleanup, err := manager.RegisterTransaction(ctx, voters, threshold)
+ defer func() {
+ err := cleanup()
+ require.NoError(t, err)
+ }()
+ require.NoError(t, err)
+
+ id = transaction.ID()
+ }
+
+ err := manager.CancelTransactionNodeVoter(id, "1")
+ if tc.expErrMsg != "" {
+ require.Error(t, err)
+ require.Equal(t, tc.expErrMsg, err.Error())
+ } else {
+ require.NoError(t, err)
+ }
+ })
+ }
+}
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index ab21107b8..19a43b667 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -79,7 +79,9 @@ func (t *subtransaction) stop() error {
switch voter.result {
case VoteCanceled:
// If the vote was canceled already, we cannot stop it.
- return ErrTransactionCanceled
+ // A single node voter being canceled is not indicative
+ // of all voters' state. Other voters must be checked.
+ continue
case VoteStopped:
// Similar if the vote was stopped already.
return ErrTransactionStopped
@@ -136,9 +138,9 @@ func (t *subtransaction) vote(node string, vote voting.Vote) error {
return nil
}
-// updateVoterStates updates undecided voters or cancels existing votes of decided voters if given a
-// `nil` vote. Voters are updated either as soon as quorum was reached or alternatively when all
-// votes were cast.
+// updateVoterState updates undecided voters or cancels existing votes if given a `nil`
+// vote. Voters are updated either as soon as quorum was reached or alternatively when
+// quorum becomes impossible.
func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error {
switch voter.result {
case VoteUndecided:
@@ -171,10 +173,20 @@ func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error
return errors.New("subtransaction was already finished")
}
+ // A voter's result can only be canceled if the subtransaction is still pending.
+ // If a change has already been committed to disk the voter result cannot be
+ // changed since the subtransaction is considered complete.
+ voter.result = VoteCanceled
+
// Remove the voter's support for the vote so it's not counted towards the
// majority. The node is not going to commit the subtransaction anyway.
- t.voteCounts[*voter.vote] -= voter.Votes
- voter.result = VoteCanceled
+ if voter.vote != nil {
+ t.voteCounts[*voter.vote] -= voter.Votes
+ }
+
+ // A canceled voter can no longer vote so its vote is
+ // reset after being subtracted from the vote counts.
+ voter.vote = nil
}
defer func() {
@@ -183,26 +195,12 @@ func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error
}
}()
- var majorityVote *voting.Vote
- var majorityVoteCount uint
- for v, voteCount := range t.voteCounts {
- if majorityVoteCount < voteCount {
- v := v
- majorityVoteCount = voteCount
- majorityVote = &v
- }
- }
-
- var outstandingVotes uint
- for _, voter := range t.votersByNode {
- if voter.vote == nil {
- outstandingVotes += voter.Votes
- }
- }
-
- // When the majority vote didn't yet cross the threshold and the number of outstanding votes
- // may still get us across that threshold, then we need to wait for more votes to come in.
- if majorityVoteCount < t.threshold && majorityVoteCount+outstandingVotes >= t.threshold {
+ // Check if quorum has been reached or if quorum is still attainable for the subtransaction.
+ // If quorum has not been achieved the vote returned by the quorum check will be `nil`.
+ // As long as quorum has not been achieved and is still possible, the subtransaction
+ // will wait for additional voter's results to come in.
+ majorityVote, quorumPossible := t.quorumCheck()
+ if majorityVote == nil && quorumPossible {
return nil
}
@@ -223,15 +221,15 @@ func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error
continue
}
- // If the majority vote count is smaller than the threshold at this point, then we
- // know that we cannot ever reach it anymore even with the votes which are still
- // outstanding. We can thus mark this node as failed.
- if majorityVoteCount < t.threshold {
+ // If quorum is not possible we know there are not enough outstanding votes to cross
+ // the threshold required by the subtransaction. We can thus mark this node as failed.
+ if !quorumPossible {
voter.result = VoteFailed
continue
}
- // Otherwise, the result depends on whether the voter agrees on the quorum or not.
+ // At this point we know quorum has been achieved and a majority vote is present.
+ // A check is done to see if the voter agrees with the quorum majority vote.
if *voter.vote == *majorityVote {
voter.result = VoteCommitted
} else {
@@ -252,26 +250,45 @@ func (t *subtransaction) mustSignalVoters() bool {
return false
}
- // Check if any node has reached the threshold. If it did, then we need
- // to signal voters.
- for _, voteCount := range t.voteCounts {
- if voteCount >= t.threshold {
- return true
+ majorityVote, quorumPossible := t.quorumCheck()
+
+ // If there is a majority vote, the threshold has been met and voters can be signaled.
+ if majorityVote != nil {
+ return true
+ }
+
+ // If quorum is still possible the voters should not be signaled, since
+ // remaining voters could cause us to reach quorum. If quorum is not
+ // possible voters should be unblocked to allow the transaction to fail.
+ return !quorumPossible
+}
+
+// quorumCheck returns the majority vote if quorum has been achieved
+// and if not `nil` is returned. It also returns whether quorum can
+// still be achieved with the outstanding voters.
+func (t *subtransaction) quorumCheck() (*voting.Vote, bool) {
+ var leader *voting.Vote
+ var majority uint
+ for v, voteCount := range t.voteCounts {
+ if majority < voteCount {
+ v := v
+ majority = voteCount
+ leader = &v
}
}
- // The threshold wasn't reached by any node yet. If there are undecided voters, then we
- // cannot notify yet as any remaining nodes may cause us to reach quorum.
+ if majority >= t.threshold {
+ return leader, true
+ }
+
+ var outstanding uint
for _, voter := range t.votersByNode {
- if voter.result == VoteUndecided {
- return false
+ if voter.vote == nil && voter.result == VoteUndecided {
+ outstanding += voter.Votes
}
}
- // Otherwise we know that all votes are in and that no quorum was
- // reached. We thus need to notify callers of the failed vote as the
- // last node which has cast its vote.
- return true
+ return nil, majority+outstanding >= t.threshold
}
func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
@@ -363,3 +380,27 @@ func (t *subtransaction) getVote(node string) (*voting.Vote, error) {
vote := *voter.vote
return &vote, nil
}
+
+// cancelNodeVoter updates a node's associated voter state to `VoteCanceled`.
+// All voters must wait until either quorum has been achieved or quorum
+// becomes impossible. A canceled voter's votes are not counted as a part of
+// the total outstanding votes which can cause a subtransaction to not have
+// enough votes to reach the required threshold. If this happens the vote
+// will be considered failed and the voters unblocked.
+func (t *subtransaction) cancelNodeVoter(node string) error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ voter, ok := t.votersByNode[node]
+ if !ok {
+ return fmt.Errorf("invalid node for subtransaction: %q", node)
+ }
+
+ // Updating voter state with a nil vote will result in the voter
+ // getting canceled.
+ if err := t.updateVoterState(voter, nil); err != nil {
+ return fmt.Errorf("cancel vote: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go
index d7b2eee42..24b35cc54 100644
--- a/internal/praefect/transactions/subtransaction_test.go
+++ b/internal/praefect/transactions/subtransaction_test.go
@@ -51,7 +51,7 @@ func TestSubtransaction_stop(t *testing.T) {
require.Equal(t, VoteFailed, s.votersByNode["3"].result)
})
- t.Run("stop of canceled transaction fails", func(t *testing.T) {
+ t.Run("stop of transaction with single canceled voter", func(t *testing.T) {
s, err := newSubtransaction([]Voter{
{Name: "1", Votes: 1, result: VoteUndecided},
{Name: "2", Votes: 1, result: VoteCommitted},
@@ -60,8 +60,8 @@ func TestSubtransaction_stop(t *testing.T) {
}, 1)
require.NoError(t, err)
- require.Equal(t, s.stop(), ErrTransactionCanceled)
- require.False(t, s.isDone())
+ require.NoError(t, s.stop())
+ require.True(t, s.isDone())
})
t.Run("stop of stopped transaction fails", func(t *testing.T) {
@@ -512,6 +512,8 @@ func TestSubtransaction_voterStopsWaiting(t *testing.T) {
s, err := newSubtransaction(voters, totalWeight/2+1)
require.NoError(t, err)
+ var deferredCollectVotes []func(ctx context.Context)
+
results := make([]chan error, len(tc.outcomes))
for i, outcome := range tc.outcomes {
voterName := voters[i].Name
@@ -535,7 +537,15 @@ func TestSubtransaction_voterStopsWaiting(t *testing.T) {
continue
}
- go collectVotes(ctx)
+ // Since voters are unblocked once quorum becomes impossible voters that
+ // are not dropped must have their call to `collectVotes` deferred so
+ // the result state will not be prone to race conditions.
+ deferredCollectVotes = append(deferredCollectVotes, collectVotes)
+ }
+
+ // With all votes cast the remaining `collectVotes` can be called.
+ for _, collectVotes := range deferredCollectVotes {
+ collectVotes(ctx)
}
for i, outcome := range tc.outcomes {
@@ -593,6 +603,304 @@ func TestSubtransaction_race(t *testing.T) {
}
}
+func TestSubtransaction_updateVoterState(t *testing.T) {
+ voters := []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ }
+ threshold := uint(2)
+
+ vote := newVote(t, "a")
+
+ for _, tc := range []struct {
+ desc string
+ voter *Voter
+ vote *voting.Vote
+ expVote *voting.Vote
+ expVotes uint
+ expResult VoteResult
+ expErrMsg string
+ }{
+ {
+ desc: "Update voter",
+ voter: &Voter{
+ Name: "1", Votes: 1, vote: nil, result: VoteUndecided,
+ },
+ vote: &vote,
+ expVote: &vote,
+ expVotes: 1,
+ expResult: VoteUndecided,
+ expErrMsg: "",
+ },
+ {
+ desc: "Cancel voter that has not voted",
+ voter: &Voter{
+ Name: "1", Votes: 1, vote: nil, result: VoteUndecided,
+ },
+ vote: nil,
+ expVote: nil,
+ expVotes: 0,
+ expResult: VoteCanceled,
+ expErrMsg: "",
+ },
+ {
+ desc: "Cancel voter that has voted",
+ voter: &Voter{
+ Name: "1", Votes: 1, vote: &vote, result: VoteUndecided,
+ },
+ vote: nil,
+ expVote: nil,
+ expVotes: 0,
+ expResult: VoteCanceled,
+ expErrMsg: "",
+ },
+ {
+ desc: "Update canceled voter",
+ voter: &Voter{
+ Name: "1", Votes: 1, vote: nil, result: VoteCanceled,
+ },
+ vote: nil,
+ expVote: nil,
+ expVotes: 0,
+ expResult: VoteCanceled,
+ expErrMsg: "transaction has been canceled",
+ },
+ {
+ desc: "Update canceled voter",
+ voter: &Voter{
+ Name: "1", Votes: 1, vote: nil, result: VoteStopped,
+ },
+ vote: nil,
+ expVote: nil,
+ expVotes: 0,
+ expResult: VoteStopped,
+ expErrMsg: "transaction has been stopped",
+ },
+ {
+ desc: "Update committed voter",
+ voter: &Voter{
+ Name: "1", Votes: 1, vote: nil, result: VoteCommitted,
+ },
+ vote: nil,
+ expVote: nil,
+ expVotes: 0,
+ expResult: VoteCommitted,
+ expErrMsg: "cannot change committed vote",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ subtransaction, err := newSubtransaction(voters, threshold)
+ require.NoError(t, err)
+
+ if tc.voter.vote != nil {
+ subtransaction.voteCounts[*tc.voter.vote] += tc.voter.Votes
+ }
+
+ err = subtransaction.updateVoterState(tc.voter, tc.vote)
+ if tc.expErrMsg != "" {
+ require.Equal(t, tc.expErrMsg, err.Error())
+ } else {
+ require.NoError(t, err)
+ }
+
+ require.Equal(t, tc.expVote, tc.voter.vote)
+ require.Equal(t, tc.expResult, tc.voter.result)
+
+ if tc.voter.vote != nil {
+ require.Equal(t, tc.expVotes, subtransaction.voteCounts[*tc.voter.vote])
+ }
+ })
+ }
+}
+
+func TestSubtransaction_quorumCheck(t *testing.T) {
+ voteA := newVote(t, "a")
+ voteB := newVote(t, "b")
+ voteC := newVote(t, "c")
+
+ threshold := uint(2)
+
+ for _, tc := range []struct {
+ desc string
+ voters []Voter
+ expVote *voting.Vote
+ expQuorum bool
+ }{
+ {
+ desc: "No votes yet",
+ voters: []Voter{
+ {Name: "1", Votes: 1, vote: nil, result: VoteUndecided},
+ {Name: "2", Votes: 1, vote: nil, result: VoteUndecided},
+ {Name: "3", Votes: 1, vote: nil, result: VoteUndecided},
+ },
+ expVote: nil,
+ expQuorum: true,
+ },
+ {
+ desc: "Two nodes failed",
+ voters: []Voter{
+ {Name: "1", Votes: 1, vote: nil, result: VoteCanceled},
+ {Name: "2", Votes: 1, vote: nil, result: VoteCanceled},
+ {Name: "3", Votes: 1, vote: nil, result: VoteUndecided},
+ },
+ expVote: nil,
+ expQuorum: false,
+ },
+ {
+ desc: "Two nodes vote differently",
+ voters: []Voter{
+ {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided},
+ {Name: "2", Votes: 1, vote: &voteB, result: VoteUndecided},
+ {Name: "3", Votes: 1, vote: nil, result: VoteUndecided},
+ },
+ expVote: nil,
+ expQuorum: true,
+ },
+ {
+ desc: "Two nodes vote same",
+ voters: []Voter{
+ {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided},
+ {Name: "2", Votes: 1, vote: &voteA, result: VoteUndecided},
+ {Name: "3", Votes: 1, vote: nil, result: VoteUndecided},
+ },
+ expVote: &voteA,
+ expQuorum: true,
+ },
+ {
+ desc: "One node votes different",
+ voters: []Voter{
+ {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided},
+ {Name: "2", Votes: 1, vote: &voteA, result: VoteUndecided},
+ {Name: "3", Votes: 1, vote: &voteB, result: VoteUndecided},
+ },
+ expVote: &voteA,
+ expQuorum: true,
+ },
+ {
+ desc: "All nodes votes different",
+ voters: []Voter{
+ {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided},
+ {Name: "2", Votes: 1, vote: &voteB, result: VoteUndecided},
+ {Name: "3", Votes: 1, vote: &voteC, result: VoteUndecided},
+ },
+ expVote: nil,
+ expQuorum: false,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ subtransaction, err := newSubtransaction(tc.voters, threshold)
+ require.NoError(t, err)
+
+ // Vote counts are usually updated when `updateVoterState()` is called
+ for _, voter := range tc.voters {
+ if voter.vote != nil {
+ subtransaction.voteCounts[*voter.vote] += voter.Votes
+ }
+ }
+
+ majorityVote, quorumPossible := subtransaction.quorumCheck()
+ require.Equal(t, tc.expVote, majorityVote)
+ require.Equal(t, tc.expQuorum, quorumPossible)
+ })
+ }
+}
+
+func TestSubtransaction_cancelNodeVoter(t *testing.T) {
+ voteA := newVote(t, "a")
+ voteB := newVote(t, "b")
+
+ threshold := uint(2)
+
+ for _, tc := range []struct {
+ desc string
+ voters []Voter
+ node string
+ result VoteResult
+ subDone bool
+ expErrMsg string
+ }{
+ {
+ desc: "Cancel undecided voter",
+ voters: []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ },
+ node: "1",
+ result: VoteCanceled,
+ subDone: false,
+ expErrMsg: "",
+ },
+ {
+ desc: "Cancel canceled voter",
+ voters: []Voter{
+ {Name: "1", Votes: 1, result: VoteCanceled},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ },
+ node: "1",
+ result: VoteCanceled,
+ subDone: false,
+ expErrMsg: "cancel vote: transaction has been canceled",
+ },
+ {
+ desc: "Cancel nonexistent voter",
+ voters: []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ },
+ node: "4",
+ result: VoteCanceled,
+ subDone: false,
+ expErrMsg: "invalid node for subtransaction: \"4\"",
+ },
+ {
+ desc: "Cancel last voter",
+ voters: []Voter{
+ {Name: "1", Votes: 1, vote: &voteA},
+ {Name: "2", Votes: 1, vote: &voteB},
+ {Name: "3", Votes: 1},
+ },
+ node: "3",
+ result: VoteCanceled,
+ subDone: true,
+ expErrMsg: "",
+ },
+ {
+ desc: "Cancel voter making quorum impossible",
+ voters: []Voter{
+ {Name: "1", Votes: 1, result: VoteCanceled},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ },
+ node: "2",
+ result: VoteCanceled,
+ subDone: true,
+ expErrMsg: "",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ subtransaction, err := newSubtransaction(tc.voters, threshold)
+ require.NoError(t, err)
+
+ if err := subtransaction.cancelNodeVoter(tc.node); tc.expErrMsg != "" {
+ require.EqualError(t, err, tc.expErrMsg)
+ } else {
+ require.NoError(t, err)
+ }
+
+ voter, ok := subtransaction.votersByNode[tc.node]
+ if ok {
+ require.Equal(t, tc.result, voter.result)
+ }
+
+ require.Equal(t, tc.subDone, subtransaction.isDone())
+ })
+ }
+}
+
func newVote(t *testing.T, s string) voting.Vote {
hash := sha1.Sum([]byte(s))
vote, err := voting.VoteFromHash(hash[:])
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index a1dd1f75a..fed090e81 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -227,7 +227,31 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
return nil, errors.New("invalid transaction state")
}
- for _, subtransaction := range t.subtransactions {
+ // Check for pending subtransactions on the specified node.
+ if subtransactions, err := t.getPendingNodeSubtransactions(node); err != nil {
+ return nil, err
+ } else if len(subtransactions) != 0 {
+ // First pending subtransaction is the next in queue for processing.
+ return subtransactions[0], nil
+ }
+
+ // If we arrive here, then we know that the node has voted and
+ // reached quorum on all subtransactions. We can thus create a new one.
+ subtransaction, err := t.createSubtransaction()
+ if err != nil {
+ return nil, err
+ }
+
+ t.subtransactions = append(t.subtransactions, subtransaction)
+
+ return subtransaction, nil
+}
+
+// getPendingNodeSubtransactions returns all undecided subtransactions
+// for the specified voter. `nil` is returned if there are no pending
+// subtransactions for the node.
+func (t *transaction) getPendingNodeSubtransactions(node string) ([]*subtransaction, error) {
+ for i, subtransaction := range t.subtransactions {
result, err := subtransaction.getResult(node)
if err != nil {
return nil, err
@@ -235,8 +259,9 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
switch result {
case VoteUndecided:
- // An undecided vote means we should vote on this one.
- return subtransaction, nil
+ // Subtransactions after the first undecided one will also be undecided for this node.
+ // Remaining subtransactions are returned.
+ return t.subtransactions[i:], nil
case VoteCommitted:
// If we have committed this subtransaction, we're good
// to go.
@@ -258,16 +283,48 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
}
}
- // If we arrive here, then we know that all the node has voted and
- // reached quorum on all subtransactions. We can thus create a new one.
- subtransaction, err := newSubtransaction(t.voters, t.threshold)
- if err != nil {
- return nil, err
+ return nil, nil
+}
+
+// createSubtransaction returns a new subtransaction with any previously
+// canceled voter results propagated into the new subtransaction. Once a
+// voter has been canceled it can no longer vote in the current and all
+// future subtransactions. Propagating canceled voter state ensures
+// subtransactions do not wait for an impossible quorum due to canceled
+// voters.
+func (t *transaction) createSubtransaction() (*subtransaction, error) {
+ // If there are no subtransactions propagation can be skipped
+ if len(t.subtransactions) == 0 {
+ return newSubtransaction(t.voters, t.threshold)
}
- t.subtransactions = append(t.subtransactions, subtransaction)
+ prevSub := t.subtransactions[len(t.subtransactions)-1]
+
+ // Check previous voters state and propagate canceled voters.
+ var propagatedVoters []Voter
+ for _, voter := range t.voters {
+ prevVoter := prevSub.votersByNode[voter.Name]
+ if prevVoter == nil {
+ // This error should in theory never be reached. When a
+ // subtransaction is created it receives all voters from
+ // the parent transaction. The parent transaction voters
+ // are not mutated throughout the lifespan of the
+ // transaction meaning that all voters in a transaction
+ // should be present in a subtransaction.
+ return nil, errors.New("subtransaction missing previous voter")
+ }
- return subtransaction, nil
+ // Only canceled voters need to be propagated since a node voter
+ // can be canceled and the transaction continue. Other terminal
+ // results are applied to voters and end the transaction.
+ if prevVoter.result == VoteCanceled {
+ voter.result = VoteCanceled
+ }
+
+ propagatedVoters = append(propagatedVoters, voter)
+ }
+
+ return newSubtransaction(propagatedVoters, t.threshold)
}
func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) error {
@@ -282,3 +339,39 @@ func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) e
return subtransaction.collectVotes(ctx, node)
}
+
+// cancelNodeVoter cancels the undecided voters associated with
+// the specified node for all pending subtransactions.
+func (t *transaction) cancelNodeVoter(node string) error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ // Get all undecided subtransactions for node.
+ pendingSubtransactions, err := t.getPendingNodeSubtransactions(node)
+ if err != nil {
+ return err
+ }
+
+ // If there are no pending subtransactions a new one should
+ // be created and added to the transaction so the failure
+ // can be tracked.
+ if len(pendingSubtransactions) == 0 {
+ sub, err := t.createSubtransaction()
+ if err != nil {
+ return err
+ }
+
+ t.subtransactions = append(t.subtransactions, sub)
+ pendingSubtransactions = []*subtransaction{sub}
+ }
+
+ // Cancel node voters in undecided subtransactions.
+ for _, subtransaction := range pendingSubtransactions {
+ err := subtransaction.cancelNodeVoter(node)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go
index b3648ed62..8d86c3d98 100644
--- a/internal/praefect/transactions/transaction_test.go
+++ b/internal/praefect/transactions/transaction_test.go
@@ -51,3 +51,335 @@ func TestTransaction_DidVote(t *testing.T) {
require.True(t, tx.DidVote("v1"))
require.True(t, tx.DidVote("v2"))
}
+
+func TestTransaction_getPendingNodeSubtransactions(t *testing.T) {
+ t.Parallel()
+
+ var id uint64
+ voters := []Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ {Name: "3", Votes: 1},
+ }
+ threshold := uint(2)
+
+ uncommittedSubtransaction, err := newSubtransaction(voters, threshold)
+ require.NoError(t, err)
+ committedSubtransaction, err := newSubtransaction(
+ []Voter{
+ {Name: "1", Votes: 1, result: VoteCommitted},
+ {Name: "2", Votes: 1, result: VoteCommitted},
+ {Name: "3", Votes: 1, result: VoteCommitted},
+ },
+ threshold,
+ )
+ require.NoError(t, err)
+ mixedSubtransaction, err := newSubtransaction(
+ []Voter{
+ {Name: "1", Votes: 1, result: VoteCanceled},
+ {Name: "2", Votes: 1, result: VoteFailed},
+ {Name: "3", Votes: 1, result: VoteStopped},
+ },
+ threshold,
+ )
+ require.NoError(t, err)
+
+ for _, tc := range []struct {
+ desc string
+ subs []*subtransaction
+ node string
+ expSubs []*subtransaction
+ expErrMsg string
+ }{
+ {
+ desc: "No subtransactions",
+ subs: []*subtransaction{},
+ node: "1",
+ expSubs: nil,
+ expErrMsg: "",
+ },
+ {
+ desc: "Single pending transaction",
+ subs: []*subtransaction{uncommittedSubtransaction},
+ node: "1",
+ expSubs: []*subtransaction{uncommittedSubtransaction},
+ expErrMsg: "",
+ },
+ {
+ desc: "Single complete transaction",
+ subs: []*subtransaction{committedSubtransaction},
+ node: "1",
+ expSubs: nil,
+ expErrMsg: "",
+ },
+ {
+ desc: "Two pending transactions",
+ subs: []*subtransaction{uncommittedSubtransaction, uncommittedSubtransaction},
+ node: "1",
+ expSubs: []*subtransaction{uncommittedSubtransaction, uncommittedSubtransaction},
+ expErrMsg: "",
+ },
+ {
+ desc: "Two transactions, one pending",
+ subs: []*subtransaction{committedSubtransaction, uncommittedSubtransaction},
+ node: "1",
+ expSubs: []*subtransaction{uncommittedSubtransaction},
+ expErrMsg: "",
+ },
+ {
+ desc: "Missing node voter",
+ subs: []*subtransaction{uncommittedSubtransaction},
+ node: "4",
+ expSubs: nil,
+ expErrMsg: "invalid node for transaction: \"4\"",
+ },
+ {
+ desc: "Canceled node voter",
+ subs: []*subtransaction{mixedSubtransaction},
+ node: "1",
+ expSubs: nil,
+ expErrMsg: "transaction has been canceled",
+ },
+ {
+ desc: "Failed node voter",
+ subs: []*subtransaction{mixedSubtransaction},
+ node: "2",
+ expSubs: nil,
+ expErrMsg: "transaction did not reach quorum",
+ },
+ {
+ desc: "Stopped node voter",
+ subs: []*subtransaction{mixedSubtransaction},
+ node: "3",
+ expSubs: nil,
+ expErrMsg: "transaction has been stopped",
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ transaction, err := newTransaction(id, voters, threshold)
+ require.NoError(t, err)
+
+ transaction.subtransactions = tc.subs
+
+ subtransactions, err := transaction.getPendingNodeSubtransactions(tc.node)
+ if tc.expErrMsg != "" {
+ require.EqualError(t, err, tc.expErrMsg)
+ } else {
+ require.NoError(t, err)
+ }
+
+ require.Equal(t, tc.expSubs, subtransactions)
+ })
+ }
+}
+
+// TestTransaction_createSubtransaction verifies the voter results of the
+// subtransaction returned by createSubtransaction, depending on the voter
+// results of the subtransactions already attached to the transaction.
+func TestTransaction_createSubtransaction(t *testing.T) {
+	t.Parallel()
+
+	var id uint64
+	voters := []Voter{
+		{Name: "1", Votes: 1},
+		{Name: "2", Votes: 1},
+		{Name: "3", Votes: 1},
+	}
+	threshold := uint(2)
+
+	// Every voter has committed.
+	committedSubtransaction, err := newSubtransaction(
+		[]Voter{
+			{Name: "1", Votes: 1, result: VoteCommitted},
+			{Name: "2", Votes: 1, result: VoteCommitted},
+			{Name: "3", Votes: 1, result: VoteCommitted},
+		},
+		threshold,
+	)
+	require.NoError(t, err)
+
+	// Voter 1 is canceled, the other two have committed.
+	mixedSubtransaction, err := newSubtransaction(
+		[]Voter{
+			{Name: "1", Votes: 1, result: VoteCanceled},
+			{Name: "2", Votes: 1, result: VoteCommitted},
+			{Name: "3", Votes: 1, result: VoteCommitted},
+		},
+		threshold,
+	)
+	require.NoError(t, err)
+
+	// Voters 1 and 3 are canceled, voter 2 has committed.
+	canceledSubtransaction, err := newSubtransaction(
+		[]Voter{
+			{Name: "1", Votes: 1, result: VoteCanceled},
+			{Name: "2", Votes: 1, result: VoteCommitted},
+			{Name: "3", Votes: 1, result: VoteCanceled},
+		},
+		threshold,
+	)
+	require.NoError(t, err)
+
+	// createSubtransaction takes no node argument, so the test cases only
+	// describe the existing subtransactions and the expected voters. Only
+	// the Name and result of each expected voter are compared.
+	for _, tc := range []struct {
+		desc      string
+		subs      []*subtransaction
+		expVoters []Voter
+	}{
+		{
+			desc: "No previous subtransactions",
+			subs: nil,
+			expVoters: []Voter{
+				{Name: "1", result: VoteUndecided},
+				{Name: "2", result: VoteUndecided},
+				{Name: "3", result: VoteUndecided},
+			},
+		},
+		{
+			desc: "One previous subtransaction, no canceled voters",
+			subs: []*subtransaction{committedSubtransaction},
+			expVoters: []Voter{
+				{Name: "1", result: VoteUndecided},
+				{Name: "2", result: VoteUndecided},
+				{Name: "3", result: VoteUndecided},
+			},
+		},
+		{
+			desc: "One previous subtransaction, one canceled voter",
+			subs: []*subtransaction{mixedSubtransaction},
+			expVoters: []Voter{
+				{Name: "1", result: VoteCanceled},
+				{Name: "2", result: VoteUndecided},
+				{Name: "3", result: VoteUndecided},
+			},
+		},
+		{
+			desc: "One previous subtransaction, two canceled voters",
+			subs: []*subtransaction{canceledSubtransaction},
+			expVoters: []Voter{
+				{Name: "1", result: VoteCanceled},
+				{Name: "2", result: VoteUndecided},
+				{Name: "3", result: VoteCanceled},
+			},
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			transaction, err := newTransaction(id, voters, threshold)
+			require.NoError(t, err)
+
+			// Replace the transaction's subtransactions with the fixtures.
+			transaction.subtransactions = tc.subs
+
+			subtransaction, err := transaction.createSubtransaction()
+			require.NoError(t, err)
+
+			// Each expected voter must exist in the new subtransaction
+			// with the expected result.
+			for _, expVoter := range tc.expVoters {
+				voter := subtransaction.votersByNode[expVoter.Name]
+				require.NotNil(t, voter)
+				require.Equal(t, expVoter.result, voter.result)
+			}
+		})
+	}
+}
+
+// TestTransaction_cancelNodeVoter verifies that cancelNodeVoter either
+// returns the expected error or leaves the node's voter canceled in the
+// transaction's last subtransaction.
+func TestTransaction_cancelNodeVoter(t *testing.T) {
+	t.Parallel()
+
+	var id uint64
+	voters := []Voter{
+		{Name: "1", Votes: 1},
+		{Name: "2", Votes: 1},
+		{Name: "3", Votes: 1},
+	}
+	threshold := uint(2)
+
+	// Every voter has committed.
+	committedSubtransaction, err := newSubtransaction(
+		[]Voter{
+			{Name: "1", Votes: 1, result: VoteCommitted},
+			{Name: "2", Votes: 1, result: VoteCommitted},
+			{Name: "3", Votes: 1, result: VoteCommitted},
+		},
+		threshold,
+	)
+	require.NoError(t, err)
+
+	// Every voter is undecided.
+	undecidedSubtransaction, err := newSubtransaction(
+		[]Voter{
+			{Name: "1", Votes: 1, result: VoteUndecided},
+			{Name: "2", Votes: 1, result: VoteUndecided},
+			{Name: "3", Votes: 1, result: VoteUndecided},
+		},
+		threshold,
+	)
+	require.NoError(t, err)
+
+	// Voter 1 is undecided, the other two have committed.
+	decidedSubtransaction, err := newSubtransaction(
+		[]Voter{
+			{Name: "1", Votes: 1, result: VoteUndecided},
+			{Name: "2", Votes: 1, result: VoteCommitted},
+			{Name: "3", Votes: 1, result: VoteCommitted},
+		},
+		threshold,
+	)
+	require.NoError(t, err)
+
+	// Voters 1 and 2 are undecided, voter 3 is canceled.
+	cancelingSubtransaction, err := newSubtransaction(
+		[]Voter{
+			{Name: "1", Votes: 1, result: VoteUndecided},
+			{Name: "2", Votes: 1, result: VoteUndecided},
+			{Name: "3", Votes: 1, result: VoteCanceled},
+		},
+		threshold,
+	)
+	require.NoError(t, err)
+
+	for _, tc := range []struct {
+		desc      string
+		subs      []*subtransaction
+		node      string
+		expErrMsg string
+	}{
+		{
+			desc:      "No subtransactions",
+			subs:      nil,
+			node:      "1",
+			expErrMsg: "",
+		},
+		{
+			desc:      "No pending subtransactions",
+			subs:      []*subtransaction{committedSubtransaction},
+			node:      "1",
+			expErrMsg: "",
+		},
+		{
+			desc:      "One Pending subtransaction",
+			subs:      []*subtransaction{undecidedSubtransaction},
+			node:      "1",
+			expErrMsg: "",
+		},
+		{
+			desc:      "Two Pending subtransactions",
+			subs:      []*subtransaction{decidedSubtransaction, cancelingSubtransaction},
+			node:      "1",
+			expErrMsg: "",
+		},
+		{
+			desc:      "Invalid node",
+			subs:      []*subtransaction{committedSubtransaction},
+			node:      "4",
+			expErrMsg: "invalid node for transaction: \"4\"",
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			transaction, err := newTransaction(id, voters, threshold)
+			require.NoError(t, err)
+
+			// Replace the transaction's subtransactions with the fixtures.
+			transaction.subtransactions = tc.subs
+
+			err = transaction.cancelNodeVoter(tc.node)
+			if tc.expErrMsg != "" {
+				require.EqualError(t, err, tc.expErrMsg)
+				return
+			}
+			require.NoError(t, err)
+
+			// Check the last subtransaction to make sure cancel propagation occurs.
+			// Guard the index and the map lookup so a regression fails the
+			// test with a clear message instead of panicking.
+			require.NotEmpty(t, transaction.subtransactions)
+			sub := transaction.subtransactions[len(transaction.subtransactions)-1]
+			voter := sub.votersByNode[tc.node]
+			require.NotNil(t, voter)
+			require.Equal(t, VoteCanceled, voter.result)
+		})
+	}
+}
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 02440e7f0..bf8d59899 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -199,6 +199,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context {
// PraefectGeneratedReplicaPaths affects many tests as it changes the repository creation logic.
// Randomly enable the flag to exercise both paths to some extent.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectGeneratedReplicaPaths, rnd.Int()%2 == 0)
+ // NodeErrorCancelsVoter affects many tests as it changes Praefect coordinator transaction logic.
+ // Randomly enable the flag to exercise both paths to some extent.
+ ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.NodeErrorCancelsVoter, rnd.Int()%2 == 0)
for _, opt := range opts {
ctx = opt(ctx)