diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-07-15 15:21:24 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-07-17 13:22:51 +0300 |
commit | db4d06b54f7d319d1b7408ec2f7c38e154bc29d8 (patch) | |
tree | ecf4e0c1386f4321202047c452cac65440e95b67 | |
parent | 31a0f591e965d3b2d52fe139f7e7d30d7474f504 (diff) |
transaction: Allow multiple votes per transaction
Right now, we're using the pre-receive hook to call into transactions on
the Gitaly nodes. This hook is well-understood to be executed once and
only once per action, and as a result the current implementation of
transactions allowed for a single vote via this pre-receive hook, only.
The pre-receive hook was a stop-gap implementation of the real mechanism
we wanted to eventually use, though, which is the reference-transaction
hook that's going to be released as part of git-core v2.28. While the
input to both hooks is the same and thus no changes to the actual voting
logic should be required, the most important difference is that the
reference-transaction hook may be invoked arbitrarily many times for
each Git command. E.g. a non-atomic push will execute the hook as many
times as there are updated references.
This surfaces a current design limitation of the transaction mechanism
as it is implemented in Gitaly: as a transaction only allows for a
single vote per node, it's inherently incompatible with the semantics
introduce by the reference-transaction hook. This is why we now extend
the transaction mechanism to allow a sequence of votes instead by
introducing subtransactions.
Subtransactions encapsulate all the voting logic that was previously
part of the transaction, which is casting the vote and collecting the
votes to establish whether quorum was reached. Transactions now start to
be a wrapper of a set of transactions, which transparantly creates new
subtransactions as required whenever a node casts a vote. Whenever a
node casts a vote, one of three things may now happen:
1. A subtransaction exists where the node's status is "aborted". This
means that any one of the subtransactions failed and thus the
complete transaction needs to be labelled as "aborted" for the
node. It will receive an error and is not allowed to cast any
votes anymore.
2. A subtransaction exists where the node's state is "undecided". As
this means that the node simply didn't cast a vote, it will cast
its vote for the oldest undecided subtransaction and wait for
quorum to be reached.
3. All existing subtransactions are in "committed" state for the
given node. We'll thus create a new subtransaction, cast our vote
for this new transaction and wait for quorum to be reached.
To evaluate the complete transaction's outcome, we need to establish
whether for a given node, all created subtransactions are in state
"committed". Only if that's the case will the transaction be treated as
successful for the node.
With this logic, we're now able to transparently allow a voter to cast a
sequence of votes.
-rw-r--r-- | changelogs/unreleased/pks-subtransactions.yml | 5 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 107 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 12 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 99 |
4 files changed, 206 insertions, 17 deletions
diff --git a/changelogs/unreleased/pks-subtransactions.yml b/changelogs/unreleased/pks-subtransactions.yml new file mode 100644 index 000000000..fca91046d --- /dev/null +++ b/changelogs/unreleased/pks-subtransactions.yml @@ -0,0 +1,5 @@ +--- +title: Allow multiple votes per transaction +merge_request: 2386 +author: +type: changed diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 55882f588..12223c272 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -443,6 +444,112 @@ func TestTransactionReachesQuorum(t *testing.T) { } } +func TestTransactionWithMultipleVotes(t *testing.T) { + type multiVoter struct { + voteCount uint + votes []string + voteSucceeds []bool + shouldSucceed bool + } + + tc := []struct { + desc string + voters []multiVoter + threshold uint + }{ + { + desc: "quorum is reached with multiple votes", + voters: []multiVoter{ + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true}, + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true}, + }, + threshold: 2, + }, + { + desc: "quorum is not reached with disagreeing votes", + voters: []multiVoter{ + {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + }, + threshold: 2, + }, + { + desc: "quorum is reached with unweighted disagreeing voter", + voters: []multiVoter{ + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true}, + {voteCount: 0, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + }, + threshold: 1, + }, + { + desc: "quorum is reached with outweighed disagreeing voter", + voters: []multiVoter{ + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true}, + {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true}, + {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false}, + }, + threshold: 2, + }, + } + + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t) + defer cleanup() + + ctx, cleanup := testhelper.Context() + defer cleanup() + + client := gitalypb.NewRefTransactionClient(cc) + + for _, tc := range tc { + t.Run(tc.desc, func(t *testing.T) { + var voters []transactions.Voter + + for i, voter := range tc.voters { + voters = append(voters, transactions.Voter{ + Name: fmt.Sprintf("node-%d", i), + Votes: voter.voteCount, + }) + } + + transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + require.NoError(t, err) + + var wg sync.WaitGroup + for i, v := range tc.voters { + wg.Add(1) + go func(i int, v multiVoter) { + defer wg.Done() + + for j, vote := range v.votes { + name := fmt.Sprintf("node-%d", i) + hash := sha1.Sum([]byte(vote)) + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transactionID, + Node: name, + ReferenceUpdatesHash: hash[:], + }) + assert.NoError(t, err) + + if v.voteSucceeds[j] { + assert.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State, "node should have received COMMIT") + } else { + assert.Equal(t, gitalypb.VoteTransactionResponse_ABORT, response.State, "node should have received ABORT") + } + } + }(i, v) + } + + wg.Wait() + + results, _ := cancel() + for i, voter := range tc.voters { + require.Equal(t, voter.shouldSucceed, results[fmt.Sprintf("node-%d", i)]) + } + }) + } +} + func TestTransactionFailures(t *testing.T) { counter, opts := setupMetrics() cc, _, cleanup := runPraefectServerAndTxMgr(t, opts...) diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index aac836fbb..1c1c782f8 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -181,3 +181,15 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { voter.result = voteCommitted return nil } + +func (t *subtransaction) getResult(node string) (voteResult, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + voter, ok := t.votersByNode[node] + if !ok { + return voteAborted, fmt.Errorf("invalid node for transaction: %q", node) + } + + return voter.result, nil +} diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 33584fda9..024f7d5bd 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -3,12 +3,14 @@ package transactions import ( "context" "errors" + "sync" ) var ( - ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes") - ErrMissingNodes = errors.New("transaction requires at least one node") - ErrInvalidThreshold = errors.New("transaction has invalid threshold") + ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes") + ErrMissingNodes = errors.New("transaction requires at least one node") + ErrInvalidThreshold = errors.New("transaction has invalid threshold") + ErrSubtransactionFailed = errors.New("subtransaction has failed") ) // Voter is a participant in a given transaction that may cast a vote. @@ -29,9 +31,11 @@ type Voter struct { // needs to go through the same sequence and agree on the same thing in the end // in order to have the complete transaction succeed. type transaction struct { - threshold uint - voters []Voter - subtransaction *subtransaction + threshold uint + voters []Voter + + lock sync.Mutex + subtransactions []*subtransaction } func newTransaction(voters []Voter, threshold uint) (*transaction, error) { @@ -62,25 +66,86 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) { return nil, ErrInvalidThreshold } - subtransaction, err := newSubtransaction(voters, threshold) - if err != nil { - return nil, err - } - return &transaction{ - threshold: threshold, - voters: voters, - subtransaction: subtransaction, + threshold: threshold, + voters: voters, }, nil } func (t *transaction) cancel() map[string]bool { - return t.subtransaction.cancel() + t.lock.Lock() + defer t.lock.Unlock() + + results := make(map[string]bool, len(t.voters)) + + // We need to collect outcomes of all subtransactions. If any of the + // subtransactions failed, then the overall transaction failed for that + // node as well. Otherwise, if all subtransactions for the node + // succeeded, the transaction did as well. + for _, subtransaction := range t.subtransactions { + for voter, result := range subtransaction.cancel() { + // If there already is an entry indicating failure, keep it. + if didSucceed, ok := results[voter]; ok && !didSucceed { + continue + } + results[voter] = result + } + } + + return results +} + +// getOrCreateSubtransaction gets an ongoing subtransaction on which the given +// node hasn't yet voted on or creates a new one if the node has succeeded on +// all subtransactions. In case the node has failed on any of the +// subtransactions, an error will be returned. +func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) { + t.lock.Lock() + defer t.lock.Unlock() + + for _, subtransaction := range t.subtransactions { + result, err := subtransaction.getResult(node) + if err != nil { + return nil, err + } + + switch result { + case voteUndecided: + // An undecided vote means we should vote on this one. + return subtransaction, nil + case voteCommitted: + // If we have committed this subtransaction, we're good + // to go. + continue + case voteAborted: + // If the subtransaction was aborted, then we need to + // fail as we cannot proceed if the path leading to the + // end result has intermittent failures. + return nil, ErrSubtransactionFailed + } + } + + // If we arrive here, then we know that all the node has voted and + // reached quorum on all subtransactions. We can thus create a new one. + subtransaction, err := newSubtransaction(t.voters, t.threshold) + if err != nil { + return nil, err + } + + t.subtransactions = append(t.subtransactions, subtransaction) + + return subtransaction, nil } func (t *transaction) vote(ctx context.Context, node string, hash []byte) error { - if err := t.subtransaction.vote(node, hash); err != nil { + subtransaction, err := t.getOrCreateSubtransaction(node) + if err != nil { return err } - return t.subtransaction.collectVotes(ctx, node) + + if err := subtransaction.vote(node, hash); err != nil { + return err + } + + return subtransaction.collectVotes(ctx, node) } |