Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-07-17 13:56:56 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-07-17 13:56:56 +0300
commit9e6f5f40e6eb44655b6acfd5dc222af04333a4f2 (patch)
tree7f0d8eb2f48cbcc6815e994df5b4bbe39112e123
parentd13f378d0015ad0888e2558e7141fbffe1b46b85 (diff)
parent9c3d907fa8f50a8ed6a87279631cc4c729b91d3f (diff)
Merge branch 'pks-subtransactions' into 'master'
Allow multiple votes per transaction Closes #2939 See merge request gitlab-org/gitaly!2386
-rw-r--r--changelogs/unreleased/pks-subtransactions.yml5
-rw-r--r--internal/praefect/transaction_test.go107
-rw-r--r--internal/praefect/transactions/manager.go6
-rw-r--r--internal/praefect/transactions/subtransaction.go198
-rw-r--r--internal/praefect/transactions/transaction.go209
5 files changed, 384 insertions, 141 deletions
diff --git a/changelogs/unreleased/pks-subtransactions.yml b/changelogs/unreleased/pks-subtransactions.yml
new file mode 100644
index 000000000..fca91046d
--- /dev/null
+++ b/changelogs/unreleased/pks-subtransactions.yml
@@ -0,0 +1,5 @@
+---
+title: Allow multiple votes per transaction
+merge_request: 2386
+author:
+type: changed
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 55882f588..12223c272 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -443,6 +444,112 @@ func TestTransactionReachesQuorum(t *testing.T) {
}
}
+func TestTransactionWithMultipleVotes(t *testing.T) {
+ type multiVoter struct {
+ voteCount uint
+ votes []string
+ voteSucceeds []bool
+ shouldSucceed bool
+ }
+
+ tc := []struct {
+ desc string
+ voters []multiVoter
+ threshold uint
+ }{
+ {
+ desc: "quorum is reached with multiple votes",
+ voters: []multiVoter{
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true},
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, true}, shouldSucceed: true},
+ },
+ threshold: 2,
+ },
+ {
+ desc: "quorum is not reached with disagreeing votes",
+ voters: []multiVoter{
+ {voteCount: 1, votes: []string{"foo", "bar"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ },
+ threshold: 2,
+ },
+ {
+ desc: "quorum is reached with unweighted disagreeing voter",
+ voters: []multiVoter{
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
+ {voteCount: 0, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ },
+ threshold: 1,
+ },
+ {
+ desc: "quorum is reached with outweighed disagreeing voter",
+ voters: []multiVoter{
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
+ {voteCount: 1, votes: []string{"foo", "bar", "qux"}, voteSucceeds: []bool{true, true, true}, shouldSucceed: true},
+ {voteCount: 1, votes: []string{"foo", "rab"}, voteSucceeds: []bool{true, false}, shouldSucceed: false},
+ },
+ threshold: 2,
+ },
+ }
+
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cleanup := testhelper.Context()
+ defer cleanup()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ for _, tc := range tc {
+ t.Run(tc.desc, func(t *testing.T) {
+ var voters []transactions.Voter
+
+ for i, voter := range tc.voters {
+ voters = append(voters, transactions.Voter{
+ Name: fmt.Sprintf("node-%d", i),
+ Votes: voter.voteCount,
+ })
+ }
+
+ transactionID, cancel, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ require.NoError(t, err)
+
+ var wg sync.WaitGroup
+ for i, v := range tc.voters {
+ wg.Add(1)
+ go func(i int, v multiVoter) {
+ defer wg.Done()
+
+ for j, vote := range v.votes {
+ name := fmt.Sprintf("node-%d", i)
+ hash := sha1.Sum([]byte(vote))
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transactionID,
+ Node: name,
+ ReferenceUpdatesHash: hash[:],
+ })
+ assert.NoError(t, err)
+
+ if v.voteSucceeds[j] {
+ assert.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State, "node should have received COMMIT")
+ } else {
+ assert.Equal(t, gitalypb.VoteTransactionResponse_ABORT, response.State, "node should have received ABORT")
+ }
+ }
+ }(i, v)
+ }
+
+ wg.Wait()
+
+ results, _ := cancel()
+ for i, voter := range tc.voters {
+ require.Equal(t, voter.shouldSucceed, results[fmt.Sprintf("node-%d", i)])
+ }
+ })
+ }
+}
+
func TestTransactionFailures(t *testing.T) {
counter, opts := setupMetrics()
cc, _, cleanup := runPraefectServerAndTxMgr(t, opts...)
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 0cb967209..252146dda 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -158,11 +158,7 @@ func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, n
return ErrNotFound
}
- if err := transaction.vote(node, hash); err != nil {
- return err
- }
-
- if err := transaction.collectVotes(ctx, node); err != nil {
+ if err := transaction.vote(ctx, node, hash); err != nil {
return err
}
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
new file mode 100644
index 000000000..9950ea53a
--- /dev/null
+++ b/internal/praefect/transactions/subtransaction.go
@@ -0,0 +1,198 @@
+package transactions
+
+import (
+ "context"
+ "crypto/sha1"
+ "errors"
+ "fmt"
+ "sync"
+)
+
+var (
+ // ErrTransactionVoteFailed indicates the transaction didn't reach quorum.
+ ErrTransactionVoteFailed = errors.New("transaction did not reach quorum")
+ // ErrTransactionCanceled indicates the transaction was canceled before
+ // reaching quorum.
+ ErrTransactionCanceled = errors.New("transaction was canceled")
+)
+
+// voteResult represents the outcome of a transaction for a single voter.
+type voteResult int
+
+const (
+ // voteUndecided means that the voter either didn't yet show up or that
+ // the vote couldn't yet be decided due to there being no majority yet.
+ voteUndecided voteResult = iota
+ // voteCommitted means that the voter committed his vote.
+ voteCommitted
+ // voteAborted means that the voter aborted his vote.
+ voteAborted
+)
+
+type vote [sha1.Size]byte
+
+func voteFromHash(hash []byte) (vote, error) {
+ var vote vote
+
+ if len(hash) != sha1.Size {
+ return vote, fmt.Errorf("invalid voting hash: %q", hash)
+ }
+
+ copy(vote[:], hash)
+ return vote, nil
+}
+
+func (v vote) isEmpty() bool {
+ return v == vote{}
+}
+
+// subtransaction is a single session where voters are voting for a certain outcome.
+type subtransaction struct {
+ doneCh chan interface{}
+ cancelCh chan interface{}
+
+ threshold uint
+
+ lock sync.RWMutex
+ votersByNode map[string]*Voter
+ voteCounts map[vote]uint
+}
+
+func newSubtransaction(voters []Voter, threshold uint) (*subtransaction, error) {
+ votersByNode := make(map[string]*Voter, len(voters))
+ for _, voter := range voters {
+ voter := voter // rescope loop variable
+ votersByNode[voter.Name] = &voter
+ }
+
+ return &subtransaction{
+ doneCh: make(chan interface{}),
+ cancelCh: make(chan interface{}),
+ threshold: threshold,
+ votersByNode: votersByNode,
+ voteCounts: make(map[vote]uint, len(voters)),
+ }, nil
+}
+
+func (t *subtransaction) cancel() map[string]bool {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ results := make(map[string]bool, len(t.votersByNode))
+ for node, voter := range t.votersByNode {
+ // If a voter didn't yet show up or is still undecided, we need
+ // to mark it as failed so it won't get the idea of committing
+ // the transaction at a later point anymore.
+ if voter.result == voteUndecided {
+ voter.result = voteAborted
+ }
+ results[node] = voter.result == voteCommitted
+ }
+
+ close(t.cancelCh)
+
+ return results
+}
+
+func (t *subtransaction) vote(node string, hash []byte) error {
+ vote, err := voteFromHash(hash)
+ if err != nil {
+ return err
+ }
+
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ // Cast our vote. In case the node doesn't exist or has already cast a
+ // vote, we need to abort.
+ voter, ok := t.votersByNode[node]
+ if !ok {
+ return fmt.Errorf("invalid node for transaction: %q", node)
+ }
+ if !voter.vote.isEmpty() {
+ return fmt.Errorf("node already cast a vote: %q", node)
+ }
+ voter.vote = vote
+
+ oldCount := t.voteCounts[vote]
+ newCount := oldCount + voter.Votes
+ t.voteCounts[vote] = newCount
+
+ // If the threshold was reached before already, we mustn't try to
+ // signal the other voters again.
+ if oldCount >= t.threshold {
+ return nil
+ }
+
+ // If we've just crossed the threshold, signal all voters that the
+ // voting has concluded.
+ if newCount >= t.threshold {
+ close(t.doneCh)
+ return nil
+ }
+
+ // If any other vote has already reached the threshold, we mustn't try
+ // to notify voters again.
+ for _, count := range t.voteCounts {
+ if count >= t.threshold {
+ return nil
+ }
+ }
+
+ // If any of the voters didn't yet cast its vote, we need to wait for
+ // them.
+ for _, voter := range t.votersByNode {
+ if voter.vote.isEmpty() {
+ return nil
+ }
+ }
+
+ // Otherwise, signal voters that all votes were gathered.
+ close(t.doneCh)
+ return nil
+}
+
+func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.cancelCh:
+ return ErrTransactionCanceled
+ case <-t.doneCh:
+ break
+ }
+
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ voter, ok := t.votersByNode[node]
+ if !ok {
+ return fmt.Errorf("invalid node for transaction: %q", node)
+ }
+
+ if voter.result != voteUndecided {
+ return fmt.Errorf("voter has already settled on an outcome: %q", node)
+ }
+
+ // See if our vote crossed the threshold. As there can be only one vote
+ // exceeding it, we know we're the winner in that case.
+ if t.voteCounts[voter.vote] < t.threshold {
+ voter.result = voteAborted
+ return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold)
+ }
+
+ voter.result = voteCommitted
+ return nil
+}
+
+func (t *subtransaction) getResult(node string) (voteResult, error) {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ voter, ok := t.votersByNode[node]
+ if !ok {
+ return voteAborted, fmt.Errorf("invalid node for transaction: %q", node)
+ }
+
+ return voter.result, nil
+}
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index ceb6b9a29..cfcca1658 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -2,31 +2,23 @@ package transactions
import (
"context"
- "crypto/sha1"
"errors"
- "fmt"
"sync"
)
var (
- ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes")
- ErrMissingNodes = errors.New("transaction requires at least one node")
- ErrInvalidThreshold = errors.New("transaction has invalid threshold")
- ErrTransactionVoteFailed = errors.New("transaction did not reach quorum")
- ErrTransactionCanceled = errors.New("transaction was canceled")
-)
-
-// voteResult represents the outcome of a transaction for a single voter.
-type voteResult int
-
-const (
- // voteUndecided means that the voter either didn't yet show up or that
- // the vote couldn't yet be decided due to there being no majority yet.
- voteUndecided voteResult = iota
- // voteCommitted means that the voter committed his vote.
- voteCommitted
- // voteAborted means that the voter aborted his vote.
- voteAborted
+ // ErrDuplicateNodes indicates a transaction was registered with two
+ // voters having the same name.
+ ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes")
+ // ErrMissingNodes indicates a transaction was registered with no voters.
+ ErrMissingNodes = errors.New("transaction requires at least one node")
+ // ErrInvalidThreshold indicates a transaction was registered with an
+ // invalid threshold that may either allow for multiple different
+ // quorums or none at all.
+ ErrInvalidThreshold = errors.New("transaction has invalid threshold")
+ // ErrSubtransactionFailed indicates a vote was cast on a
+ // subtransaction which failed already.
+ ErrSubtransactionFailed = errors.New("subtransaction has failed")
)
// Voter is a participant in a given transaction that may cast a vote.
@@ -42,32 +34,16 @@ type Voter struct {
result voteResult
}
-type vote [sha1.Size]byte
-
-func voteFromHash(hash []byte) (vote, error) {
- var vote vote
-
- if len(hash) != sha1.Size {
- return vote, fmt.Errorf("invalid voting hash: %q", hash)
- }
-
- copy(vote[:], hash)
- return vote, nil
-}
-
-func (v vote) isEmpty() bool {
- return v == vote{}
-}
-
+// transaction is a session where a set of voters votes on one or more
+// subtransactions. Subtransactions are a sequence of sessions, where each node
+// needs to go through the same sequence and agree on the same thing in the end
+// in order to have the complete transaction succeed.
type transaction struct {
- doneCh chan interface{}
- cancelCh chan interface{}
-
threshold uint
+ voters []Voter
- lock sync.RWMutex
- votersByNode map[string]*Voter
- voteCounts map[vote]uint
+ lock sync.Mutex
+ subtransactions []*subtransaction
}
func newTransaction(voters []Voter, threshold uint) (*transaction, error) {
@@ -76,15 +52,12 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) {
}
var totalVotes uint
- votersByNode := make(map[string]*Voter, len(voters))
-
+ votersByNode := make(map[string]interface{}, len(voters))
for _, voter := range voters {
if _, ok := votersByNode[voter.Name]; ok {
return nil, ErrDuplicateNodes
}
-
- voter := voter // rescope loop variable
- votersByNode[voter.Name] = &voter
+ votersByNode[voter.Name] = nil
totalVotes += voter.Votes
}
@@ -102,11 +75,8 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) {
}
return &transaction{
- doneCh: make(chan interface{}),
- cancelCh: make(chan interface{}),
- threshold: threshold,
- votersByNode: votersByNode,
- voteCounts: make(map[vote]uint, len(votersByNode)),
+ threshold: threshold,
+ voters: voters,
}, nil
}
@@ -114,109 +84,76 @@ func (t *transaction) cancel() map[string]bool {
t.lock.Lock()
defer t.lock.Unlock()
- results := make(map[string]bool, len(t.votersByNode))
- for node, voter := range t.votersByNode {
- // If a voter didn't yet show up or is still undecided, we need
- // to mark it as failed so it won't get the idea of committing
- // the transaction at a later point anymore.
- if voter.result == voteUndecided {
- voter.result = voteAborted
+ results := make(map[string]bool, len(t.voters))
+
+ // We need to collect outcomes of all subtransactions. If any of the
+ // subtransactions failed, then the overall transaction failed for that
+ // node as well. Otherwise, if all subtransactions for the node
+ // succeeded, the transaction did as well.
+ for _, subtransaction := range t.subtransactions {
+ for voter, result := range subtransaction.cancel() {
+ // If there already is an entry indicating failure, keep it.
+ if didSucceed, ok := results[voter]; ok && !didSucceed {
+ continue
+ }
+ results[voter] = result
}
- results[node] = voter.result == voteCommitted
}
- close(t.cancelCh)
-
return results
}
-func (t *transaction) vote(node string, hash []byte) error {
- vote, err := voteFromHash(hash)
- if err != nil {
- return err
- }
-
+// getOrCreateSubtransaction gets an ongoing subtransaction on which the given
+// node hasn't yet voted on or creates a new one if the node has succeeded on
+// all subtransactions. In case the node has failed on any of the
+// subtransactions, an error will be returned.
+func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) {
t.lock.Lock()
defer t.lock.Unlock()
- // Cast our vote. In case the node doesn't exist or has already cast a
- // vote, we need to abort.
- voter, ok := t.votersByNode[node]
- if !ok {
- return fmt.Errorf("invalid node for transaction: %q", node)
- }
- if !voter.vote.isEmpty() {
- return fmt.Errorf("node already cast a vote: %q", node)
- }
- voter.vote = vote
-
- oldCount := t.voteCounts[vote]
- newCount := oldCount + voter.Votes
- t.voteCounts[vote] = newCount
-
- // If the threshold was reached before already, we mustn't try to
- // signal the other voters again.
- if oldCount >= t.threshold {
- return nil
- }
-
- // If we've just crossed the threshold, signal all voters that the
- // voting has concluded.
- if newCount >= t.threshold {
- close(t.doneCh)
- return nil
- }
-
- // If any other vote has already reached the threshold, we mustn't try
- // to notify voters again.
- for _, count := range t.voteCounts {
- if count >= t.threshold {
- return nil
+ for _, subtransaction := range t.subtransactions {
+ result, err := subtransaction.getResult(node)
+ if err != nil {
+ return nil, err
}
- }
- // If any of the voters didn't yet cast its vote, we need to wait for
- // them.
- for _, voter := range t.votersByNode {
- if voter.vote.isEmpty() {
- return nil
+ switch result {
+ case voteUndecided:
+ // An undecided vote means we should vote on this one.
+ return subtransaction, nil
+ case voteCommitted:
+ // If we have committed this subtransaction, we're good
+ // to go.
+ continue
+ case voteAborted:
+ // If the subtransaction was aborted, then we need to
+ // fail as we cannot proceed if the path leading to the
+ // end result has intermittent failures.
+ return nil, ErrSubtransactionFailed
}
}
- // Otherwise, signal voters that all votes were gathered.
- close(t.doneCh)
- return nil
-}
-
-func (t *transaction) collectVotes(ctx context.Context, node string) error {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-t.cancelCh:
- return ErrTransactionCanceled
- case <-t.doneCh:
- break
+ // If we arrive here, then we know that all the node has voted and
+ // reached quorum on all subtransactions. We can thus create a new one.
+ subtransaction, err := newSubtransaction(t.voters, t.threshold)
+ if err != nil {
+ return nil, err
}
- t.lock.RLock()
- defer t.lock.RUnlock()
+ t.subtransactions = append(t.subtransactions, subtransaction)
- voter, ok := t.votersByNode[node]
- if !ok {
- return fmt.Errorf("invalid node for transaction: %q", node)
- }
+ return subtransaction, nil
+}
- if voter.result != voteUndecided {
- return fmt.Errorf("voter has already settled on an outcome: %q", node)
+func (t *transaction) vote(ctx context.Context, node string, hash []byte) error {
+ subtransaction, err := t.getOrCreateSubtransaction(node)
+ if err != nil {
+ return err
}
- // See if our vote crossed the threshold. As there can be only one vote
- // exceeding it, we know we're the winner in that case.
- if t.voteCounts[voter.vote] < t.threshold {
- voter.result = voteAborted
- return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold)
+ if err := subtransaction.vote(node, hash); err != nil {
+ return err
}
- voter.result = voteCommitted
- return nil
+ return subtransaction.collectVotes(ctx, node)
}