Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-07-20 09:00:28 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-07-20 09:56:19 +0300
commitf8f38f43a8ad0f5825161b1da5e13a2eb6e75377 (patch)
tree7c66e60dacf8f827f3ea54c7d4abcfc030e3fa03
parenta965a24b38c521b81378e15a25ec9cbde9cbb62f (diff)
transactions: Let `RegisterTransaction` return the transaction directly
Right now, `RegisterTransaction()` will return the registered transaction's identifier instead of the transaction itself. This makes it kind of unwiedly to retrieve further information about the the transaction. Now that the `Transaction` type became public, let's instead return the transaction itself instead of a proxy value.
-rw-r--r--internal/praefect/coordinator.go6
-rw-r--r--internal/praefect/transaction_test.go27
-rw-r--r--internal/praefect/transactions/manager.go14
3 files changed, 24 insertions, 23 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 22aa48702..4e63245d0 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -231,7 +231,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
threshold += 1
}
- transactionID, transactionCleanup, err := c.txMgr.RegisterTransaction(ctx, voters, threshold)
+ transaction, transactionCleanup, err := c.txMgr.RegisterTransaction(ctx, voters, threshold)
if err != nil {
return nil, fmt.Errorf("registering transactions: %w", err)
}
@@ -269,7 +269,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, failedNodes, change, params)()
})
- injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, shard.Primary.GetStorage(), true)
+ injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), shard.Primary.GetStorage(), true)
if err != nil {
return nil, err
}
@@ -282,7 +282,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return nil, err
}
- injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, secondary.GetStorage(), false)
+ injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.GetStorage(), false)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 12223c272..eb22067cf 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -78,17 +78,18 @@ func TestTransactionSucceeds(t *testing.T) {
client := gitalypb.NewRefTransactionClient(cc)
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
{Name: "node1", Votes: 1},
}, 1)
require.NoError(t, err)
- require.NotZero(t, transactionID)
+ require.NotNil(t, transaction)
+ require.NotZero(t, transaction.ID())
defer cancelTransaction()
hash := sha1.Sum([]byte{})
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: "node1",
ReferenceUpdatesHash: hash[:],
})
@@ -184,7 +185,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) {
threshold += 1
}
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, threshold)
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, threshold)
require.NoError(t, err)
defer cancelTransaction()
@@ -196,7 +197,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) {
defer wg.Done()
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: voters[idx].Name,
ReferenceUpdatesHash: tc.hashes[idx][:],
})
@@ -218,7 +219,7 @@ func TestTransactionWithContextCancellation(t *testing.T) {
ctx, cancel := testhelper.Context()
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
{Name: "voter", Votes: 1},
{Name: "absent", Votes: 1},
}, 2)
@@ -233,7 +234,7 @@ func TestTransactionWithContextCancellation(t *testing.T) {
go func() {
defer wg.Done()
_, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: "voter",
ReferenceUpdatesHash: hash[:],
})
@@ -407,7 +408,7 @@ func TestTransactionReachesQuorum(t *testing.T) {
})
}
- transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ transaction, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
require.NoError(t, err)
defer cancel()
@@ -425,7 +426,7 @@ func TestTransactionReachesQuorum(t *testing.T) {
hash := sha1.Sum([]byte(v.vote))
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: name,
ReferenceUpdatesHash: hash[:],
})
@@ -511,7 +512,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
})
}
- transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ transaction, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
require.NoError(t, err)
var wg sync.WaitGroup
@@ -525,7 +526,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
hash := sha1.Sum([]byte(vote))
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: name,
ReferenceUpdatesHash: hash[:],
})
@@ -639,7 +640,7 @@ func TestTransactionCancellation(t *testing.T) {
})
}
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
require.NoError(t, err)
var wg sync.WaitGroup
@@ -656,7 +657,7 @@ func TestTransactionCancellation(t *testing.T) {
hash := sha1.Sum([]byte(v.vote))
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: name,
ReferenceUpdatesHash: hash[:],
})
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index e33ec1c06..32af609e1 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -119,7 +119,7 @@ type CancelFunc func() (map[string]bool, error)
// taking part in the transaction. `threshold` is the threshold at which an
// election will succeed. It needs to be in the range `weight(voters)/2 <
// threshold <= weight(voters) to avoid indecidable votes.
-func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (uint64, CancelFunc, error) {
+func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (*Transaction, CancelFunc, error) {
mgr.lock.Lock()
defer mgr.lock.Unlock()
@@ -131,11 +131,11 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
transaction, err := newTransaction(transactionID, voters, threshold)
if err != nil {
- return 0, nil, err
+ return nil, nil, err
}
if _, ok := mgr.transactions[transactionID]; ok {
- return 0, nil, errors.New("transaction exists already")
+ return nil, nil, errors.New("transaction exists already")
}
mgr.transactions[transactionID] = transaction
@@ -146,16 +146,16 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
mgr.counterMetric.WithLabelValues("registered").Add(float64(len(voters)))
- return transactionID, func() (map[string]bool, error) {
- return mgr.cancelTransaction(transactionID, transaction)
+ return transaction, func() (map[string]bool, error) {
+ return mgr.cancelTransaction(transaction)
}, nil
}
-func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *Transaction) (map[string]bool, error) {
+func (mgr *Manager) cancelTransaction(transaction *Transaction) (map[string]bool, error) {
mgr.lock.Lock()
defer mgr.lock.Unlock()
- delete(mgr.transactions, transactionID)
+ delete(mgr.transactions, transaction.ID())
transaction.cancel()
mgr.subtransactionsMetric.Observe(float64(transaction.CountSubtransactions()))