diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-07-06 12:49:15 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-07-14 08:59:12 +0300 |
commit | afba66a3bad4e43f6981c096ddc0823cda8513f7 (patch) | |
tree | dbc6901f46ac15a29e861faf6b85f5f0ed467262 | |
parent | 25940ded652319121a22c9d31541bd82287c6d32 (diff) |
transactions: Expose info about voting outcomes
With the previous commit, we started to keep track about the outcome of
a transaction for each voter. Right now, this information is only
availbale internally and is thus not yet of much use. So let's change
the transaction manager's `CancelFunc` signature to return not only an
error, but also the map of voting outcomes by node.
-rw-r--r-- | internal/praefect/coordinator.go | 5 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 138 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 14 |
3 files changed, 117 insertions, 40 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 875f26f21..0ce1b36a9 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -235,7 +235,10 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall if err != nil { return nil, fmt.Errorf("registering transactions: %w", err) } - finalizers = append(finalizers, transactionCleanup) + finalizers = append(finalizers, func() error { + _, err := transactionCleanup() + return err + }) injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, shard.Primary.GetStorage(), true) if err != nil { diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 3d3b15b0d..ed9f8ed77 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -19,6 +19,13 @@ import ( "google.golang.org/grpc/status" ) +type voter struct { + votes uint + vote string + showsUp bool + shouldSucceed bool +} + func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { conf := testConfig(1) txMgr := transactions.NewManager(opts...) @@ -310,13 +317,6 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) { } func TestTransactionReachesQuorum(t *testing.T) { - type voter struct { - votes uint - vote string - showsUp bool - shouldSucceed bool - } - tc := []struct { desc string voters []voter @@ -469,35 +469,109 @@ func TestTransactionFailures(t *testing.T) { } func TestTransactionCancellation(t *testing.T) { - counter, opts := setupMetrics() - cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...) - defer cleanup() + testcases := []struct { + desc string + voters []voter + threshold uint + expectedMetrics counterMetrics + }{ + { + desc: "single node cancellation", + voters: []voter{ + {votes: 1, showsUp: false, shouldSucceed: false}, + }, + threshold: 1, + expectedMetrics: counterMetrics{registered: 1, committed: 0}, + }, + { + desc: "two nodes failing to show up", + voters: []voter{ + {votes: 1, showsUp: false, shouldSucceed: false}, + {votes: 1, showsUp: false, shouldSucceed: false}, + }, + threshold: 2, + expectedMetrics: counterMetrics{registered: 1, committed: 0}, + }, + { + desc: "two nodes with unweighted node failing", + voters: []voter{ + {votes: 1, showsUp: true, shouldSucceed: true}, + {votes: 0, showsUp: false, shouldSucceed: false}, + }, + threshold: 1, + expectedMetrics: counterMetrics{registered: 1, started: 1, committed: 1}, + }, + { + desc: "multiple weighted votes with subset failing", + voters: []voter{ + {votes: 1, showsUp: true, shouldSucceed: true}, + {votes: 1, showsUp: true, shouldSucceed: true}, + {votes: 1, showsUp: false, shouldSucceed: false}, + }, + threshold: 2, + expectedMetrics: counterMetrics{registered: 1, started: 2, committed: 2}, + }, + } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + counter, opts := setupMetrics() + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...) + defer cleanup() - client := gitalypb.NewRefTransactionClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ - {Name: "node1", Votes: 1}, - }, 1) - require.NoError(t, err) - require.NotZero(t, transactionID) + client := gitalypb.NewRefTransactionClient(cc) - cancelTransaction() + voters := make([]transactions.Voter, 0, len(tc.voters)) + for i, voter := range tc.voters { + voters = append(voters, transactions.Voter{ + Name: fmt.Sprintf("node-%d", i), + Votes: voter.votes, + }) + } - hash := sha1.Sum([]byte{}) - _, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, - Node: "node1", - ReferenceUpdatesHash: hash[:], - }) - require.Error(t, err) - require.Equal(t, codes.NotFound, status.Code(err)) + transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + require.NoError(t, err) - verifyCounterMetrics(t, counter, counterMetrics{ - registered: 1, - started: 1, - invalid: 1, - }) + var wg sync.WaitGroup + for i, v := range tc.voters { + if !v.showsUp { + continue + } + + wg.Add(1) + go func(i int, v voter) { + defer wg.Done() + + name := fmt.Sprintf("node-%d", i) + hash := sha1.Sum([]byte(v.vote)) + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transactionID, + Node: name, + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + + if v.shouldSucceed { + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State, "node should have received COMMIT") + } else { + require.Equal(t, gitalypb.VoteTransactionResponse_ABORT, response.State, "node should have received ABORT") + } + }(i, v) + } + wg.Wait() + + results, err := cancelTransaction() + require.NoError(t, err) + + for i, v := range tc.voters { + require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state") + } + + verifyCounterMetrics(t, counter, tc.expectedMetrics) + }) + } } diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 9f7b5f8ae..d0319d5c6 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -102,8 +102,9 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger { // CancelFunc is the transaction cancellation function returned by // `RegisterTransaction`. Calling it will cause the transaction to be removed -// from the transaction manager. -type CancelFunc func() error +// from the transaction manager. It returns the outcome for each node: `true` +// if the node committed the transaction, `false` if it aborted. +type CancelFunc func() (map[string]bool, error) // RegisterTransaction registers a new reference transaction for a set of nodes // taking part in the transaction. `threshold` is the threshold at which an @@ -136,17 +137,16 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr mgr.counterMetric.WithLabelValues("registered").Inc() - return transactionID, func() error { - mgr.cancelTransaction(transactionID, transaction) - return nil + return transactionID, func() (map[string]bool, error) { + return mgr.cancelTransaction(transactionID, transaction) }, nil } -func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) { +func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) (map[string]bool, error) { mgr.lock.Lock() defer mgr.lock.Unlock() delete(mgr.transactions, transactionID) - transaction.cancel() + return transaction.cancel(), nil } func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error { |