diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-09-23 15:17:49 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-10-12 12:02:54 +0300 |
commit | 810e6a261dde1e19e945042d6679e4b3a3d3cfc2 (patch) | |
tree | 55682c2a7df29c94352ae6a987c7cfc1a29c22ec /internal/praefect/transaction_test.go | |
parent | 0d2ea9ff5c3651dd152a258ecae5d9238457cc6a (diff) |
transactions: Expose RPC to gracefully stop transactions
This commit adds the ability to stop transactions via the RefTransaction
service. In contrast to state cancellation, this is considered to be a
graceful stop of transactions: nodes which cast votes on stopped
transactions are not supposed to treat it as an error if the transaction
has been stopped, but should instead just terminate it and stop
proceeding. To tell apart "real" errors from a stopped transaction, this
commit thus adds a new "STOP" state which tells the client to stop
processing it.
Diffstat (limited to 'internal/praefect/transaction_test.go')
-rw-r--r-- | internal/praefect/transaction_test.go | 213 |
1 files changed, 212 insertions, 1 deletions
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 71f5e3bc0..a1a1920b5 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -38,7 +38,7 @@ func runPraefectServerAndTxMgr(t testing.TB) (*grpc.ClientConn, *transactions.Ma } type counterMetrics struct { - registered, started, invalid, committed int + registered, started, invalid, committed, stopped int } func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected counterMetrics) { @@ -52,6 +52,7 @@ func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected {"started", expected.started}, {"invalid", expected.invalid}, {"committed", expected.committed}, + {"stopped", expected.stopped}, } var expectedMetric bytes.Buffer @@ -680,3 +681,213 @@ func TestTransactionCancellation(t *testing.T) { }) } } + +func TestStopTransaction(t *testing.T) { + hash := sha1.Sum([]byte("foo")) + + t.Run("stopping nonexisting transaction fails", func(t *testing.T) { + cc, _, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: 1234, + }) + require.Equal(t, codes.NotFound, status.Code(err)) + }) + + t.Run("stopping transaction multiple times succeeds", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + for i := 0; i < 5; i++ { + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + } + + verifyCounterMetrics(t, txMgr, counterMetrics{ + registered: 2, + stopped: 5, + }) + }) + + t.Run("stopping a single voter", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 1) + require.NoError(t, err) + defer cancelTransaction() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + + results := transaction.State() + require.Equal(t, results["voter"], false) + verifyCounterMetrics(t, txMgr, counterMetrics{ + registered: 1, + started: 1, + stopped: 2, + }) + }) + + t.Run("stopping in-progress transaction", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "successful-voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State) + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + response, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "failing-voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + + results := transaction.State() + require.True(t, results["successful-voter"], "Successful voter should succeed") + require.False(t, results["failing-voter"], "Failing voter should fail") + verifyCounterMetrics(t, txMgr, counterMetrics{ + committed: 1, + registered: 2, + started: 2, + stopped: 2, + }) + }) + + t.Run("stopping cancelled transaction fails", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + + cancelTransaction() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.Error(t, err) + require.Equal(t, codes.NotFound, status.Code(err)) + }) + + t.Run("stopping concurrent voter", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + // This create a single voter waiting for the threshold to be + // reached. As the second voter will never appear, the node + // will instead be stopped by the call to `StopTransaction` + // below. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + hash := sha1.Sum([]byte("hash")) + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "1", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + }() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + wg.Wait() + }) +} |