Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com> 2020-05-20 08:56:48 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com> 2020-05-20 11:26:00 +0300
commit: 154913666b44f186d66d97c967a1a3f06b794ea6 (patch)
tree: b023b8cf93f9e513d89615d88b0e530051a7a245
parent: 69df3cb43559ba55acde01b8ebeb253946b9d11f (diff)
transactions: Implement multi-node transactions
Currently, transactions can only handle the trivial case of a single node voting. This was mostly done to keep the initial complexity low, but in the end it is rather useless to vote when there is only one voter. This restriction is now being lifted. The implementation is quite simple: instead of storing only the expected voter's name, we store a map of votes which is initialized with the set of expected voters. Upon receiving a vote, we check that the voter is expected and has not cast a vote yet. If the check succeeds, we register its vote by storing its hash in the votes map. Concurrency is handled via two channels, `doneCh` and `cancelCh`. The latter is closed via the cancel callback and aborts all voters of the transaction that are currently waiting in the `collectVotes()` phase. `doneCh` is closed as soon as the last voter has cast its vote and thus allows `collectVotes()` to finish. Note that it is each voter's own responsibility to count votes, which is mostly done in order to keep the complexity of the code low. We know that only a single node may ever close `doneCh`, because: (1) only the last voter may close the channel, (2) we know each voter up front, and (3) no voter may cast a vote more than once. This commit also adds a set of tests to verify that the code is working as expected. One of the pre-existing tests verified that we can only ever register a single node; it was added as a precaution so that we actually verify this and do not run into any unsupported voting setups. As we now support multiple nodes, this test is removed.
-rw-r--r-- changelogs/unreleased/pks-2pc-multi-node-voting.yml | 5
-rw-r--r-- internal/praefect/service/transaction/server.go | 13
-rw-r--r-- internal/praefect/transaction_test.go | 152
-rw-r--r-- internal/praefect/transactions/manager.go | 23
-rw-r--r-- internal/praefect/transactions/transaction.go | 91
5 files changed, 262 insertions, 22 deletions
diff --git a/changelogs/unreleased/pks-2pc-multi-node-voting.yml b/changelogs/unreleased/pks-2pc-multi-node-voting.yml
new file mode 100644
index 000000000..795c99ed6
--- /dev/null
+++ b/changelogs/unreleased/pks-2pc-multi-node-voting.yml
@@ -0,0 +1,5 @@
+---
+title: Allow transaction manager to handle multi-node transactions
+merge_request: 2182
+author:
+type: added
diff --git a/internal/praefect/service/transaction/server.go b/internal/praefect/service/transaction/server.go
index e0f0789c3..452ad3914 100644
--- a/internal/praefect/service/transaction/server.go
+++ b/internal/praefect/service/transaction/server.go
@@ -7,6 +7,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
)
type Server struct {
@@ -27,10 +28,18 @@ func NewServer(txMgr *transactions.Manager) gitalypb.RefTransactionServer {
func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
err := s.txMgr.VoteTransaction(ctx, in.TransactionId, in.Node, in.ReferenceUpdatesHash)
if err != nil {
- if errors.Is(err, transactions.ErrNotFound) {
+ switch {
+ case errors.Is(err, transactions.ErrNotFound):
return nil, helper.ErrNotFound(err)
+ case errors.Is(err, transactions.ErrTransactionCanceled):
+ return nil, helper.DecorateError(codes.Canceled, err)
+ case errors.Is(err, transactions.ErrTransactionVoteFailed):
+ return &gitalypb.VoteTransactionResponse{
+ State: gitalypb.VoteTransactionResponse_ABORT,
+ }, nil
+ default:
+ return nil, helper.ErrInternal(err)
}
- return nil, helper.ErrInternal(err)
}
return &gitalypb.VoteTransactionResponse{
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 95e5b3a41..c7e23555b 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -3,6 +3,7 @@ package praefect
import (
"context"
"crypto/sha1"
+ "sync"
"testing"
"time"
@@ -90,14 +91,159 @@ func TestTransactionSucceeds(t *testing.T) {
})
}
-func TestTransactionFailsWithMultipleNodes(t *testing.T) {
- _, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+func TestTransactionWithMultipleNodes(t *testing.T) {
+ testcases := []struct {
+ desc string
+ nodes []string
+ hashes [][20]byte
+ expectedState gitalypb.VoteTransactionResponse_TransactionState
+ }{
+ {
+ desc: "Nodes with same hash",
+ nodes: []string{
+ "node1",
+ "node2",
+ },
+ hashes: [][20]byte{
+ sha1.Sum([]byte{}),
+ sha1.Sum([]byte{}),
+ },
+ expectedState: gitalypb.VoteTransactionResponse_COMMIT,
+ },
+ {
+ desc: "Nodes with different hashes",
+ nodes: []string{
+ "node1",
+ "node2",
+ },
+ hashes: [][20]byte{
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("bar")),
+ },
+ expectedState: gitalypb.VoteTransactionResponse_ABORT,
+ },
+ {
+ desc: "More nodes with same hash",
+ nodes: []string{
+ "node1",
+ "node2",
+ "node3",
+ "node4",
+ },
+ hashes: [][20]byte{
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("foo")),
+ },
+ expectedState: gitalypb.VoteTransactionResponse_COMMIT,
+ },
+ {
+ desc: "Majority with same hash",
+ nodes: []string{
+ "node1",
+ "node2",
+ "node3",
+ "node4",
+ },
+ hashes: [][20]byte{
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("foo")),
+ sha1.Sum([]byte("bar")),
+ sha1.Sum([]byte("foo")),
+ },
+ expectedState: gitalypb.VoteTransactionResponse_ABORT,
+ },
+ }
+
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ ctx, cleanup := testhelper.Context()
+ defer cleanup()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ for _, tc := range testcases {
+ t.Run(tc.desc, func(t *testing.T) {
+ transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, tc.nodes)
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ var wg sync.WaitGroup
+ for i := 0; i < len(tc.nodes); i++ {
+ wg.Add(1)
+
+ go func(idx int) {
+ defer wg.Done()
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transactionID,
+ Node: tc.nodes[idx],
+ ReferenceUpdatesHash: tc.hashes[idx][:],
+ })
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedState, response.State)
+ }(i)
+ }
+
+ wg.Wait()
+ })
+ }
+}
+
+func TestTransactionWithContextCancellation(t *testing.T) {
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t)
+ defer cleanup()
+
+ client := gitalypb.NewRefTransactionClient(cc)
+
+ ctx, cancel := testhelper.Context()
+
+ transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []string{"voter", "absent"})
+ require.NoError(t, err)
+ defer cancelTransaction()
+
+ hash := sha1.Sum([]byte{})
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transactionID,
+ Node: "voter",
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.Error(t, err)
+ require.Equal(t, codes.Canceled, status.Code(err))
+ }()
+
+ cancel()
+ wg.Wait()
+}
+
+func TestTransactionRegistrationWithInvalidNodesFails(t *testing.T) {
+ ctx, cleanup := testhelper.Context()
defer cleanup()
+ txMgr := transactions.NewManager()
+
+ _, _, err := txMgr.RegisterTransaction(ctx, []string{})
+ require.Equal(t, transactions.ErrMissingNodes, err)
+
+ _, _, err = txMgr.RegisterTransaction(ctx, []string{"node1", "node2", "node1"})
+ require.Equal(t, transactions.ErrDuplicateNodes, err)
+}
+
+func TestTransactionRegistrationWithSameNodeFails(t *testing.T) {
ctx, cleanup := testhelper.Context()
defer cleanup()
- _, _, err := txMgr.RegisterTransaction(ctx, []string{"node1", "node2"})
+ txMgr := transactions.NewManager()
+
+ _, _, err := txMgr.RegisterTransaction(ctx, []string{"foo", "bar", "foo"})
require.Error(t, err)
}
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index d9a5a354f..b5059c01f 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -135,17 +135,18 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, nodes []string) (ui
mgr.counterMetric.WithLabelValues("registered").Inc()
return transactionID, func() {
- mgr.cancelTransaction(transactionID)
+ mgr.cancelTransaction(transactionID, transaction)
}, nil
}
-func (mgr *Manager) cancelTransaction(transactionID uint64) {
+func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) {
mgr.lock.Lock()
defer mgr.lock.Unlock()
delete(mgr.transactions, transactionID)
+ transaction.cancel()
}
-func (mgr *Manager) voteTransaction(transactionID uint64, node string, hash []byte) error {
+func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error {
mgr.lock.Lock()
transaction, ok := mgr.transactions[transactionID]
mgr.lock.Unlock()
@@ -158,6 +159,10 @@ func (mgr *Manager) voteTransaction(transactionID uint64, node string, hash []by
return err
}
+ if err := transaction.collectVotes(ctx); err != nil {
+ return err
+ }
+
return nil
}
@@ -182,13 +187,19 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
"hash": hex.EncodeToString(hash),
}).Debug("VoteTransaction")
- if err := mgr.voteTransaction(transactionID, node, hash); err != nil {
+ if err := mgr.voteTransaction(ctx, transactionID, node, hash); err != nil {
mgr.log(ctx).WithFields(logrus.Fields{
"transaction_id": transactionID,
"node": node,
"hash": hex.EncodeToString(hash),
- }).WithError(err).Error("VoteTransaction: transaction invalid")
- mgr.counterMetric.WithLabelValues("invalid").Inc()
+ }).WithError(err).Error("VoteTransaction: vote failed")
+
+ if errors.Is(err, ErrTransactionVoteFailed) {
+ mgr.counterMetric.WithLabelValues("aborted").Inc()
+ } else {
+ mgr.counterMetric.WithLabelValues("invalid").Inc()
+ }
+
return err
}
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 431ac2f11..4c3d93a14 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -1,39 +1,108 @@
package transactions
import (
+ "bytes"
+ "context"
"crypto/sha1"
"errors"
"fmt"
+ "sync"
+)
+
+var (
+ ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes")
+ ErrMissingNodes = errors.New("transaction requires at least one node")
+ ErrTransactionVoteFailed = errors.New("transaction vote failed")
+ ErrTransactionCanceled = errors.New("transaction was canceled")
)
type transaction struct {
- node string
+ doneCh chan interface{}
+ cancelCh chan interface{}
+
+ lock sync.Mutex
+ votes map[string][]byte
}
func newTransaction(nodes []string) (*transaction, error) {
- // We only accept a single node in transactions right now, which is
- // usually the primary. This limitation will be lifted at a later point
- // to allow for real transaction voting and multi-phase commits.
- if len(nodes) != 1 {
- return nil, errors.New("transaction requires exactly one node")
+ if len(nodes) == 0 {
+ return nil, ErrMissingNodes
+ }
+
+ votes := make(map[string][]byte, len(nodes))
+ for _, node := range nodes {
+ if _, ok := votes[node]; ok {
+ return nil, ErrDuplicateNodes
+ }
+ votes[node] = nil
}
return &transaction{
- node: nodes[0],
+ doneCh: make(chan interface{}),
+ cancelCh: make(chan interface{}),
+ votes: votes,
}, nil
}
+func (t *transaction) cancel() {
+ close(t.cancelCh)
+}
+
func (t *transaction) vote(node string, hash []byte) error {
- // While the reference updates hash is not used yet, we already verify
- // it's there. At a later point, the hash will be used to verify that
- // all voting nodes agree on the same updates.
if len(hash) != sha1.Size {
return fmt.Errorf("invalid reference hash: %q", hash)
}
- if t.node != node {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ // Cast our vote. In case the node doesn't exist or has already cast a
+ // vote, we need to abort.
+ vote, ok := t.votes[node]
+ if !ok {
return fmt.Errorf("invalid node for transaction: %q", node)
}
+ if vote != nil {
+ return fmt.Errorf("node already cast a note: %q", node)
+ }
+ t.votes[node] = hash
+
+ // Count votes to see if we're done. If there are no more votes, then
+ // we must notify other voters (and ourselves) by closing the `done`
+ // channel.
+ for _, vote := range t.votes {
+ if vote == nil {
+ return nil
+ }
+ }
+
+ // As only the last voter may see that all participants have cast their
+ // vote, this can really only be called by a single goroutine.
+ close(t.doneCh)
+
+ return nil
+}
+
+func (t *transaction) collectVotes(ctx context.Context) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.cancelCh:
+ return ErrTransactionCanceled
+ case <-t.doneCh:
+ break
+ }
+
+ // Count votes to see whether we reached agreement or not. There should
+ // be no need to lock as nobody will modify the votes anymore.
+ var firstVote []byte
+ for _, vote := range t.votes {
+ if firstVote == nil {
+ firstVote = vote
+ } else if !bytes.Equal(firstVote, vote) {
+ return ErrTransactionVoteFailed
+ }
+ }
return nil
}