Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <668120-johncai@users.noreply.gitlab.com>2020-07-14 23:53:46 +0300
committerJohn Cai <668120-johncai@users.noreply.gitlab.com>2020-07-14 23:53:46 +0300
commitada5ed557ce191e09909a3cda4b53717d26a21b0 (patch)
tree0d944afcb4841680bfd17ebcedac7e8a604cc673
parentfdd1fe70085c1a20b10553680d88a967a4cfbfae (diff)
parent1a995685b1f58af08f0a1ddcb48bd66862d6987e (diff)
Merge branch 'pks-transactions-replication' into 'master'
Schedule replication jobs for failed transaction voters Closes #2880 See merge request gitlab-org/gitaly!2355
-rw-r--r--changelogs/unreleased/pks-transactions-replication.yml5
-rw-r--r--internal/praefect/coordinator.go34
-rw-r--r--internal/praefect/coordinator_test.go155
-rw-r--r--internal/praefect/server_test.go20
-rw-r--r--internal/praefect/transaction_test.go138
-rw-r--r--internal/praefect/transactions/manager.go14
-rw-r--r--internal/praefect/transactions/transaction.go40
7 files changed, 362 insertions, 44 deletions
diff --git a/changelogs/unreleased/pks-transactions-replication.yml b/changelogs/unreleased/pks-transactions-replication.yml
new file mode 100644
index 000000000..1a66a320f
--- /dev/null
+++ b/changelogs/unreleased/pks-transactions-replication.yml
@@ -0,0 +1,5 @@
+---
+title: Schedule replication jobs for failed transaction voters
+merge_request: 2355
+author:
+type: added
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 875f26f21..22aa48702 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -235,7 +235,39 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
if err != nil {
return nil, fmt.Errorf("registering transactions: %w", err)
}
- finalizers = append(finalizers, transactionCleanup)
+ finalizers = append(finalizers, func() error {
+ successByNode, err := transactionCleanup()
+ if err != nil {
+ return err
+ }
+
+ // If the primary node failed the transaction, then
+ // there's no sense in trying to replicate from primary
+ // to secondaries.
+ if !successByNode[shard.Primary.GetStorage()] {
+ return fmt.Errorf("transaction: primary failed vote")
+ }
+
+ failedNodes := make([]nodes.Node, 0, len(successByNode))
+ for node, success := range successByNode {
+ if success {
+ continue
+ }
+
+ secondary, err := shard.GetNode(node)
+ if err != nil {
+ return err
+ }
+
+ failedNodes = append(failedNodes, secondary)
+ }
+
+ if len(failedNodes) == 0 {
+ return nil
+ }
+
+ return c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, failedNodes, change, params)()
+ })
injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, shard.Primary.GetStorage(), true)
if err != nil {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 17b8c59bc..f1557dd41 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -2,6 +2,7 @@ package praefect
import (
"context"
+ "crypto/sha1"
"errors"
"fmt"
"io/ioutil"
@@ -219,6 +220,160 @@ func TestStreamDirectorMutator(t *testing.T) {
require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct")
}
+func TestStreamDirectorMutator_Transaction(t *testing.T) {
+ type node struct {
+ primary bool
+ vote string
+ shouldSucceed bool
+ shouldGetRepl bool
+ }
+
+ testcases := []struct {
+ desc string
+ nodes []node
+ }{
+ {
+ desc: "successful vote should not create replication jobs",
+ nodes: []node{
+ {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false},
+ {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false},
+ {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false},
+ },
+ },
+ {
+ // Currently, transactions are created such that all nodes need to agree.
+ // This is going to change in the future, but for now let's just test that
+ // we don't get any replication jobs if any node disagrees.
+ desc: "failing vote should not create replication jobs",
+ nodes: []node{
+ {primary: true, vote: "foobar", shouldSucceed: false, shouldGetRepl: false},
+ {primary: false, vote: "foobar", shouldSucceed: false, shouldGetRepl: false},
+ {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: false},
+ },
+ },
+ }
+
+ for _, tc := range testcases {
+ t.Run(tc.desc, func(t *testing.T) {
+ storageNodes := make([]*config.Node, 0, len(tc.nodes))
+ for i, node := range tc.nodes {
+ socket := testhelper.GetTemporaryGitalySocketFileName()
+ server, _ := testhelper.NewServerWithHealth(t, socket)
+ defer server.Stop()
+ node := &config.Node{Address: "unix://" + socket, Storage: fmt.Sprintf("node-%d", i), DefaultPrimary: node.primary}
+ storageNodes = append(storageNodes, node)
+ }
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: storageNodes,
+ },
+ },
+ }
+
+ var replicationWaitGroup sync.WaitGroup
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
+ queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ defer replicationWaitGroup.Done()
+ return queue.Enqueue(ctx, event)
+ })
+
+ repo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.ReferenceTransactions)
+
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ require.NoError(t, err)
+
+ shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ for i := range tc.nodes {
+ node, err := shard.GetNode(fmt.Sprintf("node-%d", i))
+ require.NoError(t, err)
+ waitNodeToChangeHealthStatus(ctx, t, node, true)
+ }
+
+ txMgr := transactions.NewManager()
+
+ coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+
+ fullMethod := "/gitaly.SmartHTTPService/PostReceivePack"
+
+ frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{
+ Repository: &repo,
+ })
+ require.NoError(t, err)
+ peeker := &mockPeeker{frame}
+
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+
+ transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx)
+ require.NoError(t, err)
+
+ var voterWaitGroup sync.WaitGroup
+ for i, node := range tc.nodes {
+ voterWaitGroup.Add(1)
+
+ i := i
+ node := node
+
+ go func() {
+ defer voterWaitGroup.Done()
+
+ if node.shouldGetRepl {
+ replicationWaitGroup.Add(1)
+ }
+
+ vote := sha1.Sum([]byte(node.vote))
+ err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote[:])
+ if node.shouldSucceed {
+ require.NoError(t, err)
+ } else {
+ require.True(t, errors.Is(err, transactions.ErrTransactionVoteFailed))
+ }
+ }()
+ }
+ voterWaitGroup.Wait()
+
+ // this call creates new events in the queue and simulates usual flow of the update operation
+ var primaryShouldSucceed bool
+ for _, node := range tc.nodes {
+ if !node.primary {
+ continue
+ }
+ primaryShouldSucceed = node.shouldSucceed
+ }
+ err = streamParams.RequestFinalizer()
+ if primaryShouldSucceed {
+ require.NoError(t, err)
+ } else {
+ require.Error(t, err, errors.New("transaction: primary failed vote"))
+ }
+
+ replicationWaitGroup.Wait()
+
+ for i, node := range tc.nodes {
+ events, err := queueInterceptor.Dequeue(ctx, "praefect", fmt.Sprintf("node-%d", i), 10)
+ require.NoError(t, err)
+ if node.shouldGetRepl {
+ require.Len(t, events, 1)
+ } else {
+ require.Empty(t, events)
+ }
+ }
+ })
+ }
+}
+
func TestStreamDirectorAccessor(t *testing.T) {
gitalySocket := testhelper.GetTemporaryGitalySocketFileName()
srv, _ := testhelper.NewServerWithHealth(t, gitalySocket)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 8f98d44c5..f70e481fa 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -3,6 +3,7 @@ package praefect
import (
"bytes"
"context"
+ "crypto/sha1"
"errors"
"io"
"io/ioutil"
@@ -27,6 +28,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -648,6 +650,7 @@ func TestRepoRename(t *testing.T) {
}
type mockSmartHTTP struct {
+ txMgr *transactions.Manager
m sync.Mutex
methodsCalled map[string]int
}
@@ -716,6 +719,18 @@ func (m *mockSmartHTTP) PostReceivePack(stream gitalypb.SmartHTTPService_PostRec
}
}
+ ctx := stream.Context()
+
+ tx, err := metadata.TransactionFromContext(ctx)
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ hash := sha1.Sum([]byte{})
+ if err := m.txMgr.VoteTransaction(ctx, tx.ID, tx.Node, hash[:]); err != nil {
+ return helper.ErrInternal(err)
+ }
+
return nil
}
@@ -737,7 +752,9 @@ func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *
}
func TestProxyWrites(t *testing.T) {
- smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{}, &mockSmartHTTP{}, &mockSmartHTTP{}
+ txMgr := transactions.NewManager()
+
+ smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}
socket0, srv0 := newGrpcServer(t, smartHTTP0)
defer srv0.Stop()
@@ -774,7 +791,6 @@ func TestProxyWrites(t *testing.T) {
nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
- txMgr := transactions.NewManager()
coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index 3d3b15b0d..ed9f8ed77 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -19,6 +19,13 @@ import (
"google.golang.org/grpc/status"
)
+type voter struct {
+ votes uint
+ vote string
+ showsUp bool
+ shouldSucceed bool
+}
+
func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) {
conf := testConfig(1)
txMgr := transactions.NewManager(opts...)
@@ -310,13 +317,6 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) {
}
func TestTransactionReachesQuorum(t *testing.T) {
- type voter struct {
- votes uint
- vote string
- showsUp bool
- shouldSucceed bool
- }
-
tc := []struct {
desc string
voters []voter
@@ -469,35 +469,109 @@ func TestTransactionFailures(t *testing.T) {
}
func TestTransactionCancellation(t *testing.T) {
- counter, opts := setupMetrics()
- cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...)
- defer cleanup()
+ testcases := []struct {
+ desc string
+ voters []voter
+ threshold uint
+ expectedMetrics counterMetrics
+ }{
+ {
+ desc: "single node cancellation",
+ voters: []voter{
+ {votes: 1, showsUp: false, shouldSucceed: false},
+ },
+ threshold: 1,
+ expectedMetrics: counterMetrics{registered: 1, committed: 0},
+ },
+ {
+ desc: "two nodes failing to show up",
+ voters: []voter{
+ {votes: 1, showsUp: false, shouldSucceed: false},
+ {votes: 1, showsUp: false, shouldSucceed: false},
+ },
+ threshold: 2,
+ expectedMetrics: counterMetrics{registered: 1, committed: 0},
+ },
+ {
+ desc: "two nodes with unweighted node failing",
+ voters: []voter{
+ {votes: 1, showsUp: true, shouldSucceed: true},
+ {votes: 0, showsUp: false, shouldSucceed: false},
+ },
+ threshold: 1,
+ expectedMetrics: counterMetrics{registered: 1, started: 1, committed: 1},
+ },
+ {
+ desc: "multiple weighted votes with subset failing",
+ voters: []voter{
+ {votes: 1, showsUp: true, shouldSucceed: true},
+ {votes: 1, showsUp: true, shouldSucceed: true},
+ {votes: 1, showsUp: false, shouldSucceed: false},
+ },
+ threshold: 2,
+ expectedMetrics: counterMetrics{registered: 1, started: 2, committed: 2},
+ },
+ }
- ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- defer cancel()
+ for _, tc := range testcases {
+ t.Run(tc.desc, func(t *testing.T) {
+ counter, opts := setupMetrics()
+ cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...)
+ defer cleanup()
- client := gitalypb.NewRefTransactionClient(cc)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
- transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{
- {Name: "node1", Votes: 1},
- }, 1)
- require.NoError(t, err)
- require.NotZero(t, transactionID)
+ client := gitalypb.NewRefTransactionClient(cc)
- cancelTransaction()
+ voters := make([]transactions.Voter, 0, len(tc.voters))
+ for i, voter := range tc.voters {
+ voters = append(voters, transactions.Voter{
+ Name: fmt.Sprintf("node-%d", i),
+ Votes: voter.votes,
+ })
+ }
- hash := sha1.Sum([]byte{})
- _, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: transactionID,
- Node: "node1",
- ReferenceUpdatesHash: hash[:],
- })
- require.Error(t, err)
- require.Equal(t, codes.NotFound, status.Code(err))
+ transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold)
+ require.NoError(t, err)
- verifyCounterMetrics(t, counter, counterMetrics{
- registered: 1,
- started: 1,
- invalid: 1,
- })
+ var wg sync.WaitGroup
+ for i, v := range tc.voters {
+ if !v.showsUp {
+ continue
+ }
+
+ wg.Add(1)
+ go func(i int, v voter) {
+ defer wg.Done()
+
+ name := fmt.Sprintf("node-%d", i)
+ hash := sha1.Sum([]byte(v.vote))
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: transactionID,
+ Node: name,
+ ReferenceUpdatesHash: hash[:],
+ })
+ require.NoError(t, err)
+
+ if v.shouldSucceed {
+ require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State, "node should have received COMMIT")
+ } else {
+ require.Equal(t, gitalypb.VoteTransactionResponse_ABORT, response.State, "node should have received ABORT")
+ }
+ }(i, v)
+ }
+ wg.Wait()
+
+ results, err := cancelTransaction()
+ require.NoError(t, err)
+
+ for i, v := range tc.voters {
+ require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state")
+ }
+
+ verifyCounterMetrics(t, counter, tc.expectedMetrics)
+ })
+ }
}
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 9f7b5f8ae..d0319d5c6 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -102,8 +102,9 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger {
// CancelFunc is the transaction cancellation function returned by
// `RegisterTransaction`. Calling it will cause the transaction to be removed
-// from the transaction manager.
-type CancelFunc func() error
+// from the transaction manager. It returns the outcome for each node: `true`
+// if the node committed the transaction, `false` if it aborted.
+type CancelFunc func() (map[string]bool, error)
// RegisterTransaction registers a new reference transaction for a set of nodes
// taking part in the transaction. `threshold` is the threshold at which an
@@ -136,17 +137,16 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
mgr.counterMetric.WithLabelValues("registered").Inc()
- return transactionID, func() error {
- mgr.cancelTransaction(transactionID, transaction)
- return nil
+ return transactionID, func() (map[string]bool, error) {
+ return mgr.cancelTransaction(transactionID, transaction)
}, nil
}
-func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) {
+func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) (map[string]bool, error) {
mgr.lock.Lock()
defer mgr.lock.Unlock()
delete(mgr.transactions, transactionID)
- transaction.cancel()
+ return transaction.cancel(), nil
}
func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error {
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 13282d950..ceb6b9a29 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -16,6 +16,19 @@ var (
ErrTransactionCanceled = errors.New("transaction was canceled")
)
+// voteResult represents the outcome of a transaction for a single voter.
+type voteResult int
+
+const (
+ // voteUndecided means that the voter either didn't yet show up or that
+ // the vote couldn't yet be decided due to there being no majority yet.
+ voteUndecided voteResult = iota
+ // voteCommitted means that the voter committed his vote.
+ voteCommitted
+ // voteAborted means that the voter aborted his vote.
+ voteAborted
+)
+
// Voter is a participant in a given transaction that may cast a vote.
type Voter struct {
// Name of the voter, usually Gitaly's storage name.
@@ -25,7 +38,8 @@ type Voter struct {
// this voter.
Votes uint
- vote vote
+ vote vote
+ result voteResult
}
type vote [sha1.Size]byte
@@ -96,8 +110,24 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) {
}, nil
}
-func (t *transaction) cancel() {
+func (t *transaction) cancel() map[string]bool {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ results := make(map[string]bool, len(t.votersByNode))
+ for node, voter := range t.votersByNode {
+ // If a voter didn't yet show up or is still undecided, we need
+ // to mark it as failed so it won't get the idea of committing
+ // the transaction at a later point anymore.
+ if voter.result == voteUndecided {
+ voter.result = voteAborted
+ }
+ results[node] = voter.result == voteCommitted
+ }
+
close(t.cancelCh)
+
+ return results
}
func (t *transaction) vote(node string, hash []byte) error {
@@ -176,11 +206,17 @@ func (t *transaction) collectVotes(ctx context.Context, node string) error {
return fmt.Errorf("invalid node for transaction: %q", node)
}
+ if voter.result != voteUndecided {
+ return fmt.Errorf("voter has already settled on an outcome: %q", node)
+ }
+
// See if our vote crossed the threshold. As there can be only one vote
// exceeding it, we know we're the winner in that case.
if t.voteCounts[voter.vote] < t.threshold {
+ voter.result = voteAborted
return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold)
}
+ voter.result = voteCommitted
return nil
}