diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-10-16 10:45:41 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-10-16 10:45:41 +0300 |
commit | e21d4f1e9ab1966581d46ec7de2f3089c55e7088 (patch) | |
tree | 92f1afac42a0e13a79d9f33db336ab5000c8ef9f | |
parent | 8b5e1161f66a5f8c112f47e32088273fca436347 (diff) | |
parent | 6fc78bf949f7f8c563c29023ae9516b727e7e6dd (diff) |
Merge branch 'pks-hook-stop-transaction' into 'master'
hook: Stop transactions on pre-receive hook failure
Closes #3094 and #3104
See merge request gitlab-org/gitaly!2643
-rw-r--r-- | changelogs/unreleased/pks-hook-stop-transaction.yml | 5 | ||||
-rw-r--r-- | internal/gitaly/hook/prereceive.go | 3 | ||||
-rw-r--r-- | internal/gitaly/hook/transactions.go | 59 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 25 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 109 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 30 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 10 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 54 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 41 |
9 files changed, 257 insertions, 79 deletions
diff --git a/changelogs/unreleased/pks-hook-stop-transaction.yml b/changelogs/unreleased/pks-hook-stop-transaction.yml new file mode 100644 index 000000000..656a5d2ca --- /dev/null +++ b/changelogs/unreleased/pks-hook-stop-transaction.yml @@ -0,0 +1,5 @@ +--- +title: 'hook: Stop transactions on pre-receive hook failure' +merge_request: 2643 +author: +type: fixed diff --git a/internal/gitaly/hook/prereceive.go b/internal/gitaly/hook/prereceive.go index 8f8fa79cf..2754b572d 100644 --- a/internal/gitaly/hook/prereceive.go +++ b/internal/gitaly/hook/prereceive.go @@ -53,6 +53,9 @@ func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, repo *gitalypb.R // Only the primary should execute hooks and increment reference counters. if primary { if err := m.preReceiveHook(ctx, repo, env, changes, stdout, stderr); err != nil { + // If the pre-receive hook declines the push, then we need to stop any + // secondaries voting on the transaction. + m.stopTransaction(ctx, env) return err } } diff --git a/internal/gitaly/hook/transactions.go b/internal/gitaly/hook/transactions.go index 0b1a78f1d..6d52ebf9d 100644 --- a/internal/gitaly/hook/transactions.go +++ b/internal/gitaly/hook/transactions.go @@ -34,7 +34,13 @@ func (m *GitLabHookManager) getPraefectConn(ctx context.Context, server *metadat return m.conns.Dial(ctx, address, server.Token) } -func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, env []string) error { +// transactionHandler is a callback invoked on a transaction if it exists. +type transactionHandler func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error + +// runWithTransaction runs the given function if the environment identifies a transaction. No error +// is returned if no transaction exists. If a transaction exists and the function is executed on it, +// then its error will ber returned directly. +func (m *GitLabHookManager) runWithTransaction(ctx context.Context, env []string, handler transactionHandler) error { tx, err := metadata.TransactionFromEnv(env) if err != nil { if errors.Is(err, metadata.ErrTransactionNotFound) { @@ -61,19 +67,48 @@ func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, praefectClient := gitalypb.NewRefTransactionClient(praefectConn) - defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration() - response, err := praefectClient.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ - TransactionId: tx.ID, - Node: tx.Node, - ReferenceUpdatesHash: hash, - }) - if err != nil { + if err := handler(ctx, tx, praefectClient); err != nil { return err } - if response.State != gitalypb.VoteTransactionResponse_COMMIT { - return errors.New("transaction was aborted") - } - return nil } + +func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, env []string) error { + return m.runWithTransaction(ctx, env, func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error { + defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration() + + response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{ + TransactionId: tx.ID, + Node: tx.Node, + ReferenceUpdatesHash: hash, + }) + if err != nil { + return err + } + + switch response.State { + case gitalypb.VoteTransactionResponse_COMMIT: + return nil + case gitalypb.VoteTransactionResponse_ABORT: + return errors.New("transaction was aborted") + case gitalypb.VoteTransactionResponse_STOP: + return errors.New("transaction was stopped") + default: + return errors.New("invalid transaction state") + } + }) +} + +func (m *GitLabHookManager) stopTransaction(ctx context.Context, env []string) error { + return m.runWithTransaction(ctx, env, func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error { + _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ + TransactionId: tx.ID, + }) + if err != nil { + return err + } + + return nil + }) +} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 3d752512b..a1c6f6b46 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -555,14 +555,17 @@ func (c *Coordinator) createTransactionFinalizer( params datastore.Params, ) func() error { return func() error { - successByNode := transaction.State() + nodeStates, err := transaction.State() + if err != nil { + return err + } // If no subtransaction happened, then the called RPC may not be aware of // transactions at all. We thus need to assume it changed repository state // and need to create replication jobs. if transaction.CountSubtransactions() == 0 { - secondaries := make([]string, 0, len(successByNode)) - for secondary := range successByNode { + secondaries := make([]string, 0, len(nodeStates)) + for secondary := range nodeStates { if secondary == route.Primary.Storage { continue } @@ -577,16 +580,22 @@ func (c *Coordinator) createTransactionFinalizer( // If the primary node failed the transaction, then // there's no sense in trying to replicate from primary // to secondaries. - if !successByNode[route.Primary.Storage] { + if nodeStates[route.Primary.Storage] != transactions.VoteCommitted { + // If the transaction was gracefully stopped, then we don't want to return + // an explicit error here as it indicates an error somewhere else which + // already got returned. + if nodeStates[route.Primary.Storage] == transactions.VoteStopped { + return nil + } return fmt.Errorf("transaction: primary failed vote") } - delete(successByNode, route.Primary.Storage) + delete(nodeStates, route.Primary.Storage) - updatedSecondaries := make([]string, 0, len(successByNode)) + updatedSecondaries := make([]string, 0, len(nodeStates)) var outdatedSecondaries []string - for node, success := range successByNode { - if success { + for node, state := range nodeStates { + if state == transactions.VoteCommitted { updatedSecondaries = append(updatedSecondaries, node) continue } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 01c5603f0..4b2b6b6a8 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -479,6 +479,115 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } } +func TestStreamDirectorMutator_StopTransaction(t *testing.T) { + socket := testhelper.GetTemporaryGitalySocketFileName() + server, _ := testhelper.NewServerWithHealth(t, socket) + defer server.Stop() + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*config.Node{ + &config.Node{Address: "unix://" + socket, Storage: "primary"}, + &config.Node{Address: "unix://" + socket, Storage: "secondary"}, + }, + }, + }, + } + + repo := gitalypb.Repository{ + StorageName: "praefect", + RelativePath: "/path/to/hashed/storage", + } + + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + + shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name) + require.NoError(t, err) + + ctx, cancel := testhelper.Context() + defer cancel() + + for _, name := range []string{"primary", "secondary"} { + node, err := shard.GetNode(name) + require.NoError(t, err) + waitNodeToChangeHealthStatus(ctx, t, node, true) + } + + rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) + for _, node := range []string{"primary", "secondary"} { + require.NoError(t, rs.SetGeneration(ctx, "praefect", repo.RelativePath, node, 1)) + } + + txMgr := transactions.NewManager(conf) + + coordinator := NewCoordinator( + datastore.NewMemoryReplicationEventQueue(conf), + rs, + NewNodeManagerRouter(nodeMgr, rs), + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) + + fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" + + frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{ + Repository: &repo, + }) + require.NoError(t, err) + peeker := &mockPeeker{frame} + + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + + transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + var syncWG sync.WaitGroup + + wg.Add(2) + syncWG.Add(2) + + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + err := txMgr.VoteTransaction(ctx, transaction.ID, "primary", vote[:]) + require.NoError(t, err) + + // Assure that at least one vote was agreed on. + syncWG.Done() + syncWG.Wait() + + require.NoError(t, txMgr.StopTransaction(ctx, transaction.ID)) + }() + + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + err := txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote[:]) + require.NoError(t, err) + + // Assure that at least one vote was agreed on. + syncWG.Done() + syncWG.Wait() + + err = txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote[:]) + assert.True(t, errors.Is(err, transactions.ErrTransactionStopped)) + }() + + wg.Wait() + + err = streamParams.RequestFinalizer() + require.NoError(t, err) +} + func TestStreamDirectorAccessor(t *testing.T) { gitalySocket := testhelper.GetTemporaryGitalySocketFileName() srv, _ := testhelper.NewServerWithHealth(t, gitalySocket) diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index a1a1920b5..91b23ed37 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -543,9 +543,14 @@ func TestTransactionWithMultipleVotes(t *testing.T) { wg.Wait() require.NoError(t, cancel()) - results := transaction.State() + results, err := transaction.State() + require.NoError(t, err) for i, voter := range tc.voters { - require.Equal(t, voter.shouldSucceed, results[fmt.Sprintf("node-%d", i)]) + if voter.shouldSucceed { + require.Equal(t, transactions.VoteCommitted, results[fmt.Sprintf("node-%d", i)]) + } else { + require.Equal(t, transactions.VoteAborted, results[fmt.Sprintf("node-%d", i)]) + } } }) } @@ -672,9 +677,14 @@ func TestTransactionCancellation(t *testing.T) { require.NoError(t, cancelTransaction()) - results := transaction.State() + results, err := transaction.State() + require.NoError(t, err) for i, v := range tc.voters { - require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state") + if v.shouldSucceed { + require.Equal(t, transactions.VoteCommitted, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state") + } else { + require.Equal(t, transactions.VoteAborted, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state") + } } verifyCounterMetrics(t, txMgr, tc.expectedMetrics) @@ -761,8 +771,9 @@ func TestStopTransaction(t *testing.T) { require.NoError(t, err) require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) - results := transaction.State() - require.Equal(t, results["voter"], false) + results, err := transaction.State() + require.NoError(t, err) + require.Equal(t, transactions.VoteStopped, results["voter"]) verifyCounterMetrics(t, txMgr, counterMetrics{ registered: 1, started: 1, @@ -809,9 +820,10 @@ func TestStopTransaction(t *testing.T) { require.NoError(t, err) require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State) - results := transaction.State() - require.True(t, results["successful-voter"], "Successful voter should succeed") - require.False(t, results["failing-voter"], "Failing voter should fail") + results, err := transaction.State() + require.NoError(t, err) + require.Equal(t, transactions.VoteCommitted, results["successful-voter"], "Successful voter should succeed") + require.Equal(t, transactions.VoteStopped, results["failing-voter"], "Failing voter should fail") verifyCounterMetrics(t, txMgr, counterMetrics{ committed: 1, registered: 2, diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 0cf536af6..7c3572887 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -173,9 +173,13 @@ func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *Transact mgr.subtransactionsMetric.Observe(float64(transaction.CountSubtransactions())) var committed uint64 - state := transaction.State() - for _, success := range state { - if success { + state, err := transaction.State() + if err != nil { + return err + } + + for _, result := range state { + if result == VoteCommitted { committed++ } } diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 33c11f5da..679eebb06 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -16,19 +16,19 @@ var ( ErrTransactionCanceled = errors.New("transaction was canceled") ) -// voteResult represents the outcome of a transaction for a single voter. -type voteResult int +// VoteResult represents the outcome of a transaction for a single voter. +type VoteResult int const ( - // voteUndecided means that the voter either didn't yet show up or that + // VoteUndecided means that the voter either didn't yet show up or that // the vote couldn't yet be decided due to there being no majority yet. - voteUndecided voteResult = iota - // voteCommitted means that the voter committed his vote. - voteCommitted + VoteUndecided VoteResult = iota + // VoteCommitted means that the voter committed his vote. + VoteCommitted // voteAborted means that the voter aborted his vote. - voteAborted - // voteStopped means that the transaction was gracefully stopped. - voteStopped + VoteAborted + // VoteStopped means that the transaction was gracefully stopped. + VoteStopped ) type vote [sha1.Size]byte @@ -86,8 +86,8 @@ func (t *subtransaction) cancel() { // If a voter didn't yet show up or is still undecided, we need // to mark it as failed so it won't get the idea of committing // the transaction at a later point anymore. - if voter.result == voteUndecided { - voter.result = voteAborted + if voter.result == VoteUndecided { + voter.result = VoteAborted } } @@ -100,16 +100,16 @@ func (t *subtransaction) stop() error { for _, voter := range t.votersByNode { switch voter.result { - case voteAborted: + case VoteAborted: // If the vote was aborted already, we cannot stop it. return ErrTransactionCanceled - case voteStopped: + case VoteStopped: // Similar if the vote was stopped already. return ErrTransactionStopped - case voteUndecided: + case VoteUndecided: // Undecided voters will get stopped, ... - voter.result = voteStopped - case voteCommitted: + voter.result = VoteStopped + case VoteCommitted: // ... while decided voters cannot be changed anymore. continue } @@ -119,13 +119,13 @@ func (t *subtransaction) stop() error { return nil } -func (t *subtransaction) state() map[string]bool { +func (t *subtransaction) state() map[string]VoteResult { t.lock.Lock() defer t.lock.Unlock() - results := make(map[string]bool, len(t.votersByNode)) + results := make(map[string]VoteResult, len(t.votersByNode)) for node, voter := range t.votersByNode { - results[node] = voter.result == voteCommitted + results[node] = voter.result } return results @@ -210,14 +210,14 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { } switch voter.result { - case voteUndecided: + case VoteUndecided: // Happy case, no decision was yet made. - case voteAborted: + case VoteAborted: // It may happen that the vote was cancelled or stopped just after majority was - // reached. In that case, the node's state is now voteAborted/voteStopped, so we + // reached. In that case, the node's state is now VoteAborted/VoteStopped, so we // have to return an error here. return ErrTransactionCanceled - case voteStopped: + case VoteStopped: return ErrTransactionStopped default: return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node) @@ -226,21 +226,21 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error { // See if our vote crossed the threshold. As there can be only one vote // exceeding it, we know we're the winner in that case. if t.voteCounts[voter.vote] < t.threshold { - voter.result = voteAborted + voter.result = VoteAborted return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold) } - voter.result = voteCommitted + voter.result = VoteCommitted return nil } -func (t *subtransaction) getResult(node string) (voteResult, error) { +func (t *subtransaction) getResult(node string) (VoteResult, error) { t.lock.RLock() defer t.lock.RUnlock() voter, ok := t.votersByNode[node] if !ok { - return voteAborted, fmt.Errorf("invalid node for transaction: %q", node) + return VoteAborted, fmt.Errorf("invalid node for transaction: %q", node) } return voter.result, nil diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 5c1d53a23..ae63947b4 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -41,7 +41,7 @@ type Voter struct { Votes uint vote vote - result voteResult + result VoteResult } // Transaction is a session where a set of voters votes on one or more @@ -127,36 +127,37 @@ func (t *Transaction) ID() uint64 { // State returns the voting state mapped by voters. A voting state of `true` // means all subtransactions were successful, a voting state of `false` means // either no subtransactions were created or any of the subtransactions failed. -func (t *Transaction) State() map[string]bool { +func (t *Transaction) State() (map[string]VoteResult, error) { t.lock.Lock() defer t.lock.Unlock() - results := make(map[string]bool, len(t.voters)) + results := make(map[string]VoteResult, len(t.voters)) if len(t.subtransactions) == 0 { - // If there's no subtransactions, we simply return `false` for - // every voter. for _, voter := range t.voters { - results[voter.Name] = false + switch t.state { + case transactionOpen: + results[voter.Name] = VoteUndecided + case transactionCanceled: + results[voter.Name] = VoteAborted + case transactionStopped: + results[voter.Name] = VoteStopped + default: + return nil, errors.New("invalid transaction state") + } } - return results + return results, nil } - // We need to collect outcomes of all subtransactions. If any of the - // subtransactions failed, then the overall transaction failed for that - // node as well. Otherwise, if all subtransactions for the node - // succeeded, the transaction did as well. + // Collect all subtransactions. As they are ordered by reverse recency, we can simply + // overwrite our own results. for _, subtransaction := range t.subtransactions { for voter, result := range subtransaction.state() { - // If there already is an entry indicating failure, keep it. - if didSucceed, ok := results[voter]; ok && !didSucceed { - continue - } results[voter] = result } } - return results + return results, nil } // CountSubtransactions counts the number of subtransactions created as part of @@ -191,19 +192,19 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e } switch result { - case voteUndecided: + case VoteUndecided: // An undecided vote means we should vote on this one. return subtransaction, nil - case voteCommitted: + case VoteCommitted: // If we have committed this subtransaction, we're good // to go. continue - case voteAborted: + case VoteAborted: // If the subtransaction was aborted, then we need to // fail as we cannot proceed if the path leading to the // end result has intermittent failures. return nil, ErrSubtransactionFailed - case voteStopped: + case VoteStopped: // If the transaction was stopped, then we need to fail // with a graceful error. return nil, ErrTransactionStopped |