Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-10-16 10:45:41 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-10-16 10:45:41 +0300
commite21d4f1e9ab1966581d46ec7de2f3089c55e7088 (patch)
tree92f1afac42a0e13a79d9f33db336ab5000c8ef9f
parent8b5e1161f66a5f8c112f47e32088273fca436347 (diff)
parent6fc78bf949f7f8c563c29023ae9516b727e7e6dd (diff)
Merge branch 'pks-hook-stop-transaction' into 'master'
hook: Stop transactions on pre-receive hook failure Closes #3094 and #3104 See merge request gitlab-org/gitaly!2643
-rw-r--r--changelogs/unreleased/pks-hook-stop-transaction.yml5
-rw-r--r--internal/gitaly/hook/prereceive.go3
-rw-r--r--internal/gitaly/hook/transactions.go59
-rw-r--r--internal/praefect/coordinator.go25
-rw-r--r--internal/praefect/coordinator_test.go109
-rw-r--r--internal/praefect/transaction_test.go30
-rw-r--r--internal/praefect/transactions/manager.go10
-rw-r--r--internal/praefect/transactions/subtransaction.go54
-rw-r--r--internal/praefect/transactions/transaction.go41
9 files changed, 257 insertions, 79 deletions
diff --git a/changelogs/unreleased/pks-hook-stop-transaction.yml b/changelogs/unreleased/pks-hook-stop-transaction.yml
new file mode 100644
index 000000000..656a5d2ca
--- /dev/null
+++ b/changelogs/unreleased/pks-hook-stop-transaction.yml
@@ -0,0 +1,5 @@
+---
+title: 'hook: Stop transactions on pre-receive hook failure'
+merge_request: 2643
+author:
+type: fixed
diff --git a/internal/gitaly/hook/prereceive.go b/internal/gitaly/hook/prereceive.go
index 8f8fa79cf..2754b572d 100644
--- a/internal/gitaly/hook/prereceive.go
+++ b/internal/gitaly/hook/prereceive.go
@@ -53,6 +53,9 @@ func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, repo *gitalypb.R
// Only the primary should execute hooks and increment reference counters.
if primary {
if err := m.preReceiveHook(ctx, repo, env, changes, stdout, stderr); err != nil {
+ // If the pre-receive hook declines the push, then we need to stop any
+ // secondaries voting on the transaction.
+ m.stopTransaction(ctx, env)
return err
}
}
diff --git a/internal/gitaly/hook/transactions.go b/internal/gitaly/hook/transactions.go
index 0b1a78f1d..6d52ebf9d 100644
--- a/internal/gitaly/hook/transactions.go
+++ b/internal/gitaly/hook/transactions.go
@@ -34,7 +34,13 @@ func (m *GitLabHookManager) getPraefectConn(ctx context.Context, server *metadat
return m.conns.Dial(ctx, address, server.Token)
}
-func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, env []string) error {
+// transactionHandler is a callback invoked on a transaction if it exists.
+type transactionHandler func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error
+
+// runWithTransaction runs the given function if the environment identifies a transaction. No error
+// is returned if no transaction exists. If a transaction exists and the function is executed on it,
+// then its error will ber returned directly.
+func (m *GitLabHookManager) runWithTransaction(ctx context.Context, env []string, handler transactionHandler) error {
tx, err := metadata.TransactionFromEnv(env)
if err != nil {
if errors.Is(err, metadata.ErrTransactionNotFound) {
@@ -61,19 +67,48 @@ func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte,
praefectClient := gitalypb.NewRefTransactionClient(praefectConn)
- defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration()
- response, err := praefectClient.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
- TransactionId: tx.ID,
- Node: tx.Node,
- ReferenceUpdatesHash: hash,
- })
- if err != nil {
+ if err := handler(ctx, tx, praefectClient); err != nil {
return err
}
- if response.State != gitalypb.VoteTransactionResponse_COMMIT {
- return errors.New("transaction was aborted")
- }
-
return nil
}
+
+func (m *GitLabHookManager) voteOnTransaction(ctx context.Context, hash []byte, env []string) error {
+ return m.runWithTransaction(ctx, env, func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error {
+ defer prometheus.NewTimer(m.votingDelayMetric).ObserveDuration()
+
+ response, err := client.VoteTransaction(ctx, &gitalypb.VoteTransactionRequest{
+ TransactionId: tx.ID,
+ Node: tx.Node,
+ ReferenceUpdatesHash: hash,
+ })
+ if err != nil {
+ return err
+ }
+
+ switch response.State {
+ case gitalypb.VoteTransactionResponse_COMMIT:
+ return nil
+ case gitalypb.VoteTransactionResponse_ABORT:
+ return errors.New("transaction was aborted")
+ case gitalypb.VoteTransactionResponse_STOP:
+ return errors.New("transaction was stopped")
+ default:
+ return errors.New("invalid transaction state")
+ }
+ })
+}
+
+func (m *GitLabHookManager) stopTransaction(ctx context.Context, env []string) error {
+ return m.runWithTransaction(ctx, env, func(ctx context.Context, tx metadata.Transaction, client gitalypb.RefTransactionClient) error {
+ _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{
+ TransactionId: tx.ID,
+ })
+ if err != nil {
+ return err
+ }
+
+ return nil
+ })
+}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 3d752512b..a1c6f6b46 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -555,14 +555,17 @@ func (c *Coordinator) createTransactionFinalizer(
params datastore.Params,
) func() error {
return func() error {
- successByNode := transaction.State()
+ nodeStates, err := transaction.State()
+ if err != nil {
+ return err
+ }
// If no subtransaction happened, then the called RPC may not be aware of
// transactions at all. We thus need to assume it changed repository state
// and need to create replication jobs.
if transaction.CountSubtransactions() == 0 {
- secondaries := make([]string, 0, len(successByNode))
- for secondary := range successByNode {
+ secondaries := make([]string, 0, len(nodeStates))
+ for secondary := range nodeStates {
if secondary == route.Primary.Storage {
continue
}
@@ -577,16 +580,22 @@ func (c *Coordinator) createTransactionFinalizer(
// If the primary node failed the transaction, then
// there's no sense in trying to replicate from primary
// to secondaries.
- if !successByNode[route.Primary.Storage] {
+ if nodeStates[route.Primary.Storage] != transactions.VoteCommitted {
+ // If the transaction was gracefully stopped, then we don't want to return
+ // an explicit error here as it indicates an error somewhere else which
+ // already got returned.
+ if nodeStates[route.Primary.Storage] == transactions.VoteStopped {
+ return nil
+ }
return fmt.Errorf("transaction: primary failed vote")
}
- delete(successByNode, route.Primary.Storage)
+ delete(nodeStates, route.Primary.Storage)
- updatedSecondaries := make([]string, 0, len(successByNode))
+ updatedSecondaries := make([]string, 0, len(nodeStates))
var outdatedSecondaries []string
- for node, success := range successByNode {
- if success {
+ for node, state := range nodeStates {
+ if state == transactions.VoteCommitted {
updatedSecondaries = append(updatedSecondaries, node)
continue
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 01c5603f0..4b2b6b6a8 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -479,6 +479,115 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
}
}
+func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
+ socket := testhelper.GetTemporaryGitalySocketFileName()
+ server, _ := testhelper.NewServerWithHealth(t, socket)
+ defer server.Stop()
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*config.Node{
+ &config.Node{Address: "unix://" + socket, Storage: "primary"},
+ &config.Node{Address: "unix://" + socket, Storage: "secondary"},
+ },
+ },
+ },
+ }
+
+ repo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
+
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+
+ shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for _, name := range []string{"primary", "secondary"} {
+ node, err := shard.GetNode(name)
+ require.NoError(t, err)
+ waitNodeToChangeHealthStatus(ctx, t, node, true)
+ }
+
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+ for _, node := range []string{"primary", "secondary"} {
+ require.NoError(t, rs.SetGeneration(ctx, "praefect", repo.RelativePath, node, 1))
+ }
+
+ txMgr := transactions.NewManager(conf)
+
+ coordinator := NewCoordinator(
+ datastore.NewMemoryReplicationEventQueue(conf),
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
+
+ fullMethod := "/gitaly.SmartHTTPService/PostReceivePack"
+
+ frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{
+ Repository: &repo,
+ })
+ require.NoError(t, err)
+ peeker := &mockPeeker{frame}
+
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+
+ transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx)
+ require.NoError(t, err)
+
+ var wg sync.WaitGroup
+ var syncWG sync.WaitGroup
+
+ wg.Add(2)
+ syncWG.Add(2)
+
+ go func() {
+ defer wg.Done()
+
+ vote := sha1.Sum([]byte("vote"))
+ err := txMgr.VoteTransaction(ctx, transaction.ID, "primary", vote[:])
+ require.NoError(t, err)
+
+ // Assure that at least one vote was agreed on.
+ syncWG.Done()
+ syncWG.Wait()
+
+ require.NoError(t, txMgr.StopTransaction(ctx, transaction.ID))
+ }()
+
+ go func() {
+ defer wg.Done()
+
+ vote := sha1.Sum([]byte("vote"))
+ err := txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote[:])
+ require.NoError(t, err)
+
+ // Assure that at least one vote was agreed on.
+ syncWG.Done()
+ syncWG.Wait()
+
+ err = txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote[:])
+ assert.True(t, errors.Is(err, transactions.ErrTransactionStopped))
+ }()
+
+ wg.Wait()
+
+ err = streamParams.RequestFinalizer()
+ require.NoError(t, err)
+}
+
func TestStreamDirectorAccessor(t *testing.T) {
gitalySocket := testhelper.GetTemporaryGitalySocketFileName()
srv, _ := testhelper.NewServerWithHealth(t, gitalySocket)
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index a1a1920b5..91b23ed37 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -543,9 +543,14 @@ func TestTransactionWithMultipleVotes(t *testing.T) {
wg.Wait()
require.NoError(t, cancel())
- results := transaction.State()
+ results, err := transaction.State()
+ require.NoError(t, err)
for i, voter := range tc.voters {
- require.Equal(t, voter.shouldSucceed, results[fmt.Sprintf("node-%d", i)])
+ if voter.shouldSucceed {
+ require.Equal(t, transactions.VoteCommitted, results[fmt.Sprintf("node-%d", i)])
+ } else {
+ require.Equal(t, transactions.VoteAborted, results[fmt.Sprintf("node-%d", i)])
+ }
}
})
}
@@ -672,9 +677,14 @@ func TestTransactionCancellation(t *testing.T) {
require.NoError(t, cancelTransaction())
- results := transaction.State()
+ results, err := transaction.State()
+ require.NoError(t, err)
for i, v := range tc.voters {
- require.Equal(t, results[fmt.Sprintf("node-%d", i)], v.shouldSucceed, "result mismatches expected node state")
+ if v.shouldSucceed {
+ require.Equal(t, transactions.VoteCommitted, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state")
+ } else {
+ require.Equal(t, transactions.VoteAborted, results[fmt.Sprintf("node-%d", i)], "result mismatches expected node state")
+ }
}
verifyCounterMetrics(t, txMgr, tc.expectedMetrics)
@@ -761,8 +771,9 @@ func TestStopTransaction(t *testing.T) {
require.NoError(t, err)
require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
- results := transaction.State()
- require.Equal(t, results["voter"], false)
+ results, err := transaction.State()
+ require.NoError(t, err)
+ require.Equal(t, transactions.VoteStopped, results["voter"])
verifyCounterMetrics(t, txMgr, counterMetrics{
registered: 1,
started: 1,
@@ -809,9 +820,10 @@ func TestStopTransaction(t *testing.T) {
require.NoError(t, err)
require.Equal(t, gitalypb.VoteTransactionResponse_STOP, response.State)
- results := transaction.State()
- require.True(t, results["successful-voter"], "Successful voter should succeed")
- require.False(t, results["failing-voter"], "Failing voter should fail")
+ results, err := transaction.State()
+ require.NoError(t, err)
+ require.Equal(t, transactions.VoteCommitted, results["successful-voter"], "Successful voter should succeed")
+ require.Equal(t, transactions.VoteStopped, results["failing-voter"], "Failing voter should fail")
verifyCounterMetrics(t, txMgr, counterMetrics{
committed: 1,
registered: 2,
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 0cf536af6..7c3572887 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -173,9 +173,13 @@ func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *Transact
mgr.subtransactionsMetric.Observe(float64(transaction.CountSubtransactions()))
var committed uint64
- state := transaction.State()
- for _, success := range state {
- if success {
+ state, err := transaction.State()
+ if err != nil {
+ return err
+ }
+
+ for _, result := range state {
+ if result == VoteCommitted {
committed++
}
}
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 33c11f5da..679eebb06 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -16,19 +16,19 @@ var (
ErrTransactionCanceled = errors.New("transaction was canceled")
)
-// voteResult represents the outcome of a transaction for a single voter.
-type voteResult int
+// VoteResult represents the outcome of a transaction for a single voter.
+type VoteResult int
const (
- // voteUndecided means that the voter either didn't yet show up or that
+ // VoteUndecided means that the voter either didn't yet show up or that
// the vote couldn't yet be decided due to there being no majority yet.
- voteUndecided voteResult = iota
- // voteCommitted means that the voter committed his vote.
- voteCommitted
+ VoteUndecided VoteResult = iota
+ // VoteCommitted means that the voter committed his vote.
+ VoteCommitted
// voteAborted means that the voter aborted his vote.
- voteAborted
- // voteStopped means that the transaction was gracefully stopped.
- voteStopped
+ VoteAborted
+ // VoteStopped means that the transaction was gracefully stopped.
+ VoteStopped
)
type vote [sha1.Size]byte
@@ -86,8 +86,8 @@ func (t *subtransaction) cancel() {
// If a voter didn't yet show up or is still undecided, we need
// to mark it as failed so it won't get the idea of committing
// the transaction at a later point anymore.
- if voter.result == voteUndecided {
- voter.result = voteAborted
+ if voter.result == VoteUndecided {
+ voter.result = VoteAborted
}
}
@@ -100,16 +100,16 @@ func (t *subtransaction) stop() error {
for _, voter := range t.votersByNode {
switch voter.result {
- case voteAborted:
+ case VoteAborted:
// If the vote was aborted already, we cannot stop it.
return ErrTransactionCanceled
- case voteStopped:
+ case VoteStopped:
// Similar if the vote was stopped already.
return ErrTransactionStopped
- case voteUndecided:
+ case VoteUndecided:
// Undecided voters will get stopped, ...
- voter.result = voteStopped
- case voteCommitted:
+ voter.result = VoteStopped
+ case VoteCommitted:
// ... while decided voters cannot be changed anymore.
continue
}
@@ -119,13 +119,13 @@ func (t *subtransaction) stop() error {
return nil
}
-func (t *subtransaction) state() map[string]bool {
+func (t *subtransaction) state() map[string]VoteResult {
t.lock.Lock()
defer t.lock.Unlock()
- results := make(map[string]bool, len(t.votersByNode))
+ results := make(map[string]VoteResult, len(t.votersByNode))
for node, voter := range t.votersByNode {
- results[node] = voter.result == voteCommitted
+ results[node] = voter.result
}
return results
@@ -210,14 +210,14 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
}
switch voter.result {
- case voteUndecided:
+ case VoteUndecided:
// Happy case, no decision was yet made.
- case voteAborted:
+ case VoteAborted:
// It may happen that the vote was cancelled or stopped just after majority was
- // reached. In that case, the node's state is now voteAborted/voteStopped, so we
+ // reached. In that case, the node's state is now VoteAborted/VoteStopped, so we
// have to return an error here.
return ErrTransactionCanceled
- case voteStopped:
+ case VoteStopped:
return ErrTransactionStopped
default:
return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node)
@@ -226,21 +226,21 @@ func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
// See if our vote crossed the threshold. As there can be only one vote
// exceeding it, we know we're the winner in that case.
if t.voteCounts[voter.vote] < t.threshold {
- voter.result = voteAborted
+ voter.result = VoteAborted
return fmt.Errorf("%w: got %d/%d votes", ErrTransactionVoteFailed, t.voteCounts[voter.vote], t.threshold)
}
- voter.result = voteCommitted
+ voter.result = VoteCommitted
return nil
}
-func (t *subtransaction) getResult(node string) (voteResult, error) {
+func (t *subtransaction) getResult(node string) (VoteResult, error) {
t.lock.RLock()
defer t.lock.RUnlock()
voter, ok := t.votersByNode[node]
if !ok {
- return voteAborted, fmt.Errorf("invalid node for transaction: %q", node)
+ return VoteAborted, fmt.Errorf("invalid node for transaction: %q", node)
}
return voter.result, nil
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 5c1d53a23..ae63947b4 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -41,7 +41,7 @@ type Voter struct {
Votes uint
vote vote
- result voteResult
+ result VoteResult
}
// Transaction is a session where a set of voters votes on one or more
@@ -127,36 +127,37 @@ func (t *Transaction) ID() uint64 {
// State returns the voting state mapped by voters. A voting state of `true`
// means all subtransactions were successful, a voting state of `false` means
// either no subtransactions were created or any of the subtransactions failed.
-func (t *Transaction) State() map[string]bool {
+func (t *Transaction) State() (map[string]VoteResult, error) {
t.lock.Lock()
defer t.lock.Unlock()
- results := make(map[string]bool, len(t.voters))
+ results := make(map[string]VoteResult, len(t.voters))
if len(t.subtransactions) == 0 {
- // If there's no subtransactions, we simply return `false` for
- // every voter.
for _, voter := range t.voters {
- results[voter.Name] = false
+ switch t.state {
+ case transactionOpen:
+ results[voter.Name] = VoteUndecided
+ case transactionCanceled:
+ results[voter.Name] = VoteAborted
+ case transactionStopped:
+ results[voter.Name] = VoteStopped
+ default:
+ return nil, errors.New("invalid transaction state")
+ }
}
- return results
+ return results, nil
}
- // We need to collect outcomes of all subtransactions. If any of the
- // subtransactions failed, then the overall transaction failed for that
- // node as well. Otherwise, if all subtransactions for the node
- // succeeded, the transaction did as well.
+ // Collect all subtransactions. As they are ordered by reverse recency, we can simply
+ // overwrite our own results.
for _, subtransaction := range t.subtransactions {
for voter, result := range subtransaction.state() {
- // If there already is an entry indicating failure, keep it.
- if didSucceed, ok := results[voter]; ok && !didSucceed {
- continue
- }
results[voter] = result
}
}
- return results
+ return results, nil
}
// CountSubtransactions counts the number of subtransactions created as part of
@@ -191,19 +192,19 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
}
switch result {
- case voteUndecided:
+ case VoteUndecided:
// An undecided vote means we should vote on this one.
return subtransaction, nil
- case voteCommitted:
+ case VoteCommitted:
// If we have committed this subtransaction, we're good
// to go.
continue
- case voteAborted:
+ case VoteAborted:
// If the subtransaction was aborted, then we need to
// fail as we cannot proceed if the path leading to the
// end result has intermittent failures.
return nil, ErrSubtransactionFailed
- case voteStopped:
+ case VoteStopped:
// If the transaction was stopped, then we need to fail
// with a graceful error.
return nil, ErrTransactionStopped