Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-09-23 15:17:49 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-10-12 12:02:54 +0300
commit810e6a261dde1e19e945042d6679e4b3a3d3cfc2 (patch)
tree55682c2a7df29c94352ae6a987c7cfc1a29c22ec /internal/praefect/transaction_test.go
parent0d2ea9ff5c3651dd152a258ecae5d9238457cc6a (diff)
transactions: Expose RPC to gracefully stop transactions
This commit adds the ability to stop transactions via the RefTransaction service. In contrast to state cancellation, this is considered to be a graceful stop of transactions: nodes which cast votes on stopped transactions are not supposed to treat it as an error if the transaction has been stopped, but should instead just terminate it and stop proceeding. To tell apart "real" errors from a stopped transaction, this commit thus adds a new "STOP" state which tells the client to stop processing it.
Diffstat (limited to 'internal/praefect/transaction_test.go')
-rw-r--r--internal/praefect/transaction_test.go213
1 files changed, 212 insertions, 1 deletions
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 71f5e3bc0..a1a1920b5 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -38,7 +38,7 @@ func runPraefectServerAndTxMgr(t testing.TB) (*grpc.ClientConn, *transactions.Ma
}
type counterMetrics struct {
- registered, started, invalid, committed int
+ registered, started, invalid, committed, stopped int
}
func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected counterMetrics) {
@@ -52,6 +52,7 @@ func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected
{"started", expected.started},
{"invalid", expected.invalid},
{"committed", expected.committed},
+ {"stopped", expected.stopped},
}
var expectedMetric bytes.Buffer
@@ -680,3 +681,213 @@ func TestTransactionCancellation(t *testing.T) {
})
}
}
+
+func TestStopTransaction(t *testing.T) {
+ hash := sha1.Sum([]byte("foo"))
+
+ t.Run("stopping nonexisting transaction fails", func(t *testing.T) {
+ cc, _, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: 1234,
+ })
+ require.Equal(t, codes.NotFound, status.Code(err))
+ })
+
+ t.Run("stopping transaction multiple times succeeds", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ for i := 0; i < 5; i++ {
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+ }
+
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ registered: 2,
+ stopped: 5,
+ })
+ })
+
+ t.Run("stopping a single voter", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 1)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+
+ results := transaction.State()
+ require.Equal(t, results["voter"], false)
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ registered: 1,
+ started: 1,
+ stopped: 2,
+ })
+ })
+
+ t.Run("stopping in-progress transaction", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "successful-voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State)
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ response, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "failing-voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+
+ results := transaction.State()
+ require.True(t, results["successful-voter"], "Successful voter should succeed")
+ require.False(t, results["failing-voter"], "Failing voter should fail")
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ committed: 1,
+ registered: 2,
+ started: 2,
+ stopped: 2,
+ })
+ })
+
+ t.Run("stopping cancelled transaction fails", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+
+ cancelTransaction()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.Error(t, err)
+ require.Equal(t, codes.NotFound, status.Code(err))
+ })
+
+ t.Run("stopping concurrent voter", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ // This create a single voter waiting for the threshold to be
+ // reached. As the second voter will never appear, the node
+ // will instead be stopped by the call to `StopTransaction`
+ // below.
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ hash := sha1.Sum([]byte("hash"))
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "1",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+ }()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ wg.Wait()
+ })
+}