Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-07-06 12:49:15 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-07-14 08:59:12 +0300
commitafba66a3bad4e43f6981c096ddc0823cda8513f7 (patch)
treedbc6901f46ac15a29e861faf6b85f5f0ed467262
parent25940ded652319121a22c9d31541bd82287c6d32 (diff)
transactions: Expose info about voting outcomes
With the previous commit, we started to keep track about the outcome of a transaction for each voter. Right now, this information is only availbale internally and is thus not yet of much use. So let's change the transaction manager's `CancelFunc` signature to return not only an error, but also the map of voting outcomes by node.
-rw-r--r--internal/praefect/coordinator.go5
-rw-r--r--internal/praefect/transaction_test.go138
-rw-r--r--internal/praefect/transactions/manager.go14
3 files changed, 117 insertions, 40 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 875f26f21..0ce1b36a9 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -235,7 +235,10 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
if err != nil {
return nil, fmt.Errorf("registering transactions: %w", err)
}
- finalizers = append(finalizers, transactionCleanup)
+ finalizers = append(finalizers, func() error {
+ _, err := transactionCleanup()
+ return err
+ })
injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, shard.Primary.GetStorage(), true)
if err != nil {
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 3d3b15b0d..ed9f8ed77 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -19,6 +19,13 @@ import (
"google.golang.org/grpc/status"
)
+type voter struct {
+ votes uint
+ vote string
+ showsUp bool
+ shouldSucceed bool
+}
+
func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) {
conf := testConfig(1)
txMgr := transactions.NewManager(opts...)
@@ -310,13 +317,6 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) {
}
func TestTransactionReachesQuorum(t *testing.T) {
- type voter struct {
- votes uint
- vote string
- showsUp bool
- shouldSucceed bool
- }
-
tc := []struct {
desc string
voters []voter
@@ -469,35 +469,109 @@ func TestTransactionFailures(t *testing.T) {
}
func TestTransactionCancellation(t *testing.T) {
- counter, opts := setupMetrics()
- cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...)
- defer cleanup()
+ testcases := []struct {
+ desc string
+ voters []voter
+ threshold uint
+ expectedMetrics counterMetrics
+ }{
+ {
+ desc: "single node cancellation",
+ voters: []voter{
+ {votes: 1, showsUp: false, shouldSucceed: false},
+ },
+ threshold: 1,
+ expectedMetrics: counterMetrics{registered: 1, committed: 0},
+ },
+ {
+ desc: "two nodes failing to show up",
+ voters: []voter{
+ {votes: 1, showsUp: false, shouldSucceed: false},
+ {votes: 1, showsUp: false, shouldSucceed: false},
+ },
+ threshold: 2,
+ expectedMetrics: counterMetrics{registered: 1, committed: 0},
+ },
+ {
+ desc: "two nodes with unweighted node failing",
+ voters: []voter{
+ {votes: 1, showsUp: true, shouldSucceed: true},
+ {votes: 0, showsUp: false, shouldSucceed: false},
+ },
+ threshold: 1,
+ expectedMetrics: counterMetrics{registered: 1, started: 1, committed: 1},
+ },
+ {
+ desc: "multiple weighted votes with subset failing",
+ voters: []voter{
+ {votes: 1, showsUp: true, shouldSucceed: true},
+ {votes: 1, showsUp: true, shouldSucceed: true},
+ {votes: 1, showsUp: false, shouldSucceed: false},
+ },
+ threshold: 2,
+ expectedMetrics: counterMetrics{registered: 1, started: 2, committed: 2},
+ },
+ }
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
+ for _, tc := range testcases {
+ t.Run(tc.desc, func(t *testing.T) {
+ counter, opts := setupMetrics()
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...)
+ defer cleanup()
- client := gitalypb.NewRefTransactionClient(cc)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
- {Name: "node1", Votes: 1},
- }, 1)
- require.NoError(t, err)
- require.NotZero(t, transactionID)
+ client := gitalypb.NewRefTransactionClient(cc)
- cancelTransaction()
+ voters := make([]transactions.Voter, 0, len(tc.voters))
+ for i, voter := range tc.voters {
+ voters = append(voters, transactions.Voter{
+ Name: fmt.Sprintf("node-%d", i),
+ Votes: voter.votes,
+ })
+ }
- hash := sha1.Sum([]byte{})
- _, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
- Node: "node1",
- ReferenceUpdatesHash: hash[:],
- })
- require.Error(t, err)
- require.Equal(t, codes.NotFound, status.Code(err))
+ transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ require.NoError(t, err)
- verifyCounterMetrics(t, counter, counterMetrics{
- registered: 1,
- started: 1,
- invalid: 1,
- })
+ var wg sync.WaitGroup
+ for i, v := range tc.voters {
+ if !v.showsUp {
+ continue
+ }
+
+ wg.Add(1)
+ go func(i int, v voter) {
+ defer wg.Done()
+
+ name := fmt.Sprintf("node-%d", i)
+ hash := sha1.Sum([]byte(v.vote))
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transactionID,
+ Node: name,
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+
+ if v.shouldSucceed {
+ require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State, "node should have received COMMIT")
+ } else {
+ require.Equal(t, gitalypb.VoteTransactionResponse_ABORT, response.State, "node should have received ABORT")
+ }
+ }(i, v)
+ }
+ wg.Wait()
+
+ results, err := cancelTransaction()
+ require.NoError(t, err)
+
+ for i, v := range tc.voters {
+ require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state")
+ }
+
+ verifyCounterMetrics(t, counter, tc.expectedMetrics)
+ })
+ }
}
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 9f7b5f8ae..d0319d5c6 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -102,8 +102,9 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger {
// CancelFunc is the transaction cancellation function returned by
// `RegisterTransaction`. Calling it will cause the transaction to be removed
-// from the transaction manager.
-type CancelFunc func() error
+// from the transaction manager. It returns the outcome for each node: `true`
+// if the node committed the transaction, `false` if it aborted.
+type CancelFunc func() (map[string]bool, error)
// RegisterTransaction registers a new reference transaction for a set of nodes
// taking part in the transaction. `threshold` is the threshold at which an
@@ -136,17 +137,16 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
mgr.counterMetric.WithLabelValues("registered").Inc()
- return transactionID, func() error {
- mgr.cancelTransaction(transactionID, transaction)
- return nil
+ return transactionID, func() (map[string]bool, error) {
+ return mgr.cancelTransaction(transactionID, transaction)
}, nil
}
-func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) {
+func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) (map[string]bool, error) {
mgr.lock.Lock()
defer mgr.lock.Unlock()
delete(mgr.transactions, transactionID)
- transaction.cancel()
+ return transaction.cancel(), nil
}
func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error {