diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-10-24 08:55:44 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-10-24 08:55:44 +0300 |
commit | 9100bc1ba991757588906a818e24d52932ba665c (patch) | |
tree | b018a47a7815fd459cbeb92ad22bebc764695186 | |
parent | f8914b6e0da033ee251ca2f79384043d25a01ec6 (diff) | |
parent | e7a07dca63658bc5fd15399522c7a359d002b413 (diff) |
Merge branch 'jt-praefect-transaction-error-handler' into 'master'
Praefect: Update voter state on failed node RPC
Closes #4088
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/4880
Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r-- | internal/metadata/featureflag/ff_node_error_cancels_voter.go | 11 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 17 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 113 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 19 | ||||
-rw-r--r-- | internal/praefect/transactions/manager_test.go | 65 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 129 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction_test.go | 316 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 113 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction_test.go | 332 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 3 |
10 files changed, 1056 insertions, 62 deletions
diff --git a/internal/metadata/featureflag/ff_node_error_cancels_voter.go b/internal/metadata/featureflag/ff_node_error_cancels_voter.go new file mode 100644 index 000000000..797c7c861 --- /dev/null +++ b/internal/metadata/featureflag/ff_node_error_cancels_voter.go @@ -0,0 +1,11 @@ +package featureflag + +// NodeErrorCancelsVoter enables cancellation of the voter associated +// with a failed node RPC. By canceling voters that can no longer vote, +// the transaction can fail faster if quorum becomes impossible. +var NodeErrorCancelsVoter = NewFeatureFlag( + "node_error_cancels_voter", + "v15.6.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/4552", + false, +) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index e000c24cf..e16591223 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -467,10 +467,19 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall ctxlogrus.Extract(ctx).WithError(err). Error("proxying to secondary failed") - // For now, any errors returned by secondaries are ignored. - // This is mostly so that we do not abort transactions which - // are ongoing and may succeed even with a subset of - // secondaries bailing out. + if featureflag.NodeErrorCancelsVoter.IsEnabled(ctx) { + // Cancels failed node's voter in its current subtransaction. + // Also updates internal state of subtransaction to fail and + // release blocked voters if quorum becomes impossible. + if err := c.txMgr.CancelTransactionNodeVoter(transaction.ID(), secondary.Storage); err != nil { + ctxlogrus.Extract(ctx).WithError(err). + Error("canceling secondary voter failed") + } + } + + // The error is ignored, so we do not abort transactions + // which are ongoing and may succeed even with a subset + // of secondaries bailing out. return nil }, }) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index fb9c9ff99..8b340e5a0 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -395,6 +395,119 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { require.NoError(t, err) } +func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) { + t.Parallel() + testhelper.NewFeatureSets(featureflag.NodeErrorCancelsVoter).Run(t, testStreamDirectorMutatorSecondaryErrorHandling) +} + +func testStreamDirectorMutatorSecondaryErrorHandling(t *testing.T, ctx context.Context) { + ctx, ctxCancel := context.WithCancel(ctx) + + socket := testhelper.GetTemporaryGitalySocketFileName(t) + testhelper.NewServerWithHealth(t, socket) + + primaryNode := &config.Node{Address: "unix://" + socket, Storage: "praefect-internal-1"} + secondaryNode1 := &config.Node{Address: "unix://" + socket, Storage: "praefect-internal-2"} + secondaryNode2 := &config.Node{Address: "unix://" + socket, Storage: "praefect-internal-3"} + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect", + Nodes: []*config.Node{primaryNode, secondaryNode1, secondaryNode2}, + }, + }, + } + + repo := gitalypb.Repository{ + StorageName: "praefect", + RelativePath: "/path/to/hashed/storage", + } + + nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + defer nodeMgr.Stop() + + shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name) + require.NoError(t, err) + + for _, name := range []string{"praefect-internal-1", "praefect-internal-2", "praefect-internal-3"} { + node, err := shard.GetNode(name) + require.NoError(t, err) + waitNodeToChangeHealthStatus(t, ctx, node, true) + } + + rs := datastore.MockRepositoryStore{ + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, map[string]struct{}{"praefect-internal-1": {}, "praefect-internal-2": {}, "praefect-internal-3": {}}, nil + }, + } + + txMgr := transactions.NewManager(conf) + + coordinator := NewCoordinator( + datastore.NewPostgresReplicationEventQueue(testdb.New(t)), + rs, + NewNodeManagerRouter(nodeMgr, rs), + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) + + fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" + + frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{ + Repository: &repo, + }) + require.NoError(t, err) + peeker := &mockPeeker{frame} + + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + + txCtx := peer.NewContext(streamParams.Primary().Ctx, &peer.Peer{}) + transaction, err := txinfo.TransactionFromContext(txCtx) + require.NoError(t, err) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + vote := voting.VoteFromData([]byte("vote")) + err := txMgr.VoteTransaction(ctx, transaction.ID, "praefect-internal-1", vote) + if featureflag.NodeErrorCancelsVoter.IsEnabled(ctx) { + require.ErrorIs(t, err, transactions.ErrTransactionFailed) + } else { + require.EqualError(t, err, "context canceled") + } + }() + + for _, secondary := range streamParams.Secondaries() { + wg.Add(1) + secondary := secondary + go func() { + err := secondary.ErrHandler(errors.New("node RPC failure")) + require.NoError(t, err) + + wg.Done() + }() + } + + // If the feature flag is not enabled the context must be canceled to unblock voters. + if !featureflag.NodeErrorCancelsVoter.IsEnabled(ctx) { + ctxCancel() + } + + wg.Wait() + + err = streamParams.RequestFinalizer() + require.NoError(t, err) + ctxCancel() +} + func TestStreamDirector_maintenance(t *testing.T) { t.Parallel() diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 57a81d7db..9231dbd19 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -230,3 +230,22 @@ func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) e return nil } + +// CancelTransactionNodeVoter cancels the voter associated with the specified transaction +// and node. Voters are canceled when the node RPC fails and its votes can no longer count +// towards quorum. +func (mgr *Manager) CancelTransactionNodeVoter(transactionID uint64, node string) error { + mgr.lock.Lock() + transaction, ok := mgr.transactions[transactionID] + mgr.lock.Unlock() + + if !ok { + return fmt.Errorf("%w: %d", ErrNotFound, transactionID) + } + + if err := transaction.cancelNodeVoter(node); err != nil { + return err + } + + return nil +} diff --git a/internal/praefect/transactions/manager_test.go b/internal/praefect/transactions/manager_test.go new file mode 100644 index 000000000..305257373 --- /dev/null +++ b/internal/praefect/transactions/manager_test.go @@ -0,0 +1,65 @@ +package transactions + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func TestManager_CancelTransactionNodeVoter(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + voters := []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + } + threshold := uint(2) + + for _, tc := range []struct { + desc string + register bool + node string + expErrMsg string + }{ + { + desc: "No transaction exists", + register: false, + node: "1", + expErrMsg: "transaction not found: 0", + }, + { + desc: "Transaction exists", + register: true, + node: "1", + expErrMsg: "", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + manager := NewManager(config.Config{}) + + var id uint64 + if tc.register { + transaction, cleanup, err := manager.RegisterTransaction(ctx, voters, threshold) + defer func() { + err := cleanup() + require.NoError(t, err) + }() + require.NoError(t, err) + + id = transaction.ID() + } + + err := manager.CancelTransactionNodeVoter(id, "1") + if tc.expErrMsg != "" { + require.Error(t, err) + require.Equal(t, tc.expErrMsg, err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index ab21107b8..19a43b667 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -79,7 +79,9 @@ func (t *subtransaction) stop() error { switch voter.result { case VoteCanceled: // If the vote was canceled already, we cannot stop it. - return ErrTransactionCanceled + // A single node voter being canceled is not indicative + // of all voter's state. Other voters must be checked. + continue case VoteStopped: // Similar if the vote was stopped already. return ErrTransactionStopped @@ -136,9 +138,9 @@ func (t *subtransaction) vote(node string, vote voting.Vote) error { return nil } -// updateVoterStates updates undecided voters or cancels existing votes of decided voters if given a -// `nil` vote. Voters are updated either as soon as quorum was reached or alternatively when all -// votes were cast. +// updateVoterState updates undecided voters or cancels existing votes if given a `nil` +// vote. Voters are updated either as soon as quorum was reached or alternatively when +// quorum becomes impossible. func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error { switch voter.result { case VoteUndecided: @@ -171,10 +173,20 @@ func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error return errors.New("subtransaction was already finished") } + // A voter's result can only be canceled if the subtransaction is still pending. + // If a change has already been committed to disk the voter result cannot be + // changed since the subtransction is considered complete. + voter.result = VoteCanceled + // Remove the voter's support for the vote so it's not counted towards the // majority. The node is not going to commit the subtransaction anyway. - t.voteCounts[*voter.vote] -= voter.Votes - voter.result = VoteCanceled + if voter.vote != nil { + t.voteCounts[*voter.vote] -= voter.Votes + } + + // A canceled voter can no longer voter so its vote is + // reset after being subtracted from the vote counts. + voter.vote = nil } defer func() { @@ -183,26 +195,12 @@ func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error } }() - var majorityVote *voting.Vote - var majorityVoteCount uint - for v, voteCount := range t.voteCounts { - if majorityVoteCount < voteCount { - v := v - majorityVoteCount = voteCount - majorityVote = &v - } - } - - var outstandingVotes uint - for _, voter := range t.votersByNode { - if voter.vote == nil { - outstandingVotes += voter.Votes - } - } - - // When the majority vote didn't yet cross the threshold and the number of outstanding votes - // may still get us across that threshold, then we need to wait for more votes to come in. - if majorityVoteCount < t.threshold && majorityVoteCount+outstandingVotes >= t.threshold { + // Check if quorum has been reached or if quorum is still attainable for the subtransaction. + // If quorum has not been achieved the vote returned by the quorum check will be `nil`. + // As long as quorum has not been achieved and is still possible, the subtransaction + // will wait for additional voter's results to come in. + majorityVote, quorumPossible := t.quorumCheck() + if majorityVote == nil && quorumPossible { return nil } @@ -223,15 +221,15 @@ func (t *subtransaction) updateVoterState(voter *Voter, vote *voting.Vote) error continue } - // If the majority vote count is smaller than the threshold at this point, then we - // know that we cannot ever reach it anymore even with the votes which are still - // outstanding. We can thus mark this node as failed. - if majorityVoteCount < t.threshold { + // If quorum is not possible we know there are not enough outstanding votes to cross + // the threshold required by the subtransaction. We can thus mark this node as failed. + if !quorumPossible { voter.result = VoteFailed continue } - // Otherwise, the result depends on whether the voter agrees on the quorum or not. + // At this point we know quorum has been achieved and a majority vote is present. + // A check is done to see if the voter agrees with the quorum majority vote. if *voter.vote == *majorityVote { voter.result = VoteCommitted } else { @@ -252,26 +250,45 @@ func (t *subtransaction) mustSignalVoters() bool { return false } - // Check if any node has reached the threshold. If it did, then we need - // to signal voters. - for _, voteCount := range t.voteCounts { - if voteCount >= t.threshold { - return true + majorityVote, quorumPossible := t.quorumCheck() + + // If there is majority vote threshold has been met and voters can be signaled. + if majorityVote != nil { + return true + } + + // If quorum is still possible the voters should not be signaled, since + // remaining voters could cause us to reach quorum. If quorum is not + // possible voters should be unblocked to allow the transaction to fail. + return !quorumPossible +} + +// quorumCheck returns the majority vote if quorum has been achieved +// and if not `nil` is returned. It also returns whether quorum can +// still be achieved with the outstanding voters. +func (t *subtransaction) quorumCheck() (*voting.Vote, bool) { + var leader *voting.Vote + var majority uint + for v, voteCount := range t.voteCounts { + if majority < voteCount { + v := v + majority = voteCount + leader = &v } } - // The threshold wasn't reached by any node yet. If there are undecided voters, then we - // cannot notify yet as any remaining nodes may cause us to reach quorum. + if majority >= t.threshold { + return leader, true + } + + var outstanding uint for _, voter := range t.votersByNode { - if voter.result == VoteUndecided { - return false + if voter.vote == nil && voter.result == VoteUndecided { + outstanding += voter.Votes } } - // Otherwise we know that all votes are in and that no quorum was - // reached. We thus need to notify callers of the failed vote as the - // last node which has cast its vote. - return true + return nil, majority+outstanding >= t.threshold } func (t *subtransaction) collectVotes(ctx context.Context, node string) error { @@ -363,3 +380,27 @@ func (t *subtransaction) getVote(node string) (*voting.Vote, error) { vote := *voter.vote return &vote, nil } + +// cancelNodeVoter updates a node's associated voter state to `VoteCanceled`. +// All must voters wait until either quorum has been achieved or quorum +// becomes impossible. A canceled voter's votes are not counted as a part of +// the total outstanding votes which can cause a subtransaction to not have +// enough votes to reach the required threshold. If this happens the vote +// will be considered failed and the voters unblocked. +func (t *subtransaction) cancelNodeVoter(node string) error { + t.lock.Lock() + defer t.lock.Unlock() + + voter, ok := t.votersByNode[node] + if !ok { + return fmt.Errorf("invalid node for subtransaction: %q", node) + } + + // Updating voter state with a nil vote will result in the voter + // getting canceled. + if err := t.updateVoterState(voter, nil); err != nil { + return fmt.Errorf("cancel vote: %w", err) + } + + return nil +} diff --git a/internal/praefect/transactions/subtransaction_test.go b/internal/praefect/transactions/subtransaction_test.go index d7b2eee42..24b35cc54 100644 --- a/internal/praefect/transactions/subtransaction_test.go +++ b/internal/praefect/transactions/subtransaction_test.go @@ -51,7 +51,7 @@ func TestSubtransaction_stop(t *testing.T) { require.Equal(t, VoteFailed, s.votersByNode["3"].result) }) - t.Run("stop of canceled transaction fails", func(t *testing.T) { + t.Run("stop of transaction with single canceled voter", func(t *testing.T) { s, err := newSubtransaction([]Voter{ {Name: "1", Votes: 1, result: VoteUndecided}, {Name: "2", Votes: 1, result: VoteCommitted}, @@ -60,8 +60,8 @@ func TestSubtransaction_stop(t *testing.T) { }, 1) require.NoError(t, err) - require.Equal(t, s.stop(), ErrTransactionCanceled) - require.False(t, s.isDone()) + require.NoError(t, s.stop()) + require.True(t, s.isDone()) }) t.Run("stop of stopped transaction fails", func(t *testing.T) { @@ -512,6 +512,8 @@ func TestSubtransaction_voterStopsWaiting(t *testing.T) { s, err := newSubtransaction(voters, totalWeight/2+1) require.NoError(t, err) + var deferredCollectVotes []func(ctx context.Context) + results := make([]chan error, len(tc.outcomes)) for i, outcome := range tc.outcomes { voterName := voters[i].Name @@ -535,7 +537,15 @@ func TestSubtransaction_voterStopsWaiting(t *testing.T) { continue } - go collectVotes(ctx) + // Since voters are unblocked once quorum becomes impossible voters that + // are not dropped must have their call to `collectVotes` deferred so + // the result state will not be prone to race conditions. + deferredCollectVotes = append(deferredCollectVotes, collectVotes) + } + + // With all votes cast the remaining `collectVotes` can be called. + for _, collectVotes := range deferredCollectVotes { + collectVotes(ctx) } for i, outcome := range tc.outcomes { @@ -593,6 +603,304 @@ func TestSubtransaction_race(t *testing.T) { } } +func TestSubtransaction_updateVoterState(t *testing.T) { + voters := []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + } + threshold := uint(2) + + vote := newVote(t, "a") + + for _, tc := range []struct { + desc string + voter *Voter + vote *voting.Vote + expVote *voting.Vote + expVotes uint + expResult VoteResult + expErrMsg string + }{ + { + desc: "Update voter", + voter: &Voter{ + Name: "1", Votes: 1, vote: nil, result: VoteUndecided, + }, + vote: &vote, + expVote: &vote, + expVotes: 1, + expResult: VoteUndecided, + expErrMsg: "", + }, + { + desc: "Cancel voter that has not voted", + voter: &Voter{ + Name: "1", Votes: 1, vote: nil, result: VoteUndecided, + }, + vote: nil, + expVote: nil, + expVotes: 0, + expResult: VoteCanceled, + expErrMsg: "", + }, + { + desc: "Cancel voter that has voted", + voter: &Voter{ + Name: "1", Votes: 1, vote: &vote, result: VoteUndecided, + }, + vote: nil, + expVote: nil, + expVotes: 0, + expResult: VoteCanceled, + expErrMsg: "", + }, + { + desc: "Update canceled voter", + voter: &Voter{ + Name: "1", Votes: 1, vote: nil, result: VoteCanceled, + }, + vote: nil, + expVote: nil, + expVotes: 0, + expResult: VoteCanceled, + expErrMsg: "transaction has been canceled", + }, + { + desc: "Update canceled voter", + voter: &Voter{ + Name: "1", Votes: 1, vote: nil, result: VoteStopped, + }, + vote: nil, + expVote: nil, + expVotes: 0, + expResult: VoteStopped, + expErrMsg: "transaction has been stopped", + }, + { + desc: "Update committed voter", + voter: &Voter{ + Name: "1", Votes: 1, vote: nil, result: VoteCommitted, + }, + vote: nil, + expVote: nil, + expVotes: 0, + expResult: VoteCommitted, + expErrMsg: "cannot change committed vote", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + subtransaction, err := newSubtransaction(voters, threshold) + require.NoError(t, err) + + if tc.voter.vote != nil { + subtransaction.voteCounts[*tc.voter.vote] += tc.voter.Votes + } + + err = subtransaction.updateVoterState(tc.voter, tc.vote) + if tc.expErrMsg != "" { + require.Equal(t, tc.expErrMsg, err.Error()) + } else { + require.NoError(t, err) + } + + require.Equal(t, tc.expVote, tc.voter.vote) + require.Equal(t, tc.expResult, tc.voter.result) + + if tc.voter.vote != nil { + require.Equal(t, tc.expVotes, subtransaction.voteCounts[*tc.voter.vote]) + } + }) + } +} + +func TestSubtransaction_quorumCheck(t *testing.T) { + voteA := newVote(t, "a") + voteB := newVote(t, "b") + voteC := newVote(t, "c") + + threshold := uint(2) + + for _, tc := range []struct { + desc string + voters []Voter + expVote *voting.Vote + expQuorum bool + }{ + { + desc: "No votes yet", + voters: []Voter{ + {Name: "1", Votes: 1, vote: nil, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: nil, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: nil, result: VoteUndecided}, + }, + expVote: nil, + expQuorum: true, + }, + { + desc: "Two nodes failed", + voters: []Voter{ + {Name: "1", Votes: 1, vote: nil, result: VoteCanceled}, + {Name: "2", Votes: 1, vote: nil, result: VoteCanceled}, + {Name: "3", Votes: 1, vote: nil, result: VoteUndecided}, + }, + expVote: nil, + expQuorum: false, + }, + { + desc: "Two nodes vote differently", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteB, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: nil, result: VoteUndecided}, + }, + expVote: nil, + expQuorum: true, + }, + { + desc: "Two nodes vote same", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: nil, result: VoteUndecided}, + }, + expVote: &voteA, + expQuorum: true, + }, + { + desc: "One node votes different", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: &voteB, result: VoteUndecided}, + }, + expVote: &voteA, + expQuorum: true, + }, + { + desc: "All nodes votes different", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA, result: VoteUndecided}, + {Name: "2", Votes: 1, vote: &voteB, result: VoteUndecided}, + {Name: "3", Votes: 1, vote: &voteC, result: VoteUndecided}, + }, + expVote: nil, + expQuorum: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + subtransaction, err := newSubtransaction(tc.voters, threshold) + require.NoError(t, err) + + // Vote counts usually updated when `updateVoterState()` is called + for _, voter := range tc.voters { + if voter.vote != nil { + subtransaction.voteCounts[*voter.vote] += voter.Votes + } + } + + majorityVote, quorumPossible := subtransaction.quorumCheck() + require.Equal(t, tc.expVote, majorityVote) + require.Equal(t, tc.expQuorum, quorumPossible) + }) + } +} + +func TestSubtransaction_cancelNodeVoter(t *testing.T) { + voteA := newVote(t, "a") + voteB := newVote(t, "b") + + threshold := uint(2) + + for _, tc := range []struct { + desc string + voters []Voter + node string + result VoteResult + subDone bool + expErrMsg string + }{ + { + desc: "Cancel undecided voter", + voters: []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + }, + node: "1", + result: VoteCanceled, + subDone: false, + expErrMsg: "", + }, + { + desc: "Cancel canceled voter", + voters: []Voter{ + {Name: "1", Votes: 1, result: VoteCanceled}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + }, + node: "1", + result: VoteCanceled, + subDone: false, + expErrMsg: "cancel vote: transaction has been canceled", + }, + { + desc: "Cancel nonexistent voter", + voters: []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + }, + node: "4", + result: VoteCanceled, + subDone: false, + expErrMsg: "invalid node for subtransaction: \"4\"", + }, + { + desc: "Cancel last voter", + voters: []Voter{ + {Name: "1", Votes: 1, vote: &voteA}, + {Name: "2", Votes: 1, vote: &voteB}, + {Name: "3", Votes: 1}, + }, + node: "3", + result: VoteCanceled, + subDone: true, + expErrMsg: "", + }, + { + desc: "Cancel voter making quorum impossible", + voters: []Voter{ + {Name: "1", Votes: 1, result: VoteCanceled}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + }, + node: "2", + result: VoteCanceled, + subDone: true, + expErrMsg: "", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + subtransaction, err := newSubtransaction(tc.voters, threshold) + require.NoError(t, err) + + if err := subtransaction.cancelNodeVoter(tc.node); tc.expErrMsg != "" { + require.EqualError(t, err, tc.expErrMsg) + } else { + require.NoError(t, err) + } + + voter, ok := subtransaction.votersByNode[tc.node] + if ok { + require.Equal(t, tc.result, voter.result) + } + + require.Equal(t, tc.subDone, subtransaction.isDone()) + }) + } +} + func newVote(t *testing.T, s string) voting.Vote { hash := sha1.Sum([]byte(s)) vote, err := voting.VoteFromHash(hash[:]) diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index a1dd1f75a..fed090e81 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -227,7 +227,31 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e return nil, errors.New("invalid transaction state") } - for _, subtransaction := range t.subtransactions { + // Check for pending subtransactions on the specified node. + if subtransactions, err := t.getPendingNodeSubtransactions(node); err != nil { + return nil, err + } else if len(subtransactions) != 0 { + // First pending subtransaction is the next in queue for processing. + return subtransactions[0], nil + } + + // If we arrive here, then we know that all the node has voted and + // reached quorum on all subtransactions. We can thus create a new one. + subtransaction, err := t.createSubtransaction() + if err != nil { + return nil, err + } + + t.subtransactions = append(t.subtransactions, subtransaction) + + return subtransaction, nil +} + +// getPendingNodeSubtransactions returns all undecided subtransactions +// for the specified voter. `nil` is returned if there are no pending +// subtransactions for the node. +func (t *transaction) getPendingNodeSubtransactions(node string) ([]*subtransaction, error) { + for i, subtransaction := range t.subtransactions { result, err := subtransaction.getResult(node) if err != nil { return nil, err @@ -235,8 +259,9 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e switch result { case VoteUndecided: - // An undecided vote means we should vote on this one. - return subtransaction, nil + // Nodes after first undecided voter will also be undecided. + // Remaining subtransactions are returned. + return t.subtransactions[i:], nil case VoteCommitted: // If we have committed this subtransaction, we're good // to go. @@ -258,16 +283,48 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e } } - // If we arrive here, then we know that all the node has voted and - // reached quorum on all subtransactions. We can thus create a new one. - subtransaction, err := newSubtransaction(t.voters, t.threshold) - if err != nil { - return nil, err + return nil, nil +} + +// createSubtransaction returns a new subtransaction with any previously +// canceled voter results propagated into the new subtransaction. Once a +// voter has been canceled it can no longer vote in the current and all +// future subtransactions. Propagating canceled voter state ensures +// subtransactions do not wait for an impossible quorum due to canceled +// voters. +func (t *transaction) createSubtransaction() (*subtransaction, error) { + // If there are no subtransactions propagation can be skipped + if len(t.subtransactions) == 0 { + return newSubtransaction(t.voters, t.threshold) } - t.subtransactions = append(t.subtransactions, subtransaction) + prevSub := t.subtransactions[len(t.subtransactions)-1] + + // Check previous voters state and propagate canceled voters. + var propagatedVoters []Voter + for _, voter := range t.voters { + prevVoter := prevSub.votersByNode[voter.Name] + if prevVoter == nil { + // This error should in theory never be reached. When a + // subtransaction is created it receives all voters from + // the parent transaction. The parent transaction voters + // are not mutated throughout the lifespan of the + // transaction meaning that all voters in a transaction + // should be present in a subtransaction. + return nil, errors.New("subtransaction missing previous voter") + } - return subtransaction, nil + // Only canceled voters need to be propagated since a node voter + // can be canceled and the transaction continue. Other terminal + // results are applied to voters and end the transaction. + if prevVoter.result == VoteCanceled { + voter.result = VoteCanceled + } + + propagatedVoters = append(propagatedVoters, voter) + } + + return newSubtransaction(propagatedVoters, t.threshold) } func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) error { @@ -282,3 +339,39 @@ func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) e return subtransaction.collectVotes(ctx, node) } + +// cancelNodeVoter cancels the undecided voters associated with +// the specified node for all pending subtransactions. +func (t *transaction) cancelNodeVoter(node string) error { + t.lock.Lock() + defer t.lock.Unlock() + + // Get all undecided subtransactions for node. + pendingSubtransactions, err := t.getPendingNodeSubtransactions(node) + if err != nil { + return err + } + + // If there are no pending subtransactions a new one should + // be created and added to the transaction so the failure + // can be tracked. + if len(pendingSubtransactions) == 0 { + sub, err := t.createSubtransaction() + if err != nil { + return err + } + + t.subtransactions = append(t.subtransactions, sub) + pendingSubtransactions = []*subtransaction{sub} + } + + // Cancel node voters in undecided subtransactions. + for _, subtransaction := range pendingSubtransactions { + err := subtransaction.cancelNodeVoter(node) + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go index b3648ed62..8d86c3d98 100644 --- a/internal/praefect/transactions/transaction_test.go +++ b/internal/praefect/transactions/transaction_test.go @@ -51,3 +51,335 @@ func TestTransaction_DidVote(t *testing.T) { require.True(t, tx.DidVote("v1")) require.True(t, tx.DidVote("v2")) } + +func TestTransaction_getPendingNodeSubtransactions(t *testing.T) { + t.Parallel() + + var id uint64 + voters := []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + } + threshold := uint(2) + + uncommittedSubtransaction, err := newSubtransaction(voters, threshold) + require.NoError(t, err) + committedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCommitted}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCommitted}, + }, + threshold, + ) + require.NoError(t, err) + mixedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCanceled}, + {Name: "2", Votes: 1, result: VoteFailed}, + {Name: "3", Votes: 1, result: VoteStopped}, + }, + threshold, + ) + require.NoError(t, err) + + for _, tc := range []struct { + desc string + subs []*subtransaction + node string + expSubs []*subtransaction + expErrMsg string + }{ + { + desc: "No subtransactions", + subs: []*subtransaction{}, + node: "1", + expSubs: nil, + expErrMsg: "", + }, + { + desc: "Single pending transaction", + subs: []*subtransaction{uncommittedSubtransaction}, + node: "1", + expSubs: []*subtransaction{uncommittedSubtransaction}, + expErrMsg: "", + }, + { + desc: "Single complete transaction", + subs: []*subtransaction{committedSubtransaction}, + node: "1", + expSubs: nil, + expErrMsg: "", + }, + { + desc: "Two pending transactions", + subs: []*subtransaction{uncommittedSubtransaction, uncommittedSubtransaction}, + node: "1", + expSubs: []*subtransaction{uncommittedSubtransaction, uncommittedSubtransaction}, + expErrMsg: "", + }, + { + desc: "Two transactions, one pending", + subs: []*subtransaction{committedSubtransaction, uncommittedSubtransaction}, + node: "1", + expSubs: []*subtransaction{uncommittedSubtransaction}, + expErrMsg: "", + }, + { + desc: "Missing node voter", + subs: []*subtransaction{uncommittedSubtransaction}, + node: "4", + expSubs: nil, + expErrMsg: "invalid node for transaction: \"4\"", + }, + { + desc: "Canceled node voter", + subs: []*subtransaction{mixedSubtransaction}, + node: "1", + expSubs: nil, + expErrMsg: "transaction has been canceled", + }, + { + desc: "Failed node voter", + subs: []*subtransaction{mixedSubtransaction}, + node: "2", + expSubs: nil, + expErrMsg: "transaction did not reach quorum", + }, + { + desc: "Stopped node voter", + subs: []*subtransaction{mixedSubtransaction}, + node: "3", + expSubs: nil, + expErrMsg: "transaction has been stopped", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transaction, err := newTransaction(id, voters, threshold) + require.NoError(t, err) + + transaction.subtransactions = tc.subs + + subtransactions, err := transaction.getPendingNodeSubtransactions(tc.node) + if tc.expErrMsg != "" { + require.EqualError(t, err, tc.expErrMsg) + } else { + require.NoError(t, err) + } + + require.Equal(t, tc.expSubs, subtransactions) + }) + } +} + +func TestTransaction_createSubtransaction(t *testing.T) { + t.Parallel() + + var id uint64 + voters := []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + } + threshold := uint(2) + + committedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCommitted}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCommitted}, + }, + threshold, + ) + require.NoError(t, err) + mixedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCanceled}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCommitted}, + }, + threshold, + ) + require.NoError(t, err) + canceledSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCanceled}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCanceled}, + }, + threshold, + ) + require.NoError(t, err) + + for _, tc := range []struct { + desc string + subs []*subtransaction + node string + expVoters []Voter + }{ + { + desc: "No previous subtransactions", + subs: nil, + node: "1", + expVoters: []Voter{ + {Name: "1", result: VoteUndecided}, + {Name: "2", result: VoteUndecided}, + {Name: "3", result: VoteUndecided}, + }, + }, + { + desc: "One previous subtransaction, no canceled voters", + subs: []*subtransaction{committedSubtransaction}, + node: "1", + expVoters: []Voter{ + {Name: "1", result: VoteUndecided}, + {Name: "2", result: VoteUndecided}, + {Name: "3", result: VoteUndecided}, + }, + }, + { + desc: "One previous subtransaction, one canceled voter", + subs: []*subtransaction{mixedSubtransaction}, + node: "1", + expVoters: []Voter{ + {Name: "1", result: VoteCanceled}, + {Name: "2", result: VoteUndecided}, + {Name: "3", result: VoteUndecided}, + }, + }, + { + desc: "One previous subtransaction, two canceled voters", + subs: []*subtransaction{canceledSubtransaction}, + node: "1", + expVoters: []Voter{ + {Name: "1", result: VoteCanceled}, + {Name: "2", result: VoteUndecided}, + {Name: "3", result: VoteCanceled}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transaction, err := newTransaction(id, voters, threshold) + require.NoError(t, err) + + transaction.subtransactions = tc.subs + + subtransaction, err := transaction.createSubtransaction() + require.NoError(t, err) + + for _, expVoter := range tc.expVoters { + voter := subtransaction.votersByNode[expVoter.Name] + require.NotNil(t, voter) + require.Equal(t, expVoter.result, voter.result) + } + }) + } +} + +func TestTransaction_cancelNodeVoter(t *testing.T) { + t.Parallel() + + var id uint64 + voters := []Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + {Name: "3", Votes: 1}, + } + threshold := uint(2) + + committedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteCommitted}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCommitted}, + }, + threshold, + ) + require.NoError(t, err) + undecidedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteUndecided}, + {Name: "2", Votes: 1, result: VoteUndecided}, + {Name: "3", Votes: 1, result: VoteUndecided}, + }, + threshold, + ) + require.NoError(t, err) + decidedSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteUndecided}, + {Name: "2", Votes: 1, result: VoteCommitted}, + {Name: "3", Votes: 1, result: VoteCommitted}, + }, + threshold, + ) + require.NoError(t, err) + cancelingSubtransaction, err := newSubtransaction( + []Voter{ + {Name: "1", Votes: 1, result: VoteUndecided}, + {Name: "2", Votes: 1, result: VoteUndecided}, + {Name: "3", Votes: 1, result: VoteCanceled}, + }, + threshold, + ) + require.NoError(t, err) + + for _, tc := range []struct { + desc string + subs []*subtransaction + node string + expErrMsg string + }{ + { + desc: "No subtransactions", + subs: nil, + node: "1", + expErrMsg: "", + }, + { + desc: "No pending subtransactions", + subs: []*subtransaction{committedSubtransaction}, + node: "1", + expErrMsg: "", + }, + { + desc: "One Pending subtransaction", + subs: []*subtransaction{undecidedSubtransaction}, + node: "1", + expErrMsg: "", + }, + { + desc: "Two Pending subtransactions", + subs: []*subtransaction{decidedSubtransaction, cancelingSubtransaction}, + node: "1", + expErrMsg: "", + }, + { + desc: "Invalid node", + subs: []*subtransaction{committedSubtransaction}, + node: "4", + expErrMsg: "invalid node for transaction: \"4\"", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + transaction, err := newTransaction(id, voters, threshold) + require.NoError(t, err) + + transaction.subtransactions = tc.subs + + err = transaction.cancelNodeVoter(tc.node) + if tc.expErrMsg != "" { + require.Error(t, err) + require.Equal(t, tc.expErrMsg, err.Error()) + } else { + require.NoError(t, err) + + // Check the last subtransaction to make sure cancel propagation occurs. + sub := transaction.subtransactions[len(transaction.subtransactions)-1] + voter := sub.votersByNode[tc.node] + require.Equal(t, VoteCanceled, voter.result) + } + }) + } +} diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 02440e7f0..bf8d59899 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -199,6 +199,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { // PraefectGeneratedReplicaPaths affects many tests as it changes the repository creation logic. // Randomly enable the flag to exercise both paths to some extent. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectGeneratedReplicaPaths, rnd.Int()%2 == 0) + // NodeErrorCancelsVoter affect many tests as it changes Praefect coordinator transaction logic. + // Randomly enable the flag to exercise both paths to some extent. + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.NodeErrorCancelsVoter, rnd.Int()%2 == 0) for _, opt := range opts { ctx = opt(ctx) |