Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-09-25 13:24:02 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-10-12 12:02:54 +0300
commit0d2ea9ff5c3651dd152a258ecae5d9238457cc6a (patch)
tree77734a717e58075bc30d2c3e306748592bed8e95
parent5ad541682a84f8fb9561f28c37f29f3f5a764c35 (diff)
transactions: Allow graceful stop of transactions
It is currently only possible to stop transactions by cancelling them, which will cause all ongoing and future votes on this transaction to raise errors. But in some cases we need to stop transactions gracefully without raising errors, which is nothing that can be achieved right now. This commit implements the first piece of the puzzle to allow for this use case, which is stopping of transactions. Semantics are mostly the same as for transaction cancellation, but with proper error handling and state tracking such that multiple calls to stop will not panic.
-rw-r--r--internal/praefect/transactions/manager.go44
-rw-r--r--internal/praefect/transactions/subtransaction.go45
-rw-r--r--internal/praefect/transactions/transaction.go23
3 files changed, 103 insertions, 9 deletions
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index b663c4827..0cf536af6 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -223,16 +223,24 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
}).Debug("VoteTransaction")
if err := mgr.voteTransaction(ctx, transactionID, node, hash); err != nil {
- mgr.log(ctx).WithFields(logrus.Fields{
- "transaction_id": transactionID,
- "node": node,
- "hash": hex.EncodeToString(hash),
- }).WithError(err).Error("VoteTransaction: vote failed")
-
- if errors.Is(err, ErrTransactionVoteFailed) {
+ if errors.Is(err, ErrTransactionStopped) {
+ mgr.counterMetric.WithLabelValues("stopped").Inc()
+ } else if errors.Is(err, ErrTransactionVoteFailed) {
mgr.counterMetric.WithLabelValues("aborted").Inc()
+
+ mgr.log(ctx).WithFields(logrus.Fields{
+ "transaction_id": transactionID,
+ "node": node,
+ "hash": hex.EncodeToString(hash),
+ }).WithError(err).Error("VoteTransaction: did not reach quorum")
} else {
mgr.counterMetric.WithLabelValues("invalid").Inc()
+
+ mgr.log(ctx).WithFields(logrus.Fields{
+ "transaction_id": transactionID,
+ "node": node,
+ "hash": hex.EncodeToString(hash),
+ }).WithError(err).Error("VoteTransaction: vote failed")
}
return err
@@ -248,3 +256,25 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
return nil
}
+
+// StopTransaction will gracefully stop a transaction.
+func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) error {
+ mgr.lock.Lock()
+ transaction, ok := mgr.transactions[transactionID]
+ mgr.lock.Unlock()
+
+ if !ok {
+ return ErrNotFound
+ }
+
+ if err := transaction.stop(); err != nil {
+ return err
+ }
+
+ mgr.log(ctx).WithFields(logrus.Fields{
+ "transaction_id": transactionID,
+ }).Debug("VoteTransaction: transaction stopped")
+ mgr.counterMetric.WithLabelValues("stopped").Inc()
+
+ return nil
+}
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 58bca0c92..33c11f5da 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -27,6 +27,8 @@ const (
voteCommitted
// voteAborted means that the voter aborted his vote.
voteAborted
+ // voteStopped means that the transaction was gracefully stopped.
+ voteStopped
)
type vote [sha1.Size]byte
@@ -50,6 +52,7 @@ func (v vote) isEmpty() bool {
type subtransaction struct {
doneCh chan interface{}
cancelCh chan interface{}
+ stopCh chan interface{}
threshold uint
@@ -68,6 +71,7 @@ func newSubtransaction(voters []Voter, threshold uint) (*subtransaction, error)
return &subtransaction{
doneCh: make(chan interface{}),
cancelCh: make(chan interface{}),
+ stopCh: make(chan interface{}),
threshold: threshold,
votersByNode: votersByNode,
voteCounts: make(map[vote]uint, len(voters)),
@@ -90,6 +94,31 @@ func (t *subtransaction) cancel() {
close(t.cancelCh)
}
+func (t *subtransaction) stop() error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ for _, voter := range t.votersByNode {
+ switch voter.result {
+ case voteAborted:
+ // If the vote was aborted already, we cannot stop it.
+ return ErrTransactionCanceled
+ case voteStopped:
+ // Similar if the vote was stopped already.
+ return ErrTransactionStopped
+ case voteUndecided:
+ // Undecided voters will get stopped, ...
+ voter.result = voteStopped
+ case voteCommitted:
+ // ... while decided voters cannot be changed anymore.
+ continue
+ }
+ }
+
+ close(t.stopCh)
+ return nil
+}
+
func (t *subtransaction) state() map[string]bool {
t.lock.Lock()
defer t.lock.Unlock()
@@ -166,6 +195,8 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
return ctx.Err()
case <-t.cancelCh:
return ErrTransactionCanceled
+ case <-t.stopCh:
+ return ErrTransactionStopped
case <-t.doneCh:
break
}
@@ -178,8 +209,18 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
return fmt.Errorf("invalid node for transaction: %q", node)
}
- if voter.result != voteUndecided {
- return fmt.Errorf("voter has already settled on an outcome: %q", node)
+ switch voter.result {
+ case voteUndecided:
+ // Happy case, no decision was yet made.
+ case voteAborted:
+ // It may happen that the vote was cancelled or stopped just after majority was
+ // reached. In that case, the node's state is now voteAborted/voteStopped, so we
+ // have to return an error here.
+ return ErrTransactionCanceled
+ case voteStopped:
+ return ErrTransactionStopped
+ default:
+ return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node)
}
// See if our vote crossed the threshold. As there can be only one vote
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 37f4b841c..5c1d53a23 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -19,6 +19,8 @@ var (
// ErrSubtransactionFailed indicates a vote was cast on a
// subtransaction which failed already.
ErrSubtransactionFailed = errors.New("subtransaction has failed")
+ // ErrTransactionStopped indicates the transaction was gracefully stopped.
+ ErrTransactionStopped = errors.New("transaction has been stopped")
)
type transactionState int
@@ -26,6 +28,7 @@ type transactionState int
const (
transactionOpen = transactionState(iota)
transactionCanceled
+ transactionStopped
)
// Voter is a participant in a given transaction that may cast a vote.
@@ -102,6 +105,20 @@ func (t *Transaction) cancel() {
t.state = transactionCanceled
}
+func (t *Transaction) stop() error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ for _, subtransaction := range t.subtransactions {
+ if err := subtransaction.stop(); err != nil {
+ return err
+ }
+ }
+ t.state = transactionStopped
+
+ return nil
+}
+
// ID returns the identifier used to uniquely identify a transaction.
func (t *Transaction) ID() uint64 {
return t.id
@@ -161,6 +178,8 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
// expected state, nothing to do
case transactionCanceled:
return nil, ErrTransactionCanceled
+ case transactionStopped:
+ return nil, ErrTransactionStopped
default:
return nil, errors.New("invalid transaction state")
}
@@ -184,6 +203,10 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
// fail as we cannot proceed if the path leading to the
// end result has intermittent failures.
return nil, ErrSubtransactionFailed
+ case voteStopped:
+ // If the transaction was stopped, then we need to fail
+ // with a graceful error.
+ return nil, ErrTransactionStopped
}
}