diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-09-25 13:24:02 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-10-12 12:02:54 +0300 |
commit | 0d2ea9ff5c3651dd152a258ecae5d9238457cc6a (patch) | |
tree | 77734a717e58075bc30d2c3e306748592bed8e95 | |
parent | 5ad541682a84f8fb9561f28c37f29f3f5a764c35 (diff) |
transactions: Allow graceful stop of transactions
It is currently only possible to stop transactions by cancelling them,
which will cause all ongoing and future votes on this transaction to
raise errors. But in some cases we need to stop transactions gracefully
without raising errors, which is not possible right now.
This commit implements the first piece of the puzzle for this use case:
gracefully stopping transactions. Semantics are mostly the
same as for transaction cancellation, but with proper error handling and
state tracking such that multiple calls to stop will not panic.
-rw-r--r-- | internal/praefect/transactions/manager.go | 44 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 45 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 23 |
3 files changed, 103 insertions, 9 deletions
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index b663c4827..0cf536af6 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -223,16 +223,24 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n }).Debug("VoteTransaction") if err := mgr.voteTransaction(ctx, transactionID, node, hash); err != nil { - mgr.log(ctx).WithFields(logrus.Fields{ - "transaction_id": transactionID, - "node": node, - "hash": hex.EncodeToString(hash), - }).WithError(err).Error("VoteTransaction: vote failed") - - if errors.Is(err, ErrTransactionVoteFailed) { + if errors.Is(err, ErrTransactionStopped) { + mgr.counterMetric.WithLabelValues("stopped").Inc() + } else if errors.Is(err, ErrTransactionVoteFailed) { mgr.counterMetric.WithLabelValues("aborted").Inc() + + mgr.log(ctx).WithFields(logrus.Fields{ + "transaction_id": transactionID, + "node": node, + "hash": hex.EncodeToString(hash), + }).WithError(err).Error("VoteTransaction: did not reach quorum") } else { mgr.counterMetric.WithLabelValues("invalid").Inc() + + mgr.log(ctx).WithFields(logrus.Fields{ + "transaction_id": transactionID, + "node": node, + "hash": hex.EncodeToString(hash), + }).WithError(err).Error("VoteTransaction: vote failed") } return err @@ -248,3 +256,25 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n return nil } + +// StopTransaction will gracefully stop a transaction. 
+func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) error { + mgr.lock.Lock() + transaction, ok := mgr.transactions[transactionID] + mgr.lock.Unlock() + + if !ok { + return ErrNotFound + } + + if err := transaction.stop(); err != nil { + return err + } + + mgr.log(ctx).WithFields(logrus.Fields{ + "transaction_id": transactionID, + }).Debug("VoteTransaction: transaction stopped") + mgr.counterMetric.WithLabelValues("stopped").Inc() + + return nil +} diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 58bca0c92..33c11f5da 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -27,6 +27,8 @@ const ( voteCommitted // voteAborted means that the voter aborted his vote. voteAborted + // voteStopped means that the transaction was gracefully stopped. + voteStopped ) type vote [sha1.Size]byte @@ -50,6 +52,7 @@ func (v vote) isEmpty() bool { type subtransaction struct { doneCh chan interface{} cancelCh chan interface{} + stopCh chan interface{} threshold uint @@ -68,6 +71,7 @@ func newSubtransaction(voters []Voter, threshold uint) (*subtransaction, error) return &subtransaction{ doneCh: make(chan interface{}), cancelCh: make(chan interface{}), + stopCh: make(chan interface{}), threshold: threshold, votersByNode: votersByNode, voteCounts: make(map[vote]uint, len(voters)), @@ -90,6 +94,31 @@ func (t *subtransaction) cancel() { close(t.cancelCh) } +func (t *subtransaction) stop() error { + t.lock.Lock() + defer t.lock.Unlock() + + for _, voter := range t.votersByNode { + switch voter.result { + case voteAborted: + // If the vote was aborted already, we cannot stop it. + return ErrTransactionCanceled + case voteStopped: + // Similar if the vote was stopped already. + return ErrTransactionStopped + case voteUndecided: + // Undecided voters will get stopped, ... + voter.result = voteStopped + case voteCommitted: + // ... 
while decided voters cannot be changed anymore. + continue + } + } + + close(t.stopCh) + return nil +} + func (t *subtransaction) state() map[string]bool { t.lock.Lock() defer t.lock.Unlock() @@ -166,6 +195,8 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { return ctx.Err() case <-t.cancelCh: return ErrTransactionCanceled + case <-t.stopCh: + return ErrTransactionStopped case <-t.doneCh: break } @@ -178,8 +209,18 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { return fmt.Errorf("invalid node for transaction: %q", node) } - if voter.result != voteUndecided { - return fmt.Errorf("voter has already settled on an outcome: %q", node) + switch voter.result { + case voteUndecided: + // Happy case, no decision was yet made. + case voteAborted: + // It may happen that the vote was cancelled or stopped just after majority was + // reached. In that case, the node's state is now voteAborted/voteStopped, so we + // have to return an error here. + return ErrTransactionCanceled + case voteStopped: + return ErrTransactionStopped + default: + return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node) } // See if our vote crossed the threshold. As there can be only one vote diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 37f4b841c..5c1d53a23 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -19,6 +19,8 @@ var ( // ErrSubtransactionFailed indicates a vote was cast on a // subtransaction which failed already. ErrSubtransactionFailed = errors.New("subtransaction has failed") + // ErrTransactionStopped indicates the transaction was gracefully stopped. 
+ ErrTransactionStopped = errors.New("transaction has been stopped") ) type transactionState int @@ -26,6 +28,7 @@ type transactionState int const ( transactionOpen = transactionState(iota) transactionCanceled + transactionStopped ) // Voter is a participant in a given transaction that may cast a vote. @@ -102,6 +105,20 @@ func (t *Transaction) cancel() { t.state = transactionCanceled } +func (t *Transaction) stop() error { + t.lock.Lock() + defer t.lock.Unlock() + + for _, subtransaction := range t.subtransactions { + if err := subtransaction.stop(); err != nil { + return err + } + } + t.state = transactionStopped + + return nil +} + // ID returns the identifier used to uniquely identify a transaction. func (t *Transaction) ID() uint64 { return t.id @@ -161,6 +178,8 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e // expected state, nothing to do case transactionCanceled: return nil, ErrTransactionCanceled + case transactionStopped: + return nil, ErrTransactionStopped default: return nil, errors.New("invalid transaction state") } @@ -184,6 +203,10 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e // fail as we cannot proceed if the path leading to the // end result has intermittent failures. return nil, ErrSubtransactionFailed + case voteStopped: + // If the transaction was stopped, then we need to fail + // with a graceful error. + return nil, ErrTransactionStopped } } |