Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <proglottis@gmail.com>2020-07-21 05:33:51 +0300
committerJames Fargher <proglottis@gmail.com>2020-07-21 05:33:51 +0300
commite80120e6b0dcde7f47b04002736fb8785494748e (patch)
tree9cf75cc269584389dece393375ba939cd12e788c
parent5e455d1ed42ea46849582fdcd87792b6244053cf (diff)
parent974cf6a450f8f36bd6b7f73c534a952196b879d2 (diff)
Merge branch 'pks-transactions-expose-transaction' into 'master'
Expose transactions to callers directly See merge request gitlab-org/gitaly!2396
-rw-r--r--internal/praefect/coordinator.go11
-rw-r--r--internal/praefect/transaction_test.go34
-rw-r--r--internal/praefect/transactions/manager.go30
-rw-r--r--internal/praefect/transactions/subtransaction.go16
-rw-r--r--internal/praefect/transactions/transaction.go39
5 files changed, 81 insertions, 49 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 22aa48702..df00fe4a8 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -231,16 +231,17 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
threshold += 1
}
- transactionID, transactionCleanup, err := c.txMgr.RegisterTransaction(ctx, voters, threshold)
+ transaction, transactionCleanup, err := c.txMgr.RegisterTransaction(ctx, voters, threshold)
if err != nil {
return nil, fmt.Errorf("registering transactions: %w", err)
}
finalizers = append(finalizers, func() error {
- successByNode, err := transactionCleanup()
- if err != nil {
+ if err := transactionCleanup(); err != nil {
return err
}
+ successByNode := transaction.State()
+
// If the primary node failed the transaction, then
// there's no sense in trying to replicate from primary
// to secondaries.
@@ -269,7 +270,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, failedNodes, change, params)()
})
- injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, shard.Primary.GetStorage(), true)
+ injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), shard.Primary.GetStorage(), true)
if err != nil {
return nil, err
}
@@ -282,7 +283,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return nil, err
}
- injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, secondary.GetStorage(), false)
+ injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.GetStorage(), false)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 12223c272..9f927cbd1 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -78,17 +78,18 @@ func TestTransactionSucceeds(t *testing.T) {
client := gitalypb.NewRefTransactionClient(cc)
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
{Name: "node1", Votes: 1},
}, 1)
require.NoError(t, err)
- require.NotZero(t, transactionID)
+ require.NotNil(t, transaction)
+ require.NotZero(t, transaction.ID())
defer cancelTransaction()
hash := sha1.Sum([]byte{})
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: "node1",
ReferenceUpdatesHash: hash[:],
})
@@ -184,7 +185,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) {
threshold += 1
}
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, threshold)
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, threshold)
require.NoError(t, err)
defer cancelTransaction()
@@ -196,7 +197,7 @@ func TestTransactionWithMultipleNodes(t *testing.T) {
defer wg.Done()
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: voters[idx].Name,
ReferenceUpdatesHash: tc.hashes[idx][:],
})
@@ -218,7 +219,7 @@ func TestTransactionWithContextCancellation(t *testing.T) {
ctx, cancel := testhelper.Context()
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
{Name: "voter", Votes: 1},
{Name: "absent", Votes: 1},
}, 2)
@@ -233,7 +234,7 @@ func TestTransactionWithContextCancellation(t *testing.T) {
go func() {
defer wg.Done()
_, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: "voter",
ReferenceUpdatesHash: hash[:],
})
@@ -407,7 +408,7 @@ func TestTransactionReachesQuorum(t *testing.T) {
})
}
- transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ transaction, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
require.NoError(t, err)
defer cancel()
@@ -425,7 +426,7 @@ func TestTransactionReachesQuorum(t *testing.T) {
hash := sha1.Sum([]byte(v.vote))
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: name,
ReferenceUpdatesHash: hash[:],
})
@@ -511,7 +512,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
})
}
- transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ transaction, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
require.NoError(t, err)
var wg sync.WaitGroup
@@ -525,7 +526,7 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
hash := sha1.Sum([]byte(vote))
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: name,
ReferenceUpdatesHash: hash[:],
})
@@ -542,7 +543,8 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
wg.Wait()
- results, _ := cancel()
+ require.NoError(t, cancel())
+ results := transaction.State()
for i, voter := range tc.voters {
require.Equal(t, voter.shouldSucceed, results[fmt.Sprintf("node-%d", i)])
}
@@ -639,7 +641,7 @@ func TestTransactionCancellation(t *testing.T) {
})
}
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
require.NoError(t, err)
var wg sync.WaitGroup
@@ -656,7 +658,7 @@ func TestTransactionCancellation(t *testing.T) {
hash := sha1.Sum([]byte(v.vote))
response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
+ TransactionId: transaction.ID(),
Node: name,
ReferenceUpdatesHash: hash[:],
})
@@ -671,9 +673,9 @@ func TestTransactionCancellation(t *testing.T) {
}
wg.Wait()
- results, err := cancelTransaction()
- require.NoError(t, err)
+ require.NoError(t, cancelTransaction())
+ results := transaction.State()
for i, v := range tc.voters {
require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state")
}
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 0084eee1f..1afc03588 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -24,7 +24,7 @@ var ErrNotFound = errors.New("transaction not found")
type Manager struct {
txIDGenerator TransactionIDGenerator
lock sync.Mutex
- transactions map[uint64]*transaction
+ transactions map[uint64]*Transaction
counterMetric *prometheus.CounterVec
delayMetric metrics.HistogramVec
subtransactionsMetric metrics.Histogram
@@ -92,7 +92,7 @@ func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt {
func NewManager(opts ...ManagerOpt) *Manager {
mgr := &Manager{
txIDGenerator: newTransactionIDGenerator(),
- transactions: make(map[uint64]*transaction),
+ transactions: make(map[uint64]*Transaction),
counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}),
delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}),
subtransactionsMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}),
@@ -111,15 +111,14 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger {
// CancelFunc is the transaction cancellation function returned by
// `RegisterTransaction`. Calling it will cause the transaction to be removed
-// from the transaction manager. It returns the outcome for each node: `true`
-// if the node committed the transaction, `false` if it aborted.
-type CancelFunc func() (map[string]bool, error)
+// from the transaction manager.
+type CancelFunc func() error
// RegisterTransaction registers a new reference transaction for a set of nodes
// taking part in the transaction. `threshold` is the threshold at which an
// election will succeed. It needs to be in the range `weight(voters)/2 <
// threshold <= weight(voters) to avoid indecidable votes.
-func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (uint64, CancelFunc, error) {
+func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (*Transaction, CancelFunc, error) {
mgr.lock.Lock()
defer mgr.lock.Unlock()
@@ -129,13 +128,13 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
// nodes still have in-flight transactions.
transactionID := mgr.txIDGenerator.ID()
- transaction, err := newTransaction(voters, threshold)
+ transaction, err := newTransaction(transactionID, voters, threshold)
if err != nil {
- return 0, nil, err
+ return nil, nil, err
}
if _, ok := mgr.transactions[transactionID]; ok {
- return 0, nil, errors.New("transaction exists already")
+ return nil, nil, errors.New("transaction exists already")
}
mgr.transactions[transactionID] = transaction
@@ -146,20 +145,21 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
mgr.counterMetric.WithLabelValues("registered").Add(float64(len(voters)))
- return transactionID, func() (map[string]bool, error) {
- return mgr.cancelTransaction(transactionID, transaction)
+ return transaction, func() error {
+ return mgr.cancelTransaction(transaction)
}, nil
}
-func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) (map[string]bool, error) {
+func (mgr *Manager) cancelTransaction(transaction *Transaction) error {
mgr.lock.Lock()
defer mgr.lock.Unlock()
- delete(mgr.transactions, transactionID)
+ delete(mgr.transactions, transaction.ID())
- mgr.subtransactionsMetric.Observe(float64(transaction.countSubtransactions()))
+ transaction.cancel()
+ mgr.subtransactionsMetric.Observe(float64(transaction.CountSubtransactions()))
- return transaction.cancel(), nil
+ return nil
}
func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error {
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 9950ea53a..58bca0c92 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -74,22 +74,30 @@ func newSubtransaction(voters []Voter, threshold uint) (*subtransaction, error)
}, nil
}
-func (t *subtransaction) cancel() map[string]bool {
+func (t *subtransaction) cancel() {
t.lock.Lock()
defer t.lock.Unlock()
- results := make(map[string]bool, len(t.votersByNode))
- for node, voter := range t.votersByNode {
+ for _, voter := range t.votersByNode {
// If a voter didn't yet show up or is still undecided, we need
// to mark it as failed so it won't get the idea of committing
// the transaction at a later point anymore.
if voter.result == voteUndecided {
voter.result = voteAborted
}
- results[node] = voter.result == voteCommitted
}
close(t.cancelCh)
+}
+
+func (t *subtransaction) state() map[string]bool {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ results := make(map[string]bool, len(t.votersByNode))
+ for node, voter := range t.votersByNode {
+ results[node] = voter.result == voteCommitted
+ }
return results
}
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index feb215c2c..388625a83 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -34,11 +34,12 @@ type Voter struct {
result voteResult
}
-// transaction is a session where a set of voters votes on one or more
+// Transaction is a session where a set of voters votes on one or more
// subtransactions. Subtransactions are a sequence of sessions, where each node
// needs to go through the same sequence and agree on the same thing in the end
// in order to have the complete transaction succeed.
-type transaction struct {
+type Transaction struct {
+ id uint64
threshold uint
voters []Voter
@@ -46,7 +47,7 @@ type transaction struct {
subtransactions []*subtransaction
}
-func newTransaction(voters []Voter, threshold uint) (*transaction, error) {
+func newTransaction(id uint64, voters []Voter, threshold uint) (*Transaction, error) {
if len(voters) == 0 {
return nil, ErrMissingNodes
}
@@ -74,13 +75,31 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) {
return nil, ErrInvalidThreshold
}
- return &transaction{
+ return &Transaction{
+ id: id,
threshold: threshold,
voters: voters,
}, nil
}
-func (t *transaction) cancel() map[string]bool {
+func (t *Transaction) cancel() {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ for _, subtransaction := range t.subtransactions {
+ subtransaction.cancel()
+ }
+}
+
+// ID returns the identifier used to uniquely identify a transaction.
+func (t *Transaction) ID() uint64 {
+ return t.id
+}
+
+// State returns the voting state mapped by voters. A voting state of `true`
+// means all subtransactions were successful, a voting state of `false` means
+// either no subtransactions were created or any of the subtransactions failed.
+func (t *Transaction) State() map[string]bool {
t.lock.Lock()
defer t.lock.Unlock()
@@ -91,7 +110,7 @@ func (t *transaction) cancel() map[string]bool {
// node as well. Otherwise, if all subtransactions for the node
// succeeded, the transaction did as well.
for _, subtransaction := range t.subtransactions {
- for voter, result := range subtransaction.cancel() {
+ for voter, result := range subtransaction.state() {
// If there already is an entry indicating failure, keep it.
if didSucceed, ok := results[voter]; ok && !didSucceed {
continue
@@ -103,7 +122,9 @@ func (t *transaction) cancel() map[string]bool {
return results
}
-func (t *transaction) countSubtransactions() int {
+// CountSubtransactions counts the number of subtransactions created as part of
+// the transaction.
+func (t *Transaction) CountSubtransactions() int {
return len(t.subtransactions)
}
@@ -111,7 +132,7 @@ func (t *transaction) countSubtransactions() int {
// node hasn't yet voted on or creates a new one if the node has succeeded on
// all subtransactions. In case the node has failed on any of the
// subtransactions, an error will be returned.
-func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) {
+func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) {
t.lock.Lock()
defer t.lock.Unlock()
@@ -149,7 +170,7 @@ func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
return subtransaction, nil
}
-func (t *transaction) vote(ctx context.Context, node string, hash []byte) error {
+func (t *Transaction) vote(ctx context.Context, node string, hash []byte) error {
subtransaction, err := t.getOrCreateSubtransaction(node)
if err != nil {
return err