diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-17 13:56:56 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-17 13:56:56 +0300 |
commit | 9e6f5f40e6eb44655b6acfd5dc222af04333a4f2 (patch) | |
tree | 7f0d8eb2f48cbcc6815e994df5b4bbe39112e123 | |
parent | d13f378d0015ad0888e2558e7141fbffe1b46b85 (diff) | |
parent | 9c3d907fa8f50a8ed6a87279631cc4c729b91d3f (diff) |
Merge branch 'pks-subtransactions' into 'master'
Allow multiple votes per transaction
Closes #2939
See merge request gitlab-org/gitaly!2386
-rw-r--r-- | changelogs/unreleased/pks-subtransactions.yml | 5 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 107 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 6 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 198 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 209 |
5 files changed, 384 insertions, 141 deletions
diff --git a/changelogs/unreleased/pks-subtransactions.yml b/changelogs/unreleased/pks-subtransactions.yml new file mode 100644 index 000000000..fca91046d --- /dev/null +++ b/changelogs/unreleased/pks-subtransactions.yml @@ -0,0 +1,5 @@ +--- +title: Allow multiple votes per transaction +merge_request: 2386 +author: +type: changed diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 55882f588..12223c272 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -443,6 +444,112 @@ func TestTransactionReachesQuorum(t *testing.T) { } } +func TestTransactionWithMultipleVotes(t *testing.T) { + type multiVoter struct { + voteCount uint + votes []string + voteSucceeds []bool + shouldSucceed bool + } + + tc := []struct { + desc string + voters []multiVoter + threshold uint + }{ + { + desc: "quorum is reached with multiple votes", + voters: []multiVoter{ + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true}, + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true}, + }, + threshold: 2, + }, + { + desc: "quorum is not reached with disagreeing votes", + voters: []multiVoter{ + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + }, + threshold: 2, + }, + { + desc: "quorum is reached with unweighted disagreeing voter", + voters: []multiVoter{ + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, 
true}, shouldSucceed: true}, + {voteCount: 0, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + }, + threshold: 1, + }, + { + desc: "quorum is reached with outweighed disagreeing voter", + voters: []multiVoter{ + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true}, + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true}, + {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + }, + threshold: 2, + }, + } + + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cleanup := testhelper.Context() + defer cleanup() + + client := gitalypb.NewRefTransactionClient(cc) + + for _, tc := range tc { + t.Run(tc.desc, func(t *testing.T) { + var voters []transactions.Voter + + for i, voter := range tc.voters { + voters = append(voters, transactions.Voter{ + Name: fmt.Sprintf("node-%d", i), + Votes: voter.voteCount, + }) + } + + transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + require.NoError(t, err) + + var wg sync.WaitGroup + for i, v := range tc.voters { + wg.Add(1) + go func(i int, v multiVoter) { + defer wg.Done() + + for j, vote := range v.votes { + name := fmt.Sprintf("node-%d", i) + hash := sha1.Sum([]byte(vote)) + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transactionID, + Node: name, + ReferenceUpdatesHash: hash[:], + }) + assert.NoError(t, err) + + if v.voteSucceeds[j] { + assert.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State, "node should have received COMMIT") + } else { + assert.Equal(t, gitalypb.VoteTransactionResponse_ABORT, response.State, "node should have received ABORT") + } + } + }(i, v) + } + + wg.Wait() + + results, _ := cancel() + for i, voter := range tc.voters { + require.Equal(t, voter.shouldSucceed, 
results[fmt.Sprintf("node-%d", i)]) + } + }) + } +} + func TestTransactionFailures(t *testing.T) { counter, opts := setupMetrics() cc, _, cleanup := runPraefectServerAndTxMgr(t, opts...) diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 0cb967209..252146dda 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -158,11 +158,7 @@ func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, n return ErrNotFound } - if err := transaction.vote(node, hash); err != nil { - return err - } - - if err := transaction.collectVotes(ctx, node); err != nil { + if err := transaction.vote(ctx, node, hash); err != nil { return err } diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go new file mode 100644 index 000000000..9950ea53a --- /dev/null +++ b/internal/praefect/transactions/subtransaction.go @@ -0,0 +1,198 @@ +package transactions + +import ( + "context" + "crypto/sha1" + "errors" + "fmt" + "sync" +) + +var ( + // ErrTransactionVoteFailed indicates the transaction didn't reach quorum. + ErrTransactionVoteFailed = errors.New("transaction did not reach quorum") + // ErrTransactionCanceled indicates the transaction was canceled before + // reaching quorum. + ErrTransactionCanceled = errors.New("transaction was canceled") +) + +// voteResult represents the outcome of a transaction for a single voter. +type voteResult int + +const ( + // voteUndecided means that the voter either didn't yet show up or that + // the vote couldn't yet be decided due to there being no majority yet. + voteUndecided voteResult = iota + // voteCommitted means that the voter committed his vote. + voteCommitted + // voteAborted means that the voter aborted his vote. 
+ voteAborted +) + +type vote [sha1.Size]byte + +func voteFromHash(hash []byte) (vote, error) { + var vote vote + + if len(hash) != sha1.Size { + return vote, fmt.Errorf("invalid voting hash: %q", hash) + } + + copy(vote[:], hash) + return vote, nil +} + +func (v vote) isEmpty() bool { + return v == vote{} +} + +// subtransaction is a single session where voters are voting for a certain outcome. +type subtransaction struct { + doneCh chan interface{} + cancelCh chan interface{} + + threshold uint + + lock sync.RWMutex + votersByNode map[string]*Voter + voteCounts map[vote]uint +} + +func newSubtransaction(voters []Voter, threshold uint) (*subtransaction, error) { + votersByNode := make(map[string]*Voter, len(voters)) + for _, voter := range voters { + voter := voter // rescope loop variable + votersByNode[voter.Name] = &voter + } + + return &subtransaction{ + doneCh: make(chan interface{}), + cancelCh: make(chan interface{}), + threshold: threshold, + votersByNode: votersByNode, + voteCounts: make(map[vote]uint, len(voters)), + }, nil +} + +func (t *subtransaction) cancel() map[string]bool { + t.lock.Lock() + defer t.lock.Unlock() + + results := make(map[string]bool, len(t.votersByNode)) + for node, voter := range t.votersByNode { + // If a voter didn't yet show up or is still undecided, we need + // to mark it as failed so it won't get the idea of committing + // the transaction at a later point anymore. + if voter.result == voteUndecided { + voter.result = voteAborted + } + results[node] = voter.result == voteCommitted + } + + close(t.cancelCh) + + return results +} + +func (t *subtransaction) vote(node string, hash []byte) error { + vote, err := voteFromHash(hash) + if err != nil { + return err + } + + t.lock.Lock() + defer t.lock.Unlock() + + // Cast our vote. In case the node doesn't exist or has already cast a + // vote, we need to abort. 
+ voter, ok := t.votersByNode[node] + if !ok { + return fmt.Errorf("invalid node for transaction: %q", node) + } + if !voter.vote.isEmpty() { + return fmt.Errorf("node already cast a vote: %q", node) + } + voter.vote = vote + + oldCount := t.voteCounts[vote] + newCount := oldCount + voter.Votes + t.voteCounts[vote] = newCount + + // If the threshold was reached before already, we mustn't try to + // signal the other voters again. + if oldCount >= t.threshold { + return nil + } + + // If we've just crossed the threshold, signal all voters that the + // voting has concluded. + if newCount >= t.threshold { + close(t.doneCh) + return nil + } + + // If any other vote has already reached the threshold, we mustn't try + // to notify voters again. + for _, count := range t.voteCounts { + if count >= t.threshold { + return nil + } + } + + // If any of the voters didn't yet cast its vote, we need to wait for + // them. + for _, voter := range t.votersByNode { + if voter.vote.isEmpty() { + return nil + } + } + + // Otherwise, signal voters that all votes were gathered. + close(t.doneCh) + return nil +} + +func (t *subtransaction) collectVotes(ctx context.Context, node string) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.cancelCh: + return ErrTransactionCanceled + case <-t.doneCh: + break + } + + t.lock.RLock() + defer t.lock.RUnlock() + + voter, ok := t.votersByNode[node] + if !ok { + return fmt.Errorf("invalid node for transaction: %q", node) + } + + if voter.result != voteUndecided { + return fmt.Errorf("voter has already settled on an outcome: %q", node) + } + + // See if our vote crossed the threshold. As there can be only one vote + // exceeding it, we know we're the winner in that case. 
+ if t.voteCounts[voter.vote] < t.threshold { + voter.result = voteAborted + return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold) + } + + voter.result = voteCommitted + return nil +} + +func (t *subtransaction) getResult(node string) (voteResult, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + voter, ok := t.votersByNode[node] + if !ok { + return voteAborted, fmt.Errorf("invalid node for transaction: %q", node) + } + + return voter.result, nil +} diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index ceb6b9a29..cfcca1658 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -2,31 +2,23 @@ package transactions import ( "context" - "crypto/sha1" "errors" - "fmt" "sync" ) var ( - ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes") - ErrMissingNodes = errors.New("transaction requires at least one node") - ErrInvalidThreshold = errors.New("transaction has invalid threshold") - ErrTransactionVoteFailed = errors.New("transaction did not reach quorum") - ErrTransactionCanceled = errors.New("transaction was canceled") -) - -// voteResult represents the outcome of a transaction for a single voter. -type voteResult int - -const ( - // voteUndecided means that the voter either didn't yet show up or that - // the vote couldn't yet be decided due to there being no majority yet. - voteUndecided voteResult = iota - // voteCommitted means that the voter committed his vote. - voteCommitted - // voteAborted means that the voter aborted his vote. - voteAborted + // ErrDuplicateNodes indicates a transaction was registered with two + // voters having the same name. + ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes") + // ErrMissingNodes indicates a transaction was registered with no voters. 
+ ErrMissingNodes = errors.New("transaction requires at least one node") + // ErrInvalidThreshold indicates a transaction was registered with an + // invalid threshold that may either allow for multiple different + // quorums or none at all. + ErrInvalidThreshold = errors.New("transaction has invalid threshold") + // ErrSubtransactionFailed indicates a vote was cast on a + // subtransaction which failed already. + ErrSubtransactionFailed = errors.New("subtransaction has failed") ) // Voter is a participant in a given transaction that may cast a vote. @@ -42,32 +34,16 @@ type Voter struct { result voteResult } -type vote [sha1.Size]byte - -func voteFromHash(hash []byte) (vote, error) { - var vote vote - - if len(hash) != sha1.Size { - return vote, fmt.Errorf("invalid voting hash: %q", hash) - } - - copy(vote[:], hash) - return vote, nil -} - -func (v vote) isEmpty() bool { - return v == vote{} -} - +// transaction is a session where a set of voters votes on one or more +// subtransactions. Subtransactions are a sequence of sessions, where each node +// needs to go through the same sequence and agree on the same thing in the end +// in order to have the complete transaction succeed. 
type transaction struct { - doneCh chan interface{} - cancelCh chan interface{} - threshold uint + voters []Voter - lock sync.RWMutex - votersByNode map[string]*Voter - voteCounts map[vote]uint + lock sync.Mutex + subtransactions []*subtransaction } func newTransaction(voters []Voter, threshold uint) (*transaction, error) { @@ -76,15 +52,12 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) { } var totalVotes uint - votersByNode := make(map[string]*Voter, len(voters)) - + votersByNode := make(map[string]interface{}, len(voters)) for _, voter := range voters { if _, ok := votersByNode[voter.Name]; ok { return nil, ErrDuplicateNodes } - - voter := voter // rescope loop variable - votersByNode[voter.Name] = &voter + votersByNode[voter.Name] = nil totalVotes += voter.Votes } @@ -102,11 +75,8 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) { } return &transaction{ - doneCh: make(chan interface{}), - cancelCh: make(chan interface{}), - threshold: threshold, - votersByNode: votersByNode, - voteCounts: make(map[vote]uint, len(votersByNode)), + threshold: threshold, + voters: voters, }, nil } @@ -114,109 +84,76 @@ func (t *transaction) cancel() map[string]bool { t.lock.Lock() defer t.lock.Unlock() - results := make(map[string]bool, len(t.votersByNode)) - for node, voter := range t.votersByNode { - // If a voter didn't yet show up or is still undecided, we need - // to mark it as failed so it won't get the idea of committing - // the transaction at a later point anymore. - if voter.result == voteUndecided { - voter.result = voteAborted + results := make(map[string]bool, len(t.voters)) + + // We need to collect outcomes of all subtransactions. If any of the + // subtransactions failed, then the overall transaction failed for that + // node as well. Otherwise, if all subtransactions for the node + // succeeded, the transaction did as well. 
+ for _, subtransaction := range t.subtransactions { + for voter, result := range subtransaction.cancel() { + // If there already is an entry indicating failure, keep it. + if didSucceed, ok := results[voter]; ok && !didSucceed { + continue + } + results[voter] = result } - results[node] = voter.result == voteCommitted } - close(t.cancelCh) - return results } -func (t *transaction) vote(node string, hash []byte) error { - vote, err := voteFromHash(hash) - if err != nil { - return err - } - +// getOrCreateSubtransaction gets an ongoing subtransaction on which the given +// node hasn't yet voted, or creates a new one if the node has succeeded on +// all subtransactions. In case the node has failed on any of the +// subtransactions, an error will be returned. +func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) { t.lock.Lock() defer t.lock.Unlock() - // Cast our vote. In case the node doesn't exist or has already cast a - // vote, we need to abort. - voter, ok := t.votersByNode[node] - if !ok { - return fmt.Errorf("invalid node for transaction: %q", node) - } - if !voter.vote.isEmpty() { - return fmt.Errorf("node already cast a vote: %q", node) - } - voter.vote = vote - - oldCount := t.voteCounts[vote] - newCount := oldCount + voter.Votes - t.voteCounts[vote] = newCount - - // If the threshold was reached before already, we mustn't try to - // signal the other voters again. - if oldCount >= t.threshold { - return nil - } - - // If we've just crossed the threshold, signal all voters that the - // voting has concluded. - if newCount >= t.threshold { - close(t.doneCh) - return nil - } - - // If any other vote has already reached the threshold, we mustn't try - // to notify voters again.
- for _, count := range t.voteCounts { - if count >= t.threshold { - return nil + for _, subtransaction := range t.subtransactions { + result, err := subtransaction.getResult(node) + if err != nil { + return nil, err } - } - // If any of the voters didn't yet cast its vote, we need to wait for - // them. - for _, voter := range t.votersByNode { - if voter.vote.isEmpty() { - return nil + switch result { + case voteUndecided: + // An undecided vote means we should vote on this one. + return subtransaction, nil + case voteCommitted: + // If we have committed this subtransaction, we're good + // to go. + continue + case voteAborted: + // If the subtransaction was aborted, then we need to + // fail as we cannot proceed if the path leading to the + // end result has intermittent failures. + return nil, ErrSubtransactionFailed } } - // Otherwise, signal voters that all votes were gathered. - close(t.doneCh) - return nil -} - -func (t *transaction) collectVotes(ctx context.Context, node string) error { - select { - case <-ctx.Done(): - return ctx.Err() - case <-t.cancelCh: - return ErrTransactionCanceled - case <-t.doneCh: - break + // If we arrive here, then we know that the node has voted and + // reached quorum on all subtransactions. We can thus create a new one. + subtransaction, err := newSubtransaction(t.voters, t.threshold) + if err != nil { + return nil, err } - t.lock.RLock() - defer t.lock.RUnlock() + t.subtransactions = append(t.subtransactions, subtransaction) - voter, ok := t.votersByNode[node] - if !ok { - return fmt.Errorf("invalid node for transaction: %q", node) - } + return subtransaction, nil +} - if voter.result != voteUndecided { - return fmt.Errorf("voter has already settled on an outcome: %q", node) +func (t *transaction) vote(ctx context.Context, node string, hash []byte) error { + subtransaction, err := t.getOrCreateSubtransaction(node) + if err != nil { + return err } - // See if our vote crossed the threshold.
As there can be only one vote - // exceeding it, we know we're the winner in that case. - if t.voteCounts[voter.vote] < t.threshold { - voter.result = voteAborted - return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold) + if err := subtransaction.vote(node, hash); err != nil { + return err } - voter.result = voteCommitted - return nil + return subtransaction.collectVotes(ctx, node) } |