diff options
author | Paul Okstad <pokstad@gitlab.com> | 2021-02-02 07:58:54 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2021-02-02 07:58:54 +0300 |
commit | 9c2da9436f6a41a244a30deef6f48798f877e909 (patch) | |
tree | beae082286b2b2da5b5ad3bcc29e8599057ba855 | |
parent | 827531393ddfbafcdd89c09f1798772f6f087ba8 (diff) | |
parent | 8eab6cc53814341b2a076f7dd5ead1b21918bcdb (diff) |
Merge branch 'pks-tx-disambiguate-errors' into 'master'
transactions: Improve error handling
Closes #3401
See merge request gitlab-org/gitaly!3067
-rw-r--r-- | internal/gitaly/service/hook/reference_transaction.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/service/transaction/server.go | 2 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 36 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 40 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 46 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 20 |
7 files changed, 80 insertions, 68 deletions
diff --git a/internal/gitaly/service/hook/reference_transaction.go b/internal/gitaly/service/hook/reference_transaction.go index 92122ba23..efbe0450a 100644 --- a/internal/gitaly/service/hook/reference_transaction.go +++ b/internal/gitaly/service/hook/reference_transaction.go @@ -50,7 +50,7 @@ func (s *server) ReferenceTransactionHook(stream gitalypb.HookService_ReferenceT request.GetEnvironmentVariables(), stdin, ); err != nil { - return helper.ErrInternalf("error voting on transaction: %v", err) + return helper.ErrInternalf("reference-transaction hook: %v", err) } if err := stream.Send(&gitalypb.ReferenceTransactionHookResponse{ diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index e00b70660..574c433b3 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -208,7 +208,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { if node.shouldSucceed { assert.NoError(t, err) } else { - assert.True(t, errors.Is(err, transactions.ErrTransactionVoteFailed)) + assert.True(t, errors.Is(err, transactions.ErrTransactionFailed)) } }() } diff --git a/internal/praefect/service/transaction/server.go b/internal/praefect/service/transaction/server.go index 0a3edaea0..ceabc2282 100644 --- a/internal/praefect/service/transaction/server.go +++ b/internal/praefect/service/transaction/server.go @@ -35,7 +35,7 @@ func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransacti return &gitalypb.VoteTransactionResponse{ State: gitalypb.VoteTransactionResponse_STOP, }, nil - case errors.Is(err, transactions.ErrTransactionVoteFailed): + case errors.Is(err, transactions.ErrTransactionFailed): return &gitalypb.VoteTransactionResponse{ State: gitalypb.VoteTransactionResponse_ABORT, }, nil diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 5023a5aa8..35bdececf 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -446,10 +446,10 @@ func TestTransactionReachesQuorum(t *testing.T) { func TestTransactionWithMultipleVotes(t *testing.T) { type multiVoter struct { - voteCount uint - votes []string - voteSucceeds []bool - shouldSucceed bool + voteCount uint + votes []string + voteSucceeds []bool + expectedResult transactions.VoteResult } tc := []struct { @@ -460,33 +460,33 @@ func TestTransactionWithMultipleVotes(t *testing.T) { { desc: "quorum is reached with multiple votes", voters: []multiVoter{ - {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true}, - {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true}, + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, expectedResult: transactions.VoteCommitted}, + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, expectedResult: transactions.VoteCommitted}, }, threshold: 2, }, { desc: "quorum is not reached with disagreeing votes", voters: []multiVoter{ - {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, - {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, false}, expectedResult: transactions.VoteFailed}, + {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, expectedResult: transactions.VoteFailed}, }, threshold: 2, }, { desc: "quorum is reached with unweighted disagreeing voter", voters: []multiVoter{ - {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true}, - {voteCount: 0, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, expectedResult: transactions.VoteCommitted}, + {voteCount: 0, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, expectedResult: transactions.VoteCanceled}, }, threshold: 1, }, { desc: "quorum is reached with outweighed disagreeing voter", voters: []multiVoter{ - {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true}, - {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true}, - {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, expectedResult: transactions.VoteCommitted}, + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, expectedResult: transactions.VoteCommitted}, + {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, expectedResult: transactions.VoteCanceled}, }, threshold: 2, }, @@ -546,11 +546,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) { results, err := transaction.State() require.NoError(t, err) for i, voter := range tc.voters { - if voter.shouldSucceed { - require.Equal(t, transactions.VoteCommitted, results[fmt.Sprintf("node-%d", i)]) - } else { - require.Equal(t, transactions.VoteAborted, results[fmt.Sprintf("node-%d", i)]) - } + require.Equal(t, voter.expectedResult, results[fmt.Sprintf("node-%d", i)]) } }) } @@ -682,8 +678,10 @@ func TestTransactionCancellation(t *testing.T) { for i, v := range tc.voters { if v.shouldSucceed { require.Equal(t, transactions.VoteCommitted, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state") + } else if v.showsUp { + require.Equal(t, transactions.VoteFailed, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state") } else { - require.Equal(t, transactions.VoteAborted, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state") + require.Equal(t, transactions.VoteCanceled, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state") } } diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 7c3572887..f27513fa6 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -199,7 +199,7 @@ func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, n mgr.lock.Unlock() if !ok { - return ErrNotFound + return fmt.Errorf("%w: %d", ErrNotFound, transactionID) } if err := transaction.vote(ctx, node, hash); err != nil { @@ -227,26 +227,30 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n }).Debug("VoteTransaction") if err := mgr.voteTransaction(ctx, transactionID, node, hash); err != nil { + fields := logrus.Fields{ + "transaction_id": transactionID, + "node": node, + "hash": hex.EncodeToString(hash), + } + var counterLabel string + if errors.Is(err, ErrTransactionStopped) { - mgr.counterMetric.WithLabelValues("stopped").Inc() - } else if errors.Is(err, ErrTransactionVoteFailed) { - mgr.counterMetric.WithLabelValues("aborted").Inc() - - mgr.log(ctx).WithFields(logrus.Fields{ - "transaction_id": transactionID, - "node": node, - "hash": hex.EncodeToString(hash), - }).WithError(err).Error("VoteTransaction: did not reach quorum") + counterLabel = "stopped" + // Stopped transactions indicate a graceful + // termination, so we should not log an error here. + } else if errors.Is(err, ErrTransactionFailed) { + counterLabel = "failed" + mgr.log(ctx).WithFields(fields).WithError(err).Error("VoteTransaction: did not reach quorum") + } else if errors.Is(err, ErrTransactionCanceled) { + counterLabel = "canceled" + mgr.log(ctx).WithFields(fields).WithError(err).Error("VoteTransaction: transaction was canceled") } else { - mgr.counterMetric.WithLabelValues("invalid").Inc() - - mgr.log(ctx).WithFields(logrus.Fields{ - "transaction_id": transactionID, - "node": node, - "hash": hex.EncodeToString(hash), - }).WithError(err).Error("VoteTransaction: vote failed") + counterLabel = "invalid" + mgr.log(ctx).WithFields(fields).WithError(err).Error("VoteTransaction: failure") } + mgr.counterMetric.WithLabelValues(counterLabel).Inc() + return err } @@ -268,7 +272,7 @@ func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) e mgr.lock.Unlock() if !ok { - return ErrNotFound + return fmt.Errorf("%w: %d", ErrNotFound, transactionID) } if err := transaction.stop(); err != nil { diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 665001f70..355ac99fe 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -3,19 +3,11 @@ package transactions import ( "context" "crypto/sha1" - "errors" + "encoding/hex" "fmt" "sync" ) -var ( - // ErrTransactionVoteFailed indicates the transaction didn't reach quorum. - ErrTransactionVoteFailed = errors.New("transaction did not reach quorum") - // ErrTransactionCanceled indicates the transaction was canceled before - // reaching quorum. - ErrTransactionCanceled = errors.New("transaction was canceled") -) - // VoteResult represents the outcome of a transaction for a single voter. type VoteResult int @@ -25,8 +17,11 @@ const ( VoteUndecided VoteResult = iota // VoteCommitted means that the voter committed his vote. VoteCommitted - // VoteAborted means that the voter aborted his vote. - VoteAborted + // VoteFailed means that the voter has failed the vote because a + // majority of nodes has elected a different result. + VoteFailed + // VoteCanceled means that the transaction was cancelled. + VoteCanceled // VoteStopped means that the transaction was gracefully stopped. VoteStopped ) @@ -48,6 +43,11 @@ func (v vote) isEmpty() bool { return v == vote{} } +// String returns the hexadecimal string representation of the vote. +func (v vote) String() string { + return hex.EncodeToString(v[:]) +} + // subtransaction is a single session where voters are voting for a certain outcome. type subtransaction struct { doneCh chan interface{} @@ -87,7 +87,7 @@ func (t *subtransaction) cancel() { // to mark it as failed so it won't get the idea of committing // the transaction at a later point anymore. if voter.result == VoteUndecided { - voter.result = VoteAborted + voter.result = VoteCanceled } } @@ -100,8 +100,8 @@ func (t *subtransaction) stop() error { for _, voter := range t.votersByNode { switch voter.result { - case VoteAborted: - // If the vote was aborted already, we cannot stop it. + case VoteCanceled: + // If the vote was canceled already, we cannot stop it. return ErrTransactionCanceled case VoteStopped: // Similar if the vote was stopped already. @@ -109,7 +109,7 @@ func (t *subtransaction) stop() error { case VoteUndecided: // Undecided voters will get stopped, ... voter.result = VoteStopped - case VoteCommitted: + case VoteCommitted, VoteFailed: // ... while decided voters cannot be changed anymore. continue } @@ -212,9 +212,9 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { switch voter.result { case VoteUndecided: // Happy case, no decision was yet made. - case VoteAborted: + case VoteCanceled: // It may happen that the vote was cancelled or stopped just after majority was - // reached. In that case, the node's state is now VoteAborted/VoteStopped, so we + // reached. In that case, the node's state is now VoteCanceled/VoteStopped, so we // have to return an error here. return ErrTransactionCanceled case VoteStopped: @@ -223,11 +223,13 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node) } - // See if our vote crossed the threshold. As there can be only one vote - // exceeding it, we know we're the winner in that case. + // See if our vote crossed the threshold. If not, then we know we + // cannot have won as the transaction is being wrapped up already and + // shouldn't accept any additional votes. if t.voteCounts[voter.vote] < t.threshold { - voter.result = VoteAborted - return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold) + voter.result = VoteFailed + return fmt.Errorf("%w: got %d/%d votes for %v", ErrTransactionFailed, + t.voteCounts[voter.vote], t.threshold, voter.vote) } voter.result = VoteCommitted @@ -240,7 +242,7 @@ func (t *subtransaction) getResult(node string) (VoteResult, error) { voter, ok := t.votersByNode[node] if !ok { - return VoteAborted, fmt.Errorf("invalid node for transaction: %q", node) + return VoteCanceled, fmt.Errorf("invalid node for transaction: %q", node) } return voter.result, nil diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index ae63947b4..770a0ac8f 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -16,9 +16,12 @@ var ( // invalid threshold that may either allow for multiple different // quorums or none at all. ErrInvalidThreshold = errors.New("transaction has invalid threshold") - // ErrSubtransactionFailed indicates a vote was cast on a - // subtransaction which failed already. - ErrSubtransactionFailed = errors.New("subtransaction has failed") + + // ErrTransactionFailed indicates the transaction didn't reach quorum. + ErrTransactionFailed = errors.New("transaction did not reach quorum") + // ErrTransactionCanceled indicates the transaction was canceled before + // reaching quorum. + ErrTransactionCanceled = errors.New("transaction has been canceled") // ErrTransactionStopped indicates the transaction was gracefully stopped. ErrTransactionStopped = errors.New("transaction has been stopped") ) @@ -139,7 +142,7 @@ func (t *Transaction) State() (map[string]VoteResult, error) { case transactionOpen: results[voter.Name] = VoteUndecided case transactionCanceled: - results[voter.Name] = VoteAborted + results[voter.Name] = VoteCanceled case transactionStopped: results[voter.Name] = VoteStopped default: @@ -199,11 +202,16 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e // If we have committed this subtransaction, we're good // to go. continue - case VoteAborted: + case VoteFailed: + // If a vote was cast on a subtransaction which failed + // to reach majority, then we cannot proceed with any + // subsequent votes anymore. + return nil, ErrTransactionFailed + case VoteCanceled: // If the subtransaction was aborted, then we need to // fail as we cannot proceed if the path leading to the // end result has intermittent failures. - return nil, ErrSubtransactionFailed + return nil, ErrTransactionCanceled case VoteStopped: // If the transaction was stopped, then we need to fail // with a graceful error. |