Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/transaction_test.go')
-rw-r--r--internal/praefect/transaction_test.go213
1 files changed, 212 insertions, 1 deletions
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 71f5e3bc0..a1a1920b5 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -38,7 +38,7 @@ func runPraefectServerAndTxMgr(t testing.TB) (*grpc.ClientConn, *transactions.Ma
}
type counterMetrics struct {
- registered, started, invalid, committed int
+ registered, started, invalid, committed, stopped int
}
func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected counterMetrics) {
@@ -52,6 +52,7 @@ func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected
{"started", expected.started},
{"invalid", expected.invalid},
{"committed", expected.committed},
+ {"stopped", expected.stopped},
}
var expectedMetric bytes.Buffer
@@ -680,3 +681,213 @@ func TestTransactionCancellation(t *testing.T) {
})
}
}
+
+func TestStopTransaction(t *testing.T) {
+ hash := sha1.Sum([]byte("foo"))
+
+ t.Run("stopping nonexisting transaction fails", func(t *testing.T) {
+ cc, _, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: 1234,
+ })
+ require.Equal(t, codes.NotFound, status.Code(err))
+ })
+
+ t.Run("stopping transaction multiple times succeeds", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ for i := 0; i < 5; i++ {
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+ }
+
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ registered: 2,
+ stopped: 5,
+ })
+ })
+
+ t.Run("stopping a single voter", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 1)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+
+ results := transaction.State()
+ require.Equal(t, results["voter"], false)
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ registered: 1,
+ started: 1,
+ stopped: 2,
+ })
+ })
+
+ t.Run("stopping in-progress transaction", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "successful-voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State)
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ response, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "failing-voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+
+ results := transaction.State()
+ require.True(t, results["successful-voter"], "Successful voter should succeed")
+ require.False(t, results["failing-voter"], "Failing voter should fail")
+ verifyCounterMetrics(t, txMgr, counterMetrics{
+ committed: 1,
+ registered: 2,
+ started: 2,
+ stopped: 2,
+ })
+ })
+
+ t.Run("stopping cancelled transaction fails", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "successful-voter", Votes: 2},
+ {Name: "failing-voter", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+
+ cancelTransaction()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.Error(t, err)
+ require.Equal(t, codes.NotFound, status.Code(err))
+ })
+
+ t.Run("stopping concurrent voter", func(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ voters := []transactions.Voter{
+ {Name: "1", Votes: 1},
+ {Name: "2", Votes: 1},
+ }
+
+ transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ // This create a single voter waiting for the threshold to be
+ // reached. As the second voter will never appear, the node
+ // will instead be stopped by the call to `StopTransaction`
+ // below.
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ hash := sha1.Sum([]byte("hash"))
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transaction.ID(),
+ Node: "1",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
+ }()
+
+ _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: transaction.ID(),
+ })
+ require.NoError(t, err)
+
+ wg.Wait()
+ })
+}