diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-07-20 09:00:28 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-07-20 09:56:19 +0300 |
commit | f8f38f43a8ad0f5825161b1da5e13a2eb6e75377 (patch) | |
tree | 7c66e60dacf8f827f3ea54c7d4abcfc030e3fa03 | |
parent | a965a24b38c521b81378e15a25ec9cbde9cbb62f (diff) |
transactions: Let `RegisterTransaction` return the transaction directly
Right now, `RegisterTransaction()` will return the registered
transaction's identifier instead of the transaction itself. This makes
it kind of unwiedly to retrieve further information about the the
transaction. Now that the `Transaction` type became public, let's
instead return the transaction itself instead of a proxy value.
-rw-r--r-- | internal/praefect/coordinator.go | 6 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 27 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 14 |
3 files changed, 24 insertions, 23 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 22aa48702..4e63245d0 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -231,7 +231,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall threshold += 1 } - transactionID, transactionCleanup, err := c.txMgr.RegisterTransaction(ctx, voters, threshold) + transaction, transactionCleanup, err := c.txMgr.RegisterTransaction(ctx, voters, threshold) if err != nil { return nil, fmt.Errorf("registering transactions: %w", err) } @@ -269,7 +269,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, failedNodes, change, params)() }) - injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, shard.Primary.GetStorage(), true) + injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), shard.Primary.GetStorage(), true) if err != nil { return nil, err } @@ -282,7 +282,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, err } - injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, secondary.GetStorage(), false) + injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.GetStorage(), false) if err != nil { return nil, err } diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 12223c272..eb22067cf 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -78,17 +78,18 @@ func TestTransactionSucceeds(t *testing.T) { client := gitalypb.NewRefTransactionClient(cc) - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ {Name: "node1", Votes: 1}, }, 1) require.NoError(t, err) - require.NotZero(t, transactionID) + require.NotNil(t, transaction) + require.NotZero(t, transaction.ID()) defer cancelTransaction() hash := sha1.Sum([]byte{}) response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: "node1", ReferenceUpdatesHash: hash[:], }) @@ -184,7 +185,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) { threshold += 1 } - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, threshold) + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, threshold) require.NoError(t, err) defer cancelTransaction() @@ -196,7 +197,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) { defer wg.Done() response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: voters[idx].Name, ReferenceUpdatesHash: tc.hashes[idx][:], }) @@ -218,7 +219,7 @@ func TestTransactionWithContextCancellation(t *testing.T) { ctx, cancel := testhelper.Context() - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ {Name: "voter", Votes: 1}, {Name: "absent", Votes: 1}, }, 2) @@ -233,7 +234,7 @@ func TestTransactionWithContextCancellation(t *testing.T) { go func() { defer wg.Done() _, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: "voter", ReferenceUpdatesHash: hash[:], }) @@ -407,7 +408,7 @@ func TestTransactionReachesQuorum(t *testing.T) { }) } - transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + transaction, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) require.NoError(t, err) defer cancel() @@ -425,7 +426,7 @@ func TestTransactionReachesQuorum(t *testing.T) { hash := sha1.Sum([]byte(v.vote)) response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: name, ReferenceUpdatesHash: hash[:], }) @@ -511,7 +512,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) { }) } - transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + transaction, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) require.NoError(t, err) var wg sync.WaitGroup @@ -525,7 +526,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) { hash := sha1.Sum([]byte(vote)) response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: name, ReferenceUpdatesHash: hash[:], }) @@ -639,7 +640,7 @@ func TestTransactionCancellation(t *testing.T) { }) } - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) require.NoError(t, err) var wg sync.WaitGroup @@ -656,7 +657,7 @@ func TestTransactionCancellation(t *testing.T) { hash := sha1.Sum([]byte(v.vote)) response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, + TransactionId: transaction.ID(), Node: name, ReferenceUpdatesHash: hash[:], }) diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index e33ec1c06..32af609e1 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -119,7 +119,7 @@ type CancelFunc func() (map[string]bool, error) // taking part in the transaction. `threshold` is the threshold at which an // election will succeed. It needs to be in the range `weight(voters)/2 < // threshold <= weight(voters) to avoid indecidable votes. -func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (uint64, CancelFunc, error) { +func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (*Transaction, CancelFunc, error) { mgr.lock.Lock() defer mgr.lock.Unlock() @@ -131,11 +131,11 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr transaction, err := newTransaction(transactionID, voters, threshold) if err != nil { - return 0, nil, err + return nil, nil, err } if _, ok := mgr.transactions[transactionID]; ok { - return 0, nil, errors.New("transaction exists already") + return nil, nil, errors.New("transaction exists already") } mgr.transactions[transactionID] = transaction @@ -146,16 +146,16 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr mgr.counterMetric.WithLabelValues("registered").Add(float64(len(voters))) - return transactionID, func() (map[string]bool, error) { - return mgr.cancelTransaction(transactionID, transaction) + return transaction, func() (map[string]bool, error) { + return mgr.cancelTransaction(transaction) }, nil } -func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *Transaction) (map[string]bool, error) { +func (mgr *Manager) cancelTransaction(transaction *Transaction) (map[string]bool, error) { mgr.lock.Lock() defer mgr.lock.Unlock() - delete(mgr.transactions, transactionID) + delete(mgr.transactions, transaction.ID()) transaction.cancel() mgr.subtransactionsMetric.Observe(float64(transaction.CountSubtransactions())) |