diff options
author | John Cai <668120-johncai@users.noreply.gitlab.com> | 2020-07-14 23:53:46 +0300 |
---|---|---|
committer | John Cai <668120-johncai@users.noreply.gitlab.com> | 2020-07-14 23:53:46 +0300 |
commit | ada5ed557ce191e09909a3cda4b53717d26a21b0 (patch) | |
tree | 0d944afcb4841680bfd17ebcedac7e8a604cc673 | |
parent | fdd1fe70085c1a20b10553680d88a967a4cfbfae (diff) | |
parent | 1a995685b1f58af08f0a1ddcb48bd66862d6987e (diff) |
Merge branch 'pks-transactions-replication' into 'master'
Schedule replication jobs for failed transaction voters
Closes #2880
See merge request gitlab-org/gitaly!2355
-rw-r--r-- | changelogs/unreleased/pks-transactions-replication.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 34 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 155 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 20 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 138 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 14 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 40 |
7 files changed, 362 insertions, 44 deletions
diff --git a/changelogs/unreleased/pks-transactions-replication.yml b/changelogs/unreleased/pks-transactions-replication.yml new file mode 100644 index 000000000..1a66a320f --- /dev/null +++ b/changelogs/unreleased/pks-transactions-replication.yml @@ -0,0 +1,5 @@ +--- +title: Schedule replication jobs for failed transaction voters +merge_request: 2355 +author: +type: added diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 875f26f21..22aa48702 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -235,7 +235,39 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall if err != nil { return nil, fmt.Errorf("registering transactions: %w", err) } - finalizers = append(finalizers, transactionCleanup) + finalizers = append(finalizers, func() error { + successByNode, err := transactionCleanup() + if err != nil { + return err + } + + // If the primary node failed the transaction, then + // there's no sense in trying to replicate from primary + // to secondaries. + if !successByNode[shard.Primary.GetStorage()] { + return fmt.Errorf("transaction: primary failed vote") + } + + failedNodes := make([]nodes.Node, 0, len(successByNode)) + for node, success := range successByNode { + if success { + continue + } + + secondary, err := shard.GetNode(node) + if err != nil { + return err + } + + failedNodes = append(failedNodes, secondary) + } + + if len(failedNodes) == 0 { + return nil + } + + return c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, failedNodes, change, params)() + }) injectedCtx, err := metadata.InjectTransaction(ctx, transactionID, shard.Primary.GetStorage(), true) if err != nil { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 17b8c59bc..f1557dd41 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -2,6 +2,7 @@ package praefect import ( "context" + "crypto/sha1" "errors" "fmt" "io/ioutil" @@ -219,6 +220,160 @@ func TestStreamDirectorMutator(t *testing.T) { require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct") } +func TestStreamDirectorMutator_Transaction(t *testing.T) { + type node struct { + primary bool + vote string + shouldSucceed bool + shouldGetRepl bool + } + + testcases := []struct { + desc string + nodes []node + }{ + { + desc: "successful vote should not create replication jobs", + nodes: []node{ + {primary: true, vote: "foobar", shouldSucceed: true, shouldGetRepl: false}, + {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false}, + {primary: false, vote: "foobar", shouldSucceed: true, shouldGetRepl: false}, + }, + }, + { + // Currently, transactions are created such that all nodes need to agree. + // This is going to change in the future, but for now let's just test that + // we don't get any replication jobs if any node disagrees. + desc: "failing vote should not create replication jobs", + nodes: []node{ + {primary: true, vote: "foobar", shouldSucceed: false, shouldGetRepl: false}, + {primary: false, vote: "foobar", shouldSucceed: false, shouldGetRepl: false}, + {primary: false, vote: "barfoo", shouldSucceed: false, shouldGetRepl: false}, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + storageNodes := make([]*config.Node, 0, len(tc.nodes)) + for i, node := range tc.nodes { + socket := testhelper.GetTemporaryGitalySocketFileName() + server, _ := testhelper.NewServerWithHealth(t, socket) + defer server.Stop() + node := &config.Node{Address: "unix://" + socket, Storage: fmt.Sprintf("node-%d", i), DefaultPrimary: node.primary} + storageNodes = append(storageNodes, node) + } + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: storageNodes, + }, + }, + } + + var replicationWaitGroup sync.WaitGroup + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) + queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { + defer replicationWaitGroup.Done() + return queue.Enqueue(ctx, event) + }) + + repo := gitalypb.Repository{ + StorageName: "praefect", + RelativePath: "/path/to/hashed/storage", + } + + ctx, cancel := testhelper.Context() + defer cancel() + ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.ReferenceTransactions) + + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queueInterceptor, promtest.NewMockHistogramVec()) + require.NoError(t, err) + + shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name) + require.NoError(t, err) + + for i := range tc.nodes { + node, err := shard.GetNode(fmt.Sprintf("node-%d", i)) + require.NoError(t, err) + waitNodeToChangeHealthStatus(ctx, t, node, true) + } + + txMgr := transactions.NewManager() + + coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) + + fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" + + frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{ + Repository: &repo, + }) + require.NoError(t, err) + peeker := &mockPeeker{frame} + + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + + transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx) + require.NoError(t, err) + + var voterWaitGroup sync.WaitGroup + for i, node := range tc.nodes { + voterWaitGroup.Add(1) + + i := i + node := node + + go func() { + defer voterWaitGroup.Done() + + if node.shouldGetRepl { + replicationWaitGroup.Add(1) + } + + vote := sha1.Sum([]byte(node.vote)) + err := txMgr.VoteTransaction(ctx, transaction.ID, fmt.Sprintf("node-%d", i), vote[:]) + if node.shouldSucceed { + require.NoError(t, err) + } else { + require.True(t, errors.Is(err, transactions.ErrTransactionVoteFailed)) + } + }() + } + voterWaitGroup.Wait() + + // this call creates new events in the queue and simulates usual flow of the update operation + var primaryShouldSucceed bool + for _, node := range tc.nodes { + if !node.primary { + continue + } + primaryShouldSucceed = node.shouldSucceed + } + err = streamParams.RequestFinalizer() + if primaryShouldSucceed { + require.NoError(t, err) + } else { + require.Error(t, err, errors.New("transaction: primary failed vote")) + } + + replicationWaitGroup.Wait() + + for i, node := range tc.nodes { + events, err := queueInterceptor.Dequeue(ctx, "praefect", fmt.Sprintf("node-%d", i), 10) + require.NoError(t, err) + if node.shouldGetRepl { + require.Len(t, events, 1) + } else { + require.Empty(t, events) + } + } + }) + } +} + func TestStreamDirectorAccessor(t *testing.T) { gitalySocket := testhelper.GetTemporaryGitalySocketFileName() srv, _ := testhelper.NewServerWithHealth(t, gitalySocket) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 8f98d44c5..f70e481fa 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -3,6 +3,7 @@ package praefect import ( "bytes" "context" + "crypto/sha1" "errors" "io" "io/ioutil" @@ -27,6 +28,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -648,6 +650,7 @@ func TestRepoRename(t *testing.T) { } type mockSmartHTTP struct { + txMgr *transactions.Manager m sync.Mutex methodsCalled map[string]int } @@ -716,6 +719,18 @@ func (m *mockSmartHTTP) PostReceivePack(stream gitalypb.SmartHTTPService_PostRec } } + ctx := stream.Context() + + tx, err := metadata.TransactionFromContext(ctx) + if err != nil { + return helper.ErrInternal(err) + } + + hash := sha1.Sum([]byte{}) + if err := m.txMgr.VoteTransaction(ctx, tx.ID, tx.Node, hash[:]); err != nil { + return helper.ErrInternal(err) + } + return nil } @@ -737,7 +752,9 @@ func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, * } func TestProxyWrites(t *testing.T) { - smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{}, &mockSmartHTTP{}, &mockSmartHTTP{} + txMgr := transactions.NewManager() + + smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr} socket0, srv0 := newGrpcServer(t, smartHTTP0) defer srv0.Stop() @@ -774,7 +791,6 @@ func TestProxyWrites(t *testing.T) { nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec()) require.NoError(t, err) - txMgr := transactions.NewManager() coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 3d3b15b0d..ed9f8ed77 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -19,6 +19,13 @@ import ( "google.golang.org/grpc/status" ) +type voter struct { + votes uint + vote string + showsUp bool + shouldSucceed bool +} + func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { conf := testConfig(1) txMgr := transactions.NewManager(opts...) @@ -310,13 +317,6 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) { } func TestTransactionReachesQuorum(t *testing.T) { - type voter struct { - votes uint - vote string - showsUp bool - shouldSucceed bool - } - tc := []struct { desc string voters []voter @@ -469,35 +469,109 @@ func TestTransactionFailures(t *testing.T) { } func TestTransactionCancellation(t *testing.T) { - counter, opts := setupMetrics() - cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...) - defer cleanup() + testcases := []struct { + desc string + voters []voter + threshold uint + expectedMetrics counterMetrics + }{ + { + desc: "single node cancellation", + voters: []voter{ + {votes: 1, showsUp: false, shouldSucceed: false}, + }, + threshold: 1, + expectedMetrics: counterMetrics{registered: 1, committed: 0}, + }, + { + desc: "two nodes failing to show up", + voters: []voter{ + {votes: 1, showsUp: false, shouldSucceed: false}, + {votes: 1, showsUp: false, shouldSucceed: false}, + }, + threshold: 2, + expectedMetrics: counterMetrics{registered: 1, committed: 0}, + }, + { + desc: "two nodes with unweighted node failing", + voters: []voter{ + {votes: 1, showsUp: true, shouldSucceed: true}, + {votes: 0, showsUp: false, shouldSucceed: false}, + }, + threshold: 1, + expectedMetrics: counterMetrics{registered: 1, started: 1, committed: 1}, + }, + { + desc: "multiple weighted votes with subset failing", + voters: []voter{ + {votes: 1, showsUp: true, shouldSucceed: true}, + {votes: 1, showsUp: true, shouldSucceed: true}, + {votes: 1, showsUp: false, shouldSucceed: false}, + }, + threshold: 2, + expectedMetrics: counterMetrics{registered: 1, started: 2, committed: 2}, + }, + } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + counter, opts := setupMetrics() + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...) + defer cleanup() - client := gitalypb.NewRefTransactionClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() - transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{ - {Name: "node1", Votes: 1}, - }, 1) - require.NoError(t, err) - require.NotZero(t, transactionID) + client := gitalypb.NewRefTransactionClient(cc) - cancelTransaction() + voters := make([]transactions.Voter, 0, len(tc.voters)) + for i, voter := range tc.voters { + voters = append(voters, transactions.Voter{ + Name: fmt.Sprintf("node-%d", i), + Votes: voter.votes, + }) + } - hash := sha1.Sum([]byte{}) - _, err = client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: transactionID, - Node: "node1", - ReferenceUpdatesHash: hash[:], - }) - require.Error(t, err) - require.Equal(t, codes.NotFound, status.Code(err)) + transactionID, cancelTransaction, err := txMgr.RegisterTransaction(ctx, voters, tc.threshold) + require.NoError(t, err) - verifyCounterMetrics(t, counter, counterMetrics{ - registered: 1, - started: 1, - invalid: 1, - }) + var wg sync.WaitGroup + for i, v := range tc.voters { + if !v.showsUp { + continue + } + + wg.Add(1) + go func(i int, v voter) { + defer wg.Done() + + name := fmt.Sprintf("node-%d", i) + hash := sha1.Sum([]byte(v.vote)) + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: transactionID, + Node: name, + ReferenceUpdatesHash: hash[:], + }) + require.NoError(t, err) + + if v.shouldSucceed { + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, response.State, "node should have received COMMIT") + } else { + require.Equal(t, gitalypb.VoteTransactionResponse_ABORT, response.State, "node should have received ABORT") + } + }(i, v) + } + wg.Wait() + + results, err := cancelTransaction() + require.NoError(t, err) + + for i, v := range tc.voters { + require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state") + } + + verifyCounterMetrics(t, counter, tc.expectedMetrics) + }) + } } diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 9f7b5f8ae..d0319d5c6 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -102,8 +102,9 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger { // CancelFunc is the transaction cancellation function returned by // `RegisterTransaction`. Calling it will cause the transaction to be removed -// from the transaction manager. -type CancelFunc func() error +// from the transaction manager. It returns the outcome for each node: `true` +// if the node committed the transaction, `false` if it aborted. +type CancelFunc func() (map[string]bool, error) // RegisterTransaction registers a new reference transaction for a set of nodes // taking part in the transaction. `threshold` is the threshold at which an @@ -136,17 +137,16 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr mgr.counterMetric.WithLabelValues("registered").Inc() - return transactionID, func() error { - mgr.cancelTransaction(transactionID, transaction) - return nil + return transactionID, func() (map[string]bool, error) { + return mgr.cancelTransaction(transactionID, transaction) }, nil } -func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) { +func (mgr *Manager) cancelTransaction(transactionID uint64, transaction *transaction) (map[string]bool, error) { mgr.lock.Lock() defer mgr.lock.Unlock() delete(mgr.transactions, transactionID) - transaction.cancel() + return transaction.cancel(), nil } func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error { diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 13282d950..ceb6b9a29 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -16,6 +16,19 @@ var ( ErrTransactionCanceled = errors.New("transaction was canceled") ) +// voteResult represents the outcome of a transaction for a single voter. +type voteResult int + +const ( + // voteUndecided means that the voter either didn't yet show up or that + // the vote couldn't yet be decided due to there being no majority yet. + voteUndecided voteResult = iota + // voteCommitted means that the voter committed his vote. + voteCommitted + // voteAborted means that the voter aborted his vote. + voteAborted +) + // Voter is a participant in a given transaction that may cast a vote. type Voter struct { // Name of the voter, usually Gitaly's storage name. @@ -25,7 +38,8 @@ type Voter struct { // this voter. Votes uint - vote vote + vote vote + result voteResult } type vote [sha1.Size]byte @@ -96,8 +110,24 @@ func newTransaction(voters []Voter, threshold uint) (*transaction, error) { }, nil } -func (t *transaction) cancel() { +func (t *transaction) cancel() map[string]bool { + t.lock.Lock() + defer t.lock.Unlock() + + results := make(map[string]bool, len(t.votersByNode)) + for node, voter := range t.votersByNode { + // If a voter didn't yet show up or is still undecided, we need + // to mark it as failed so it won't get the idea of committing + // the transaction at a later point anymore. + if voter.result == voteUndecided { + voter.result = voteAborted + } + results[node] = voter.result == voteCommitted + } + close(t.cancelCh) + + return results } func (t *transaction) vote(node string, hash []byte) error { @@ -176,11 +206,17 @@ func (t *transaction) collectVotes(ctx context.Context, node string) error { return fmt.Errorf("invalid node for transaction: %q", node) } + if voter.result != voteUndecided { + return fmt.Errorf("voter has already settled on an outcome: %q", node) + } + // See if our vote crossed the threshold. As there can be only one vote // exceeding it, we know we're the winner in that case. if t.voteCounts[voter.vote] < t.threshold { + voter.result = voteAborted return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold) } + voter.result = voteCommitted return nil } |