Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Paul Okstad <pokstad@gitlab.com> 2021-02-02 07:58:54 +0300
committer: Paul Okstad <pokstad@gitlab.com> 2021-02-02 07:58:54 +0300
commit: 9c2da9436f6a41a244a30deef6f48798f877e909 (patch)
tree: beae082286b2b2da5b5ad3bcc29e8599057ba855
parent: 827531393ddfbafcdd89c09f1798772f6f087ba8 (diff)
parent: 8eab6cc53814341b2a076f7dd5ead1b21918bcdb (diff)
Merge branch 'pks-tx-disambiguate-errors' into 'master'
transactions: Improve error handling

Closes #3401

See merge request gitlab-org/gitaly!3067
-rw-r--r-- internal/gitaly/service/hook/reference_transaction.go | 2
-rw-r--r-- internal/praefect/coordinator_pg_test.go | 2
-rw-r--r-- internal/praefect/service/transaction/server.go | 2
-rw-r--r-- internal/praefect/transaction_test.go | 36
-rw-r--r-- internal/praefect/transactions/manager.go | 40
-rw-r--r-- internal/praefect/transactions/subtransaction.go | 46
-rw-r--r-- internal/praefect/transactions/transaction.go | 20
7 files changed, 80 insertions, 68 deletions
diff --git a/internal/gitaly/service/hook/reference_transaction.go b/internal/gitaly/service/hook/reference_transaction.go
index 92122ba23..efbe0450a 100644
--- a/internal/gitaly/service/hook/reference_transaction.go
+++ b/internal/gitaly/service/hook/reference_transaction.go
@@ -50,7 +50,7 @@ func (s *server) ReferenceTransactionHook(stream gitalypb.HookService_ReferenceT
request.GetEnvironmentVariables(),
stdin,
); err != nil {
- return helper.ErrInternalf("error voting on transaction: %v", err)
+ return helper.ErrInternalf("reference-transaction hook: %v", err)
}
if err := stream.Send(&gitalypb.ReferenceTransactionHookResponse{
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index e00b70660..574c433b3 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -208,7 +208,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
if node.shouldSucceed {
assert.NoError(t, err)
} else {
- assert.True(t, errors.Is(err, transactions.ErrTransactionVoteFailed))
+ assert.True(t, errors.Is(err, transactions.ErrTransactionFailed))
}
}()
}
diff --git a/internal/praefect/service/transaction/server.go b/internal/praefect/service/transaction/server.go
index 0a3edaea0..ceabc2282 100644
--- a/internal/praefect/service/transaction/server.go
+++ b/internal/praefect/service/transaction/server.go
@@ -35,7 +35,7 @@ func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransacti
return &gitalypb.VoteTransactionResponse{
State: gitalypb.VoteTransactionResponse_STOP,
}, nil
- case errors.Is(err, transactions.ErrTransactionVoteFailed):
+ case errors.Is(err, transactions.ErrTransactionFailed):
return &gitalypb.VoteTransactionResponse{
State: gitalypb.VoteTransactionResponse_ABORT,
}, nil
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 5023a5aa8..35bdececf 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -446,10 +446,10 @@ func TestTransactionReachesQuorum(t *testing.T) {
func TestTransactionWithMultipleVotes(t *testing.T) {
type multiVoter struct {
- voteCount uint
- votes []string
- voteSucceeds []bool
- shouldSucceed bool
+ voteCount uint
+ votes []string
+ voteSucceeds []bool
+ expectedResult transactions.VoteResult
}
tc := []struct {
@@ -460,33 +460,33 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
{
desc: "quorum is reached with multiple votes",
voters: []multiVoter{
- {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true},
- {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true},
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, expectedResult: transactions.VoteCommitted},
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, expectedResult: transactions.VoteCommitted},
},
threshold: 2,
},
{
desc: "quorum is not reached with disagreeing votes",
voters: []multiVoter{
- {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
- {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, false}, expectedResult: transactions.VoteFailed},
+ {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, expectedResult: transactions.VoteFailed},
},
threshold: 2,
},
{
desc: "quorum is reached with unweighted disagreeing voter",
voters: []multiVoter{
- {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
- {voteCount: 0, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, expectedResult: transactions.VoteCommitted},
+ {voteCount: 0, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, expectedResult: transactions.VoteCanceled},
},
threshold: 1,
},
{
desc: "quorum is reached with outweighed disagreeing voter",
voters: []multiVoter{
- {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
- {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
- {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, expectedResult: transactions.VoteCommitted},
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, expectedResult: transactions.VoteCommitted},
+ {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, expectedResult: transactions.VoteCanceled},
},
threshold: 2,
},
@@ -546,11 +546,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
results, err := transaction.State()
require.NoError(t, err)
for i, voter := range tc.voters {
- if voter.shouldSucceed {
- require.Equal(t, transactions.VoteCommitted, results[fmt.Sprintf("node-%d", i)])
- } else {
- require.Equal(t, transactions.VoteAborted, results[fmt.Sprintf("node-%d", i)])
- }
+ require.Equal(t, voter.expectedResult, results[fmt.Sprintf("node-%d", i)])
}
})
}
@@ -682,8 +678,10 @@ func TestTransactionCancellation(t *testing.T) {
for i, v := range tc.voters {
if v.shouldSucceed {
require.Equal(t, transactions.VoteCommitted, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state")
+ } else if v.showsUp {
+ require.Equal(t, transactions.VoteFailed, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state")
} else {
- require.Equal(t, transactions.VoteAborted, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state")
+ require.Equal(t, transactions.VoteCanceled, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state")
}
}
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 7c3572887..f27513fa6 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -199,7 +199,7 @@ func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, n
mgr.lock.Unlock()
if !ok {
- return ErrNotFound
+ return fmt.Errorf("%w: %d", ErrNotFound, transactionID)
}
if err := transaction.vote(ctx, node, hash); err != nil {
@@ -227,26 +227,30 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
}).Debug("VoteTransaction")
if err := mgr.voteTransaction(ctx, transactionID, node, hash); err != nil {
+ fields := logrus.Fields{
+ "transaction_id": transactionID,
+ "node": node,
+ "hash": hex.EncodeToString(hash),
+ }
+ var counterLabel string
+
if errors.Is(err, ErrTransactionStopped) {
- mgr.counterMetric.WithLabelValues("stopped").Inc()
- } else if errors.Is(err, ErrTransactionVoteFailed) {
- mgr.counterMetric.WithLabelValues("aborted").Inc()
-
- mgr.log(ctx).WithFields(logrus.Fields{
- "transaction_id": transactionID,
- "node": node,
- "hash": hex.EncodeToString(hash),
- }).WithError(err).Error("VoteTransaction: did not reach quorum")
+ counterLabel = "stopped"
+ // Stopped transactions indicate a graceful
+ // termination, so we should not log an error here.
+ } else if errors.Is(err, ErrTransactionFailed) {
+ counterLabel = "failed"
+ mgr.log(ctx).WithFields(fields).WithError(err).Error("VoteTransaction: did not reach quorum")
+ } else if errors.Is(err, ErrTransactionCanceled) {
+ counterLabel = "canceled"
+ mgr.log(ctx).WithFields(fields).WithError(err).Error("VoteTransaction: transaction was canceled")
} else {
- mgr.counterMetric.WithLabelValues("invalid").Inc()
-
- mgr.log(ctx).WithFields(logrus.Fields{
- "transaction_id": transactionID,
- "node": node,
- "hash": hex.EncodeToString(hash),
- }).WithError(err).Error("VoteTransaction: vote failed")
+ counterLabel = "invalid"
+ mgr.log(ctx).WithFields(fields).WithError(err).Error("VoteTransaction: failure")
}
+ mgr.counterMetric.WithLabelValues(counterLabel).Inc()
+
return err
}
@@ -268,7 +272,7 @@ func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) e
mgr.lock.Unlock()
if !ok {
- return ErrNotFound
+ return fmt.Errorf("%w: %d", ErrNotFound, transactionID)
}
if err := transaction.stop(); err != nil {
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 665001f70..355ac99fe 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -3,19 +3,11 @@ package transactions
import (
"context"
"crypto/sha1"
- "errors"
+ "encoding/hex"
"fmt"
"sync"
)
-var (
- // ErrTransactionVoteFailed indicates the transaction didn't reach quorum.
- ErrTransactionVoteFailed = errors.New("transaction did not reach quorum")
- // ErrTransactionCanceled indicates the transaction was canceled before
- // reaching quorum.
- ErrTransactionCanceled = errors.New("transaction was canceled")
-)
-
// VoteResult represents the outcome of a transaction for a single voter.
type VoteResult int
@@ -25,8 +17,11 @@ const (
VoteUndecided VoteResult = iota
// VoteCommitted means that the voter committed his vote.
VoteCommitted
- // VoteAborted means that the voter aborted his vote.
- VoteAborted
+ // VoteFailed means that the voter has failed the vote because a
+ // majority of nodes has elected a different result.
+ VoteFailed
+ // VoteCanceled means that the transaction was cancelled.
+ VoteCanceled
// VoteStopped means that the transaction was gracefully stopped.
VoteStopped
)
@@ -48,6 +43,11 @@ func (v vote) isEmpty() bool {
return v == vote{}
}
+// String returns the hexadecimal string representation of the vote.
+func (v vote) String() string {
+ return hex.EncodeToString(v[:])
+}
+
// subtransaction is a single session where voters are voting for a certain outcome.
type subtransaction struct {
doneCh chan interface{}
@@ -87,7 +87,7 @@ func (t *subtransaction) cancel() {
// to mark it as failed so it won't get the idea of committing
// the transaction at a later point anymore.
if voter.result == VoteUndecided {
- voter.result = VoteAborted
+ voter.result = VoteCanceled
}
}
@@ -100,8 +100,8 @@ func (t *subtransaction) stop() error {
for _, voter := range t.votersByNode {
switch voter.result {
- case VoteAborted:
- // If the vote was aborted already, we cannot stop it.
+ case VoteCanceled:
+ // If the vote was canceled already, we cannot stop it.
return ErrTransactionCanceled
case VoteStopped:
// Similar if the vote was stopped already.
@@ -109,7 +109,7 @@ func (t *subtransaction) stop() error {
case VoteUndecided:
// Undecided voters will get stopped, ...
voter.result = VoteStopped
- case VoteCommitted:
+ case VoteCommitted, VoteFailed:
// ... while decided voters cannot be changed anymore.
continue
}
@@ -212,9 +212,9 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
switch voter.result {
case VoteUndecided:
// Happy case, no decision was yet made.
- case VoteAborted:
+ case VoteCanceled:
// It may happen that the vote was cancelled or stopped just after majority was
- // reached. In that case, the node's state is now VoteAborted/VoteStopped, so we
+ // reached. In that case, the node's state is now VoteCanceled/VoteStopped, so we
// have to return an error here.
return ErrTransactionCanceled
case VoteStopped:
@@ -223,11 +223,13 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node)
}
- // See if our vote crossed the threshold. As there can be only one vote
- // exceeding it, we know we're the winner in that case.
+ // See if our vote crossed the threshold. If not, then we know we
+ // cannot have won as the transaction is being wrapped up already and
+ // shouldn't accept any additional votes.
if t.voteCounts[voter.vote] < t.threshold {
- voter.result = VoteAborted
- return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold)
+ voter.result = VoteFailed
+ return fmt.Errorf("%w: got %d/%d votes for %v", ErrTransactionFailed,
+ t.voteCounts[voter.vote], t.threshold, voter.vote)
}
voter.result = VoteCommitted
@@ -240,7 +242,7 @@ func (t *subtransaction) getResult(node string) (VoteResult, error) {
voter, ok := t.votersByNode[node]
if !ok {
- return VoteAborted, fmt.Errorf("invalid node for transaction: %q", node)
+ return VoteCanceled, fmt.Errorf("invalid node for transaction: %q", node)
}
return voter.result, nil
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index ae63947b4..770a0ac8f 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -16,9 +16,12 @@ var (
// invalid threshold that may either allow for multiple different
// quorums or none at all.
ErrInvalidThreshold = errors.New("transaction has invalid threshold")
- // ErrSubtransactionFailed indicates a vote was cast on a
- // subtransaction which failed already.
- ErrSubtransactionFailed = errors.New("subtransaction has failed")
+
+ // ErrTransactionFailed indicates the transaction didn't reach quorum.
+ ErrTransactionFailed = errors.New("transaction did not reach quorum")
+ // ErrTransactionCanceled indicates the transaction was canceled before
+ // reaching quorum.
+ ErrTransactionCanceled = errors.New("transaction has been canceled")
// ErrTransactionStopped indicates the transaction was gracefully stopped.
ErrTransactionStopped = errors.New("transaction has been stopped")
)
@@ -139,7 +142,7 @@ func (t *Transaction) State() (map[string]VoteResult, error) {
case transactionOpen:
results[voter.Name] = VoteUndecided
case transactionCanceled:
- results[voter.Name] = VoteAborted
+ results[voter.Name] = VoteCanceled
case transactionStopped:
results[voter.Name] = VoteStopped
default:
@@ -199,11 +202,16 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
// If we have committed this subtransaction, we're good
// to go.
continue
- case VoteAborted:
+ case VoteFailed:
+ // If a vote was cast on a subtransaction which failed
+ // to reach majority, then we cannot proceed with any
+ // subsequent votes anymore.
+ return nil, ErrTransactionFailed
+ case VoteCanceled:
// If the subtransaction was aborted, then we need to
// fail as we cannot proceed if the path leading to the
// end result has intermittent failures.
- return nil, ErrSubtransactionFailed
+ return nil, ErrTransactionCanceled
case VoteStopped:
// If the transaction was stopped, then we need to fail
// with a graceful error.