diff options
author | James Fargher <proglottis@gmail.com> | 2020-07-21 05:33:51 +0300 |
---|---|---|
committer | James Fargher <proglottis@gmail.com> | 2020-07-21 05:33:51 +0300 |
commit | e80120e6b0dcde7f47b04002736fb8785494748e (patch) | |
tree | 9cf75cc269584389dece393375ba939cd12e788c | |
parent | 5e455d1ed42ea46849582fdcd87792b6244053cf (diff) | |
parent | 974cf6a450f8f36bd6b7f73c534a952196b879d2 (diff) |
Merge branch 'pks-transactions-expose-transaction' into 'master'
Expose transactions to callers directly
See merge request gitlab-org/gitaly!2396
-rw-r--r-- | internal/praefect/coordinator.go | 11 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 34 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 30 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 16 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 39 |
5 files changed, 81 insertions, 49 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 22aa48702..df00fe4a8 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -231,16 +231,17 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall threshold += 1 } - transactionID, transactionCleanup, err := c.txMgr.RegisterTransaction(ctx, voters, threshold) + transaction, transactionCleanup, err := c.txMgr.RegisterTransaction(ctx, voters, threshold) if err != nil { return nil, fmt.Errorf("registering transactions: %w", err) } finalizers = append(finalizers, func() error { - successByNode, err := transactionCleanup() - if err != nil { + if err := transactionCleanup(); err != nil { return err } + successByNode := transaction.State() + // If the primary node failed the transaction, then // there's no sense in trying to replicate from primary // to secondaries. @@ -269,7 +270,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, failedNodes, change, params)() }) - injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, shard.Primary.GetStorage(), true) + injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), shard.Primary.GetStorage(), true) if err != nil { return nil, err } @@ -282,7 +283,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, err } - injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, secondary.GetStorage(), false) + injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.GetStorage(), false) if err != nil { return nil, err } diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 12223c272..9f927cbd1 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -78,17 +78,18 @@ func TestTransactionSucceeds(t *testing.T) { client := gitalypb.NewRefTransactionClient(cc) - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ {Name: "node1", Votes: 1}, }, 1) require.NoError(t, err) - require.NotZero(t, transactionID) + require.NotNil(t, transaction) + require.NotZero(t, transaction.ID()) defer cancelTransaction() hash := sha1.Sum([]byte{}) response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: "node1", ReferenceUpdatesHash: hash[:], }) @@ -184,7 +185,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) { threshold += 1 } - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, threshold) + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, threshold) require.NoError(t, err) defer cancelTransaction() @@ -196,7 +197,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) { defer wg.Done() response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: voters[idx].Name, ReferenceUpdatesHash: tc.hashes[idx][:], }) @@ -218,7 +219,7 @@ func TestTransactionWithContextCancellation(t *testing.T) { ctx, cancel := testhelper.Context() - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ {Name: "voter", Votes: 1}, {Name: "absent", Votes: 1}, }, 2) @@ -233,7 +234,7 @@ func TestTransactionWithContextCancellation(t *testing.T) { go func() { defer wg.Done() _, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: "voter", ReferenceUpdatesHash: hash[:], }) @@ -407,7 +408,7 @@ func TestTransactionReachesQuorum(t *testing.T) { }) } - transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + transaction, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) require.NoError(t, err) defer cancel() @@ -425,7 +426,7 @@ func TestTransactionReachesQuorum(t *testing.T) { hash := sha1.Sum([]byte(v.vote)) response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: name, ReferenceUpdatesHash: hash[:], }) @@ -511,7 +512,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) { }) } - transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + transaction, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) require.NoError(t, err) var wg sync.WaitGroup @@ -525,7 +526,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) { hash := sha1.Sum([]byte(vote)) response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: name, ReferenceUpdatesHash: hash[:], }) @@ -542,7 +543,8 @@ func TestTransactionWithMultipleVotes(t *testing.T) { wg.Wait() - results, _ := cancel() + require.NoError(t, cancel()) + results := transaction.State() for i, voter := range tc.voters { require.Equal(t, voter.shouldSucceed, results[fmt.Sprintf("node-%d", i)]) } @@ -639,7 +641,7 @@ func TestTransactionCancellation(t *testing.T) { }) } - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) require.NoError(t, err) var wg sync.WaitGroup @@ -656,7 +658,7 @@ func TestTransactionCancellation(t *testing.T) { hash := sha1.Sum([]byte(v.vote)) response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: name, ReferenceUpdatesHash: hash[:], }) @@ -671,9 +673,9 @@ func TestTransactionCancellation(t *testing.T) { } wg.Wait() - results, err := cancelTransaction() - require.NoError(t, err) + require.NoError(t, cancelTransaction()) + results := transaction.State() for i, v := range tc.voters { require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state") } diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 0084eee1f..1afc03588 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -24,7 +24,7 @@ var ErrNotFound = errors.New("transaction not found") type Manager struct { txIDGenerator TransactionIDGenerator lock sync.Mutex - transactions map[uint64]*transaction + transactions map[uint64]*Transaction counterMetric *prometheus.CounterVec delayMetric metrics.HistogramVec subtransactionsMetric metrics.Histogram @@ -92,7 +92,7 @@ func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt { func NewManager(opts ...ManagerOpt) *Manager { mgr := &Manager{ txIDGenerator: newTransactionIDGenerator(), - transactions: make(map[uint64]*transaction), + transactions: make(map[uint64]*Transaction), counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}), delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}), subtransactionsMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}), @@ -111,15 +111,14 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger { // CancelFunc is the transaction cancellation function returned by // `RegisterTransaction`. Calling it will cause the transaction to be removed -// from the transaction manager. It returns the outcome for each node: `true` -// if the node committed the transaction, `false` if it aborted. -type CancelFunc func() (map[string]bool, error) +// from the transaction manager. +type CancelFunc func() error // RegisterTransaction registers a new reference transaction for a set of nodes // taking part in the transaction. `threshold` is the threshold at which an // election will succeed. It needs to be in the range `weight(voters)/2 < // threshold <= weight(voters) to avoid indecidable votes. -func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (uint64, CancelFunc, error) { +func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (*Transaction, CancelFunc, error) { mgr.lock.Lock() defer mgr.lock.Unlock() @@ -129,13 +128,13 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr // nodes still have in-flight transactions. transactionID := mgr.txIDGenerator.ID() - transaction, err := newTransaction(voters, threshold) + transaction, err := newTransaction(transactionID, voters, threshold) if err != nil { - return 0, nil, err + return nil, nil, err } if _, ok := mgr.transactions[transactionID]; ok { - return 0, nil, errors.New("transaction exists already") + return nil, nil, errors.New("transaction exists already") } mgr.transactions[transactionID] = transaction @@ -146,20 +145,21 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr mgr.counterMetric.WithLabelValues("registered").Add(float64(len(voters))) - return transactionID, func() (map[string]bool, error) { - return mgr.cancelTransaction(transactionID, transaction) + return transaction, func() error { + return mgr.cancelTransaction(transaction) }, nil } -func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) (map[string]bool, error) { +func (mgr *Manager) cancelTransaction(transaction *Transaction) error { mgr.lock.Lock() defer mgr.lock.Unlock() - delete(mgr.transactions, transactionID) + delete(mgr.transactions, transaction.ID()) - mgr.subtransactionsMetric.Observe(float64(transaction.countSubtransactions())) + transaction.cancel() + mgr.subtransactionsMetric.Observe(float64(transaction.CountSubtransactions())) - return transaction.cancel(), nil + return nil } func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error { diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 9950ea53a..58bca0c92 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -74,22 +74,30 @@ func newSubtransaction(voters []Voter, threshold uint) (*subtransaction, error) }, nil } -func (t *subtransaction) cancel() map[string]bool { +func (t *subtransaction) cancel() { t.lock.Lock() defer t.lock.Unlock() - results := make(map[string]bool, len(t.votersByNode)) - for node, voter := range t.votersByNode { + for _, voter := range t.votersByNode { // If a voter didn't yet show up or is still undecided, we need // to mark it as failed so it won't get the idea of committing // the transaction at a later point anymore. if voter.result == voteUndecided { voter.result = voteAborted } - results[node] = voter.result == voteCommitted } close(t.cancelCh) +} + +func (t *subtransaction) state() map[string]bool { + t.lock.Lock() + defer t.lock.Unlock() + + results := make(map[string]bool, len(t.votersByNode)) + for node, voter := range t.votersByNode { + results[node] = voter.result == voteCommitted + } return results } diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index feb215c2c..388625a83 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -34,11 +34,12 @@ type Voter struct { result voteResult } -// transaction is a session where a set of voters votes on one or more +// Transaction is a session where a set of voters votes on one or more // subtransactions. Subtransactions are a sequence of sessions, where each node // needs to go through the same sequence and agree on the same thing in the end // in order to have the complete transaction succeed. -type transaction struct { +type Transaction struct { + id uint64 threshold uint voters []Voter @@ -46,7 +47,7 @@ type transaction struct { subtransactions []*subtransaction } -func newTransaction(voters []Voter, threshold uint) (*transaction, error) { +func newTransaction(id uint64, voters []Voter, threshold uint) (*Transaction, error) { if len(voters) == 0 { return nil, ErrMissingNodes } @@ -74,13 +75,31 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) { return nil, ErrInvalidThreshold } - return &transaction{ + return &Transaction{ + id: id, threshold: threshold, voters: voters, }, nil } -func (t *transaction) cancel() map[string]bool { +func (t *Transaction) cancel() { + t.lock.Lock() + defer t.lock.Unlock() + + for _, subtransaction := range t.subtransactions { + subtransaction.cancel() + } +} + +// ID returns the identifier used to uniquely identify a transaction. +func (t *Transaction) ID() uint64 { + return t.id +} + +// State returns the voting state mapped by voters. A voting state of `true` +// means all subtransactions were successful, a voting state of `false` means +// either no subtransactions were created or any of the subtransactions failed. +func (t *Transaction) State() map[string]bool { t.lock.Lock() defer t.lock.Unlock() @@ -91,7 +110,7 @@ func (t *transaction) cancel() map[string]bool { // node as well. Otherwise, if all subtransactions for the node // succeeded, the transaction did as well. for _, subtransaction := range t.subtransactions { - for voter, result := range subtransaction.cancel() { + for voter, result := range subtransaction.state() { // If there already is an entry indicating failure, keep it. if didSucceed, ok := results[voter]; ok && !didSucceed { continue @@ -103,7 +122,9 @@ func (t *transaction) cancel() map[string]bool { return results } -func (t *transaction) countSubtransactions() int { +// CountSubtransactions counts the number of subtransactions created as part of +// the transaction. +func (t *Transaction) CountSubtransactions() int { return len(t.subtransactions) } @@ -111,7 +132,7 @@ func (t *transaction) countSubtransactions() int { // node hasn't yet voted on or creates a new one if the node has succeeded on // all subtransactions. In case the node has failed on any of the // subtransactions, an error will be returned. -func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) { +func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) { t.lock.Lock() defer t.lock.Unlock() @@ -149,7 +170,7 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e return subtransaction, nil } -func (t *transaction) vote(ctx context.Context, node string, hash []byte) error { +func (t *Transaction) vote(ctx context.Context, node string, hash []byte) error { subtransaction, err := t.getOrCreateSubtransaction(node) if err != nil { return err |