diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-05-20 08:56:48 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-05-20 11:26:00 +0300 |
commit | 154913666b44f186d66d97c967a1a3f06b794ea6 (patch) | |
tree | b023b8cf93f9e513d89615d88b0e530051a7a245 /internal/praefect/transaction_test.go | |
parent | 69df3cb43559ba55acde01b8ebeb253946b9d11f (diff) |
transactions: Implement multi-node transactions
Currently, transactions can only handle the trivial case of one single
node voting. This was mostly done to keep initial complexity low, but in
the end it is rather useless to vote when there's only one voter. This
restriction is now being lifted.
The implementation is quite simple: instead of storing the expected
voter's name, only, we store a map of votes which is initialized to the
set of expected voters. Upon receiving a vote, we will check that the
voter is expected and has not cast a vote yet. If the check succeeds, we
register his vote by storing his expected hash in the votes map.
Concurrency is handled via two channels, `doneCh` and `cancelCh`. The
latter may be executed via the cancel callback and aborts all
transactions currently in the `collectVotes()` phase. `doneCh` will be
closed as soon as the last voter has cast its vote and thus allow
`collectVotes()` to finish. Note that it's each voter's own
responsibility to count votes, which is mostly done in order to keep
complexity of the code low. As
- only the last voter may close a channel
- we know each voter up front
- no voter may cast votes more than once
we know that only a single node may ever close the `doneCh`.
This commit also adds a set of tests to verify code is working as
expected. One of the pre-existing tests that verified that we can only
ever register a single node, only, which was done as a precaution so
that we actually verify this and not run into any unsupported voting
setups. As we now support multiple nodes, this test is removed.
Diffstat (limited to 'internal/praefect/transaction_test.go')
-rw-r--r-- | internal/praefect/transaction_test.go | 152 |
1 files changed, 149 insertions, 3 deletions
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 95e5b3a41..c7e23555b 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -3,6 +3,7 @@ package praefect import ( "context" "crypto/sha1" + "sync" "testing" "time" @@ -90,14 +91,159 @@ func TestTransactionSucceeds(t *testing.T) { }) } -func TestTransactionFailsWithMultipleNodes(t *testing.T) { - _, txMgr, cleanup := runPraefectServerAndTxMgr(t) +func TestTransactionWithMultipleNodes(t *testing.T) { + testcases := []struct { + desc string + nodes []string + hashes [][20]byte + expectedState gitalypb.VoteTransactionResponse_TransactionState + }{ + { + desc: "Nodes with same hash", + nodes: []string{ + "node1", + "node2", + }, + hashes: [][20]byte{ + sha1.Sum([]byte{}), + sha1.Sum([]byte{}), + }, + expectedState: gitalypb.VoteTransactionResponse_COMMIT, + }, + { + desc: "Nodes with different hashes", + nodes: []string{ + "node1", + "node2", + }, + hashes: [][20]byte{ + sha1.Sum([]byte("foo")), + sha1.Sum([]byte("bar")), + }, + expectedState: gitalypb.VoteTransactionResponse_ABORT, + }, + { + desc: "More nodes with same hash", + nodes: []string{ + "node1", + "node2", + "node3", + "node4", + }, + hashes: [][20]byte{ + sha1.Sum([]byte("foo")), + sha1.Sum([]byte("foo")), + sha1.Sum([]byte("foo")), + sha1.Sum([]byte("foo")), + }, + expectedState: gitalypb.VoteTransactionResponse_COMMIT, + }, + { + desc: "Majority with same hash", + nodes: []string{ + "node1", + "node2", + "node3", + "node4", + }, + hashes: [][20]byte{ + sha1.Sum([]byte("foo")), + sha1.Sum([]byte("foo")), + sha1.Sum([]byte("bar")), + sha1.Sum([]byte("foo")), + }, + expectedState: gitalypb.VoteTransactionResponse_ABORT, + }, + } + + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cleanup := testhelper.Context() + defer cleanup() + + client := gitalypb.NewRefTransactionClient(cc) + + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, tc.nodes) + require.NoError(t, err) + defer cancelTransaction() + + var wg sync.WaitGroup + for i := 0; i < len(tc.nodes); i++ { + wg.Add(1) + + go func(idx int) { + defer wg.Done() + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transactionID, + Node: tc.nodes[idx], + ReferenceUpdatesHash: tc.hashes[idx][:], + }) + require.NoError(t, err) + require.Equal(t, tc.expectedState, response.State) + }(i) + } + + wg.Wait() + }) + } +} + +func TestTransactionWithContextCancellation(t *testing.T) { + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + client := gitalypb.NewRefTransactionClient(cc) + + ctx, cancel := testhelper.Context() + + transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []string{"voter", "absent"}) + require.NoError(t, err) + defer cancelTransaction() + + hash := sha1.Sum([]byte{}) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + _, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transactionID, + Node: "voter", + ReferenceUpdatesHash: hash[:], + }) + require.Error(t, err) + require.Equal(t, codes.Canceled, status.Code(err)) + }() + + cancel() + wg.Wait() +} + +func TestTransactionRegistrationWithInvalidNodesFails(t *testing.T) { + ctx, cleanup := testhelper.Context() defer cleanup() + txMgr := transactions.NewManager() + + _, _, err := txMgr.RegisterTransaction(ctx, []string{}) + require.Equal(t, transactions.ErrMissingNodes, err) + + _, _, err = txMgr.RegisterTransaction(ctx, []string{"node1", "node2", "node1"}) + require.Equal(t, transactions.ErrDuplicateNodes, err) +} + +func TestTransactionRegistrationWithSameNodeFails(t *testing.T) { ctx, cleanup := testhelper.Context() defer cleanup() - _, _, err := txMgr.RegisterTransaction(ctx, []string{"node1", "node2"}) + txMgr := transactions.NewManager() + + _, _, err := txMgr.RegisterTransaction(ctx, []string{"foo", "bar", "foo"}) require.Error(t, err) } |