Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-07-15 15:21:24 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-07-17 13:22:51 +0300
commitdb4d06b54f7d319d1b7408ec2f7c38e154bc29d8 (patch)
treeecf4e0c1386f4321202047c452cac65440e95b67
parent31a0f591e965d3b2d52fe139f7e7d30d7474f504 (diff)
transaction: Allow multiple votes per transaction
Right now, we're using the pre-receive hook to call into transactions on the Gitaly nodes. This hook is well-understood to be executed once and only once per action, and as a result the current implementation of transactions allowed only a single vote via this pre-receive hook. The pre-receive hook was a stop-gap implementation of the real mechanism we wanted to eventually use, though, which is the reference-transaction hook that's going to be released as part of git-core v2.28. While the input to both hooks is the same and thus no changes to the actual voting logic should be required, the most important difference is that the reference-transaction hook may be invoked arbitrarily many times for each Git command. E.g. a non-atomic push will execute the hook as many times as there are updated references. This surfaces a current design limitation of the transaction mechanism as it is implemented in Gitaly: as a transaction only allows for a single vote per node, it's inherently incompatible with the semantics introduced by the reference-transaction hook. This is why we now extend the transaction mechanism to allow a sequence of votes instead by introducing subtransactions. Subtransactions encapsulate all the voting logic that was previously part of the transaction, which is casting the vote and collecting the votes to establish whether quorum was reached. Transactions now become a wrapper around a set of subtransactions, which transparently creates new subtransactions as required whenever a node casts a vote. Whenever a node casts a vote, one of three things may now happen: 1. A subtransaction exists where the node's status is "aborted". This means that any one of the subtransactions failed and thus the complete transaction needs to be labelled as "aborted" for the node. It will receive an error and is not allowed to cast any votes anymore. 2. A subtransaction exists where the node's state is "undecided". 
As this means that the node simply hasn't cast a vote yet, it will cast its vote for the oldest undecided subtransaction and wait for quorum to be reached. 3. All existing subtransactions are in "committed" state for the given node. We'll thus create a new subtransaction, cast our vote for this new subtransaction and wait for quorum to be reached. To evaluate the complete transaction's outcome, we need to establish whether for a given node, all created subtransactions are in state "committed". Only if that's the case will the transaction be treated as successful for the node. With this logic, we're now able to transparently allow a voter to cast a sequence of votes.
-rw-r--r--changelogs/unreleased/pks-subtransactions.yml5
-rw-r--r--internal/praefect/transaction_test.go107
-rw-r--r--internal/praefect/transactions/subtransaction.go12
-rw-r--r--internal/praefect/transactions/transaction.go99
4 files changed, 206 insertions, 17 deletions
diff --git a/changelogs/unreleased/pks-subtransactions.yml b/changelogs/unreleased/pks-subtransactions.yml
new file mode 100644
index 000000000..fca91046d
--- /dev/null
+++ b/changelogs/unreleased/pks-subtransactions.yml
@@ -0,0 +1,5 @@
+---
+title: Allow multiple votes per transaction
+merge_request: 2386
+author:
+type: changed
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 55882f588..12223c272 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -443,6 +444,112 @@ func TestTransactionReachesQuorum(t *testing.T) {
}
}
+func TestTransactionWithMultipleVotes(t *testing.T) {
+ type multiVoter struct {
+ voteCount uint
+ votes []string
+ voteSucceeds []bool
+ shouldSucceed bool
+ }
+
+ tc := []struct {
+ desc string
+ voters []multiVoter
+ threshold uint
+ }{
+ {
+ desc: "quorum is reached with multiple votes",
+ voters: []multiVoter{
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true},
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true},
+ },
+ threshold: 2,
+ },
+ {
+ desc: "quorum is not reached with disagreeing votes",
+ voters: []multiVoter{
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ },
+ threshold: 2,
+ },
+ {
+ desc: "quorum is reached with unweighted disagreeing voter",
+ voters: []multiVoter{
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
+ {voteCount: 0, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ },
+ threshold: 1,
+ },
+ {
+ desc: "quorum is reached with outweighed disagreeing voter",
+ voters: []multiVoter{
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
+ {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ },
+ threshold: 2,
+ },
+ }
+
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cleanup := testhelper.Context()
+ defer cleanup()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ for _, tc := range tc {
+ t.Run(tc.desc, func(t *testing.T) {
+ var voters []transactions.Voter
+
+ for i, voter := range tc.voters {
+ voters = append(voters, transactions.Voter{
+ Name: fmt.Sprintf("node-%d", i),
+ Votes: voter.voteCount,
+ })
+ }
+
+ transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ require.NoError(t, err)
+
+ var wg sync.WaitGroup
+ for i, v := range tc.voters {
+ wg.Add(1)
+ go func(i int, v multiVoter) {
+ defer wg.Done()
+
+ for j, vote := range v.votes {
+ name := fmt.Sprintf("node-%d", i)
+ hash := sha1.Sum([]byte(vote))
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transactionID,
+ Node: name,
+ ReferenceUpdatesHash: hash[:],
+ })
+ assert.NoError(t, err)
+
+ if v.voteSucceeds[j] {
+ assert.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State, "node should have received COMMIT")
+ } else {
+ assert.Equal(t, gitalypb.VoteTransactionResponse_ABORT, response.State, "node should have received ABORT")
+ }
+ }
+ }(i, v)
+ }
+
+ wg.Wait()
+
+ results, _ := cancel()
+ for i, voter := range tc.voters {
+ require.Equal(t, voter.shouldSucceed, results[fmt.Sprintf("node-%d", i)])
+ }
+ })
+ }
+}
+
func TestTransactionFailures(t *testing.T) {
counter, opts := setupMetrics()
cc, _, cleanup := runPraefectServerAndTxMgr(t, opts...)
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index aac836fbb..1c1c782f8 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -181,3 +181,15 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
voter.result = voteCommitted
return nil
}
+
+func (t *subtransaction) getResult(node string) (voteResult, error) {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ voter, ok := t.votersByNode[node]
+ if !ok {
+ return voteAborted, fmt.Errorf("invalid node for transaction: %q", node)
+ }
+
+ return voter.result, nil
+}
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 33584fda9..024f7d5bd 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -3,12 +3,14 @@ package transactions
import (
"context"
"errors"
+ "sync"
)
var (
- ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes")
- ErrMissingNodes = errors.New("transaction requires at least one node")
- ErrInvalidThreshold = errors.New("transaction has invalid threshold")
+ ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes")
+ ErrMissingNodes = errors.New("transaction requires at least one node")
+ ErrInvalidThreshold = errors.New("transaction has invalid threshold")
+ ErrSubtransactionFailed = errors.New("subtransaction has failed")
)
// Voter is a participant in a given transaction that may cast a vote.
@@ -29,9 +31,11 @@ type Voter struct {
// needs to go through the same sequence and agree on the same thing in the end
// in order to have the complete transaction succeed.
type transaction struct {
- threshold uint
- voters []Voter
- subtransaction *subtransaction
+ threshold uint
+ voters []Voter
+
+ lock sync.Mutex
+ subtransactions []*subtransaction
}
func newTransaction(voters []Voter, threshold uint) (*transaction, error) {
@@ -62,25 +66,86 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) {
return nil, ErrInvalidThreshold
}
- subtransaction, err := newSubtransaction(voters, threshold)
- if err != nil {
- return nil, err
- }
-
return &transaction{
- threshold: threshold,
- voters: voters,
- subtransaction: subtransaction,
+ threshold: threshold,
+ voters: voters,
}, nil
}
func (t *transaction) cancel() map[string]bool {
- return t.subtransaction.cancel()
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ results := make(map[string]bool, len(t.voters))
+
+ // We need to collect outcomes of all subtransactions. If any of the
+ // subtransactions failed, then the overall transaction failed for that
+ // node as well. Otherwise, if all subtransactions for the node
+ // succeeded, the transaction did as well.
+ for _, subtransaction := range t.subtransactions {
+ for voter, result := range subtransaction.cancel() {
+ // If there already is an entry indicating failure, keep it.
+ if didSucceed, ok := results[voter]; ok && !didSucceed {
+ continue
+ }
+ results[voter] = result
+ }
+ }
+
+ return results
+}
+
+// getOrCreateSubtransaction gets an ongoing subtransaction on which the given
+// node hasn't yet voted or creates a new one if the node has succeeded on
+// all subtransactions. In case the node has failed on any of the
+// subtransactions, an error will be returned.
+func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ for _, subtransaction := range t.subtransactions {
+ result, err := subtransaction.getResult(node)
+ if err != nil {
+ return nil, err
+ }
+
+ switch result {
+ case voteUndecided:
+ // An undecided vote means we should vote on this one.
+ return subtransaction, nil
+ case voteCommitted:
+ // If we have committed this subtransaction, we're good
+ // to go.
+ continue
+ case voteAborted:
+ // If the subtransaction was aborted, then we need to
+ // fail as we cannot proceed if the path leading to the
+ // end result has intermittent failures.
+ return nil, ErrSubtransactionFailed
+ }
+ }
+
+ // If we arrive here, then we know that the node has voted and
+ // reached quorum on all subtransactions. We can thus create a new one.
+ subtransaction, err := newSubtransaction(t.voters, t.threshold)
+ if err != nil {
+ return nil, err
+ }
+
+ t.subtransactions = append(t.subtransactions, subtransaction)
+
+ return subtransaction, nil
}
func (t *transaction) vote(ctx context.Context, node string, hash []byte) error {
- if err := t.subtransaction.vote(node, hash); err != nil {
+ subtransaction, err := t.getOrCreateSubtransaction(node)
+ if err != nil {
return err
}
- return t.subtransaction.collectVotes(ctx, node)
+
+ if err := subtransaction.vote(node, hash); err != nil {
+ return err
+ }
+
+ return subtransaction.collectVotes(ctx, node)
}