diff options
Diffstat (limited to 'internal/praefect/transaction_test.go')
-rw-r--r-- | internal/praefect/transaction_test.go | 213 |
1 files changed, 212 insertions, 1 deletions
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 71f5e3bc0..a1a1920b5 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -38,7 +38,7 @@ func runPraefectServerAndTxMgr(t testing.TB) (*grpc.ClientConn, *transactions.Ma } type counterMetrics struct { - registered, started, invalid, committed int + registered, started, invalid, committed, stopped int } func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected counterMetrics) { @@ -52,6 +52,7 @@ func verifyCounterMetrics(t *testing.T, manager *transactions.Manager, expected {"started", expected.started}, {"invalid", expected.invalid}, {"committed", expected.committed}, + {"stopped", expected.stopped}, } var expectedMetric bytes.Buffer @@ -680,3 +681,213 @@ func TestTransactionCancellation(t *testing.T) { }) } } + +func TestStopTransaction(t *testing.T) { + hash := sha1.Sum([]byte("foo")) + + t.Run("stopping nonexisting transaction fails", func(t *testing.T) { + cc, _, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: 1234, + }) + require.Equal(t, codes.NotFound, status.Code(err)) + }) + + t.Run("stopping transaction multiple times succeeds", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + for i := 0; i < 5; i++ { + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + } + + verifyCounterMetrics(t, txMgr, counterMetrics{ + registered: 2, + stopped: 5, + }) + }) + + t.Run("stopping a single voter", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 1) + require.NoError(t, err) + defer cancelTransaction() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + + results := transaction.State() + require.Equal(t, results["voter"], false) + verifyCounterMetrics(t, txMgr, counterMetrics{ + registered: 1, + started: 1, + stopped: 2, + }) + }) + + t.Run("stopping in-progress transaction", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "successful-voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State) + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + response, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "failing-voter", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + + results := transaction.State() + require.True(t, results["successful-voter"], "Successful voter should succeed") + require.False(t, results["failing-voter"], "Failing voter should fail") + verifyCounterMetrics(t, txMgr, counterMetrics{ + committed: 1, + registered: 2, + started: 2, + stopped: 2, + }) + }) + + t.Run("stopping cancelled transaction fails", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "successful-voter", Votes: 2}, + {Name: "failing-voter", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + + cancelTransaction() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.Error(t, err) + require.Equal(t, codes.NotFound, status.Code(err)) + }) + + t.Run("stopping concurrent voter", func(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cancel := testhelper.Context() + defer cancel() + + client := gitalypb.NewRefTransactionClient(cc) + + voters := []transactions.Voter{ + {Name: "1", Votes: 1}, + {Name: "2", Votes: 1}, + } + + transaction, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, 2) + require.NoError(t, err) + defer cancelTransaction() + + // This create a single voter waiting for the threshold to be + // reached. As the second voter will never appear, the node + // will instead be stopped by the call to `StopTransaction` + // below. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + hash := sha1.Sum([]byte("hash")) + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transaction.ID(), + Node: "1", + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) + }() + + _, err = client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: transaction.ID(), + }) + require.NoError(t, err) + + wg.Wait() + }) +} |