Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-05-20 08:56:48 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-05-20 11:26:00 +0300
commit154913666b44f186d66d97c967a1a3f06b794ea6 (patch)
treeb023b8cf93f9e513d89615d88b0e530051a7a245 /internal/praefect/transaction_test.go
parent69df3cb43559ba55acde01b8ebeb253946b9d11f (diff)
transactions: Implement multi-node transactions
Currently, transactions can only handle the trivial case of one single node voting. This was mostly done to keep initial complexity low, but in the end it is rather useless to vote when there's only one voter. This restriction is now being lifted. The implementation is quite simple: instead of storing the expected voter's name, only, we store a map of votes which is initialized to the set of expected voters. Upon receiving a vote, we will check that the voter is expected and has not cast a vote yet. If the check succeeds, we register his vote by storing his expected hash in the votes map. Concurrency is handled via two channels, `doneCh` and `cancelCh`. The latter may be executed via the cancel callback and aborts all transactions currently in the `collectVotes()` phase. `doneCh` will be closed as soon as the last voter has cast its vote and thus allow `collectVotes()` to finish. Note that it's each voter's own responsibility to count votes, which is mostly done in order to keep complexity of the code low. As - only the last voter may close a channel - we know each voter up front - no voter may cast votes more than once we know that only a single node may ever close the `doneCh`. This commit also adds a set of tests to verify code is working as expected. One of the pre-existing tests that verified that we can only ever register a single node, only, which was done as a precaution so that we actually verify this and not run into any unsupported voting setups. As we now support multiple nodes, this test is removed.
Diffstat (limited to 'internal/praefect/transaction_test.go')
-rw-r--r--internal/praefect/transaction_test.go152
1 files changed, 149 insertions, 3 deletions
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 95e5b3a41..c7e23555b 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -3,6 +3,7 @@ package praefect
import (
"context"
"crypto/sha1"
+ "sync"
"testing"
"time"
@@ -90,14 +91,159 @@ func TestTransactionSucceeds(t *testing.T) {
})
}
-func TestTransactionFailsWithMultipleNodes(t *testing.T) {
- _, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+func TestTransactionWithMultipleNodes(t *testing.T) {
+ testcases := []struct {
+ desc string
+ nodes []string
+ hashes [][20]byte
+ expectedState gitalypb.VoteTransactionResponse_TransactionState
+ }{
+ {
+ desc: "Nodes with same hash",
+ nodes: []string{
+ "node1",
+ "node2",
+ },
+ hashes: [][20]byte{
+ sha1.Sum([]byte{}),
+ sha1.Sum([]byte{}),
+ },
+ expectedState: gitalypb.VoteTransactionResponse_COMMIT,
+ },
+ {
+ desc: "Nodes with different hashes",
+ nodes: []string{
+ "node1",
+ "node2",
+ },
+ hashes: [][20]byte{
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("bar")),
+ },
+ expectedState: gitalypb.VoteTransactionResponse_ABORT,
+ },
+ {
+ desc: "More nodes with same hash",
+ nodes: []string{
+ "node1",
+ "node2",
+ "node3",
+ "node4",
+ },
+ hashes: [][20]byte{
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("foo")),
+ },
+ expectedState: gitalypb.VoteTransactionResponse_COMMIT,
+ },
+ {
+ desc: "Majority with same hash",
+ nodes: []string{
+ "node1",
+ "node2",
+ "node3",
+ "node4",
+ },
+ hashes: [][20]byte{
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("bar")),
+ sha1.Sum([]byte("foo")),
+ },
+ expectedState: gitalypb.VoteTransactionResponse_ABORT,
+ },
+ }
+
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cleanup := testhelper.Context()
+ defer cleanup()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ for _, tc := range testcases {
+ t.Run(tc.desc, func(t *testing.T) {
+ transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, tc.nodes)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ var wg sync.WaitGroup
+ for i := 0; i < len(tc.nodes); i++ {
+ wg.Add(1)
+
+ go func(idx int) {
+ defer wg.Done()
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transactionID,
+ Node: tc.nodes[idx],
+ ReferenceUpdatesHash: tc.hashes[idx][:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedState, response.State)
+ }(i)
+ }
+
+ wg.Wait()
+ })
+ }
+}
+
+func TestTransactionWithContextCancellation(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ ctx, cancel := testhelper.Context()
+
+ transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []string{"voter", "absent"})
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ hash := sha1.Sum([]byte{})
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transactionID,
+ Node: "voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.Error(t, err)
+ require.Equal(t, codes.Canceled, status.Code(err))
+ }()
+
+ cancel()
+ wg.Wait()
+}
+
+func TestTransactionRegistrationWithInvalidNodesFails(t *testing.T) {
+ ctx, cleanup := testhelper.Context()
defer cleanup()
+ txMgr := transactions.NewManager()
+
+ _, _, err := txMgr.RegisterTransaction(ctx, []string{})
+ require.Equal(t, transactions.ErrMissingNodes, err)
+
+ _, _, err = txMgr.RegisterTransaction(ctx, []string{"node1", "node2", "node1"})
+ require.Equal(t, transactions.ErrDuplicateNodes, err)
+}
+
+func TestTransactionRegistrationWithSameNodeFails(t *testing.T) {
ctx, cleanup := testhelper.Context()
defer cleanup()
- _, _, err := txMgr.RegisterTransaction(ctx, []string{"node1", "node2"})
+ txMgr := transactions.NewManager()
+
+ _, _, err := txMgr.RegisterTransaction(ctx, []string{"foo", "bar", "foo"})
require.Error(t, err)
}