diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-07-12 11:55:56 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-07-12 11:55:56 +0300 |
commit | c8a29dc9fd507cab8835b2e1152b94a6ac96de35 (patch) | |
tree | b8f171c7273680a45fde27589c3370e5d04011c0 | |
parent | fb267fb98bcc515c5bfa9d45ba914c7272a93773 (diff) | |
parent | 528e8e926ef90d35ca85601f1eb28a947d63bea3 (diff) |
Merge branch 'pks-tx-coordinator-replication-error-handling' into 'master'
coordinator: Only schedule replication for differing error states
See merge request gitlab-org/gitaly!3642
-rw-r--r-- | internal/praefect/coordinator.go | 85 | ||||
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 187 | ||||
-rw-r--r-- | internal/praefect/transactions/subtransaction.go | 18 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 38 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction_test.go | 27 |
6 files changed, 227 insertions, 136 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index ee642eb4c..6f2863b1e 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io/ioutil" "sync" "time" @@ -811,46 +810,51 @@ func getUpdatedAndOutdatedSecondaries( primaryErr := nodeErrors.errByNode[route.Primary.Storage] - // If there were subtransactions, we only assume some changes were made if one of the subtransactions - // was committed. - // - // If there were no subtransactions, we assume changes were performed only if the primary successfully - // processed the RPC. This might be an RPC that is not correctly casting votes thus we replicate everywhere. - // - // If there were no subtransactions and the primary failed the RPC, we assume no changes have been made and - // the nodes simply failed before voting. - primaryDirtied = transaction.DidCommitAnySubtransaction() || - (transaction.CountSubtransactions() == 0 && primaryErr == nil) - - // If the primary wasn't dirtied, then we never replicate any changes. While this is - // duplicates logic defined elsewhere, it's probably good enough given that we only talk - // about metrics here. - recordReplication := func(reason string, replicationCount int) { - if primaryDirtied && replicationCount > 0 { - replicationCountMetric.WithLabelValues(reason).Add(float64(replicationCount)) - } + // If there were no subtransactions and the primary failed the RPC, we assume no changes + // have been made and the nodes simply failed before voting. We can thus return directly and + // notify the caller that the primary is not considered to be dirty. + if transaction.CountSubtransactions() == 0 && primaryErr != nil { + return false, nil, nil } - // Same as above, we discard log entries in case the primary wasn't dirtied. - logReplication := ctxlogrus.Extract(ctx) - if !primaryDirtied { - discardLogger := logrus.New() - discardLogger.Out = ioutil.Discard - logReplication = logrus.NewEntry(discardLogger) + // If there was a single subtransactions but the primary didn't cast a vote, then it means + // that the primary node has dropped out before secondaries were able to commit any changes + // to disk. Given that they cannot ever succeed without the primary, no change to disk + // should have happened. + if transaction.CountSubtransactions() == 1 && !transaction.DidVote(route.Primary.Storage) { + return false, nil, nil + } + + primaryDirtied = true + + nodesByState := make(map[string][]string) + defer func() { + ctxlogrus.Extract(ctx). + WithField("transaction.primary", route.Primary.Storage). + WithField("transaction.secondaries", nodesByState). + Info("transactional node states") + + for reason, nodes := range nodesByState { + replicationCountMetric.WithLabelValues(reason).Add(float64(len(nodes))) + } + }() + + markOutdated := func(reason string, nodes []string) { + if len(nodes) != 0 { + outdated = append(outdated, nodes...) + nodesByState[reason] = append(nodesByState[reason], nodes...) + } } // Replication targets were not added to the transaction, most likely because they are // either not healthy or out of date. We thus need to make sure to create replication jobs // for them. - outdated = append(outdated, route.ReplicationTargets...) - recordReplication("outdated", len(route.ReplicationTargets)) + markOutdated("outdated", route.ReplicationTargets) // If the primary errored, then we need to assume that it has modified on-disk state and // thus need to replicate those changes to secondaries. if primaryErr != nil { - logReplication.WithError(primaryErr).Info("primary failed transaction") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - recordReplication("primary-failed", len(route.Secondaries)) + markOutdated("primary-failed", routerNodesToStorages(route.Secondaries)) return } @@ -859,9 +863,7 @@ func getUpdatedAndOutdatedSecondaries( // no changes were done and the nodes hit an error prior to voting. If the primary processed // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere. if transaction.CountSubtransactions() == 0 { - logReplication.Info("transaction did not create subtransactions") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - recordReplication("no-votes", len(route.Secondaries)) + markOutdated("no-votes", routerNodesToStorages(route.Secondaries)) return } @@ -869,9 +871,7 @@ func getUpdatedAndOutdatedSecondaries( // safe route and just replicate to all secondaries. nodeStates, err := transaction.State() if err != nil { - logReplication.WithError(err).Error("could not get transaction state") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - recordReplication("missing-tx-state", len(route.Secondaries)) + markOutdated("missing-tx-state", routerNodesToStorages(route.Secondaries)) return } @@ -879,11 +879,7 @@ func getUpdatedAndOutdatedSecondaries( // then we must assume that it dirtied on-disk state. This modified state may not be what we want, // but it's what we got. So in order to ensure a consistent state, we need to replicate. if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted { - if state == transactions.VoteFailed { - logReplication.Error("transaction: primary failed vote") - } - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - recordReplication("primary-not-committed", len(route.Secondaries)) + markOutdated("primary-not-committed", routerNodesToStorages(route.Secondaries)) return } @@ -891,18 +887,17 @@ func getUpdatedAndOutdatedSecondaries( // error and committed, it's considered up to date and thus does not need replication. for _, secondary := range route.Secondaries { if nodeErrors.errByNode[secondary.Storage] != nil { - outdated = append(outdated, secondary.Storage) - recordReplication("node-failed", 1) + markOutdated("node-failed", []string{secondary.Storage}) continue } if nodeStates[secondary.Storage] != transactions.VoteCommitted { - outdated = append(outdated, secondary.Storage) - recordReplication("node-not-committed", 1) + markOutdated("node-not-committed", []string{secondary.Storage}) continue } updated = append(updated, secondary.Storage) + nodesByState["updated"] = append(nodesByState["updated"], secondary.Storage) } return diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 316b05997..d43f2f02c 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -69,11 +69,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { }, }, { - desc: "failing vote should not create replication jobs without committed subtransactions", + desc: "failing vote should create replication jobs without committed subtransactions", nodes: []node{ - {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0}, - {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0}, - {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0}, + {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1}, + {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, + {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0}, }, }, { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 567029378..d7dcdd369 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1522,9 +1522,9 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { } type mockTransaction struct { - nodeStates map[string]transactions.VoteResult - subtransactions int - didCommitAnySubtransaction bool + nodeStates map[string]transactions.VoteResult + subtransactions int + didVote map[string]bool } func (t mockTransaction) ID() uint64 { @@ -1535,8 +1535,8 @@ func (t mockTransaction) CountSubtransactions() int { return t.subtransactions } -func (t mockTransaction) DidCommitAnySubtransaction() bool { - return t.didCommitAnySubtransaction +func (t mockTransaction) DidVote(node string) bool { + return t.didVote[node] } func (t mockTransaction) State() (map[string]transactions.VoteResult, error) { @@ -1556,16 +1556,16 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { anyErr := errors.New("arbitrary error") for _, tc := range []struct { - desc string - primary node - secondaries []node - replicas []string - subtransactions int - didCommitAnySubtransaction bool - expectedPrimaryDirtied bool - expectedOutdated []string - expectedUpdated []string - expectedMetrics map[string]int + desc string + primary node + secondaries []node + replicas []string + subtransactions int + didVote map[string]bool + expectedPrimaryDirtied bool + expectedOutdated []string + expectedUpdated []string + expectedMetrics map[string]int }{ { desc: "single committed node", @@ -1573,9 +1573,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { name: "primary", state: transactions.VoteCommitted, }, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, }, { desc: "single failed node", @@ -1606,24 +1608,24 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { name: "primary", state: transactions.VoteCommitted, }, - replicas: []string{"replica"}, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedOutdated: []string{"replica"}, + replicas: []string{"replica"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"replica"}, expectedMetrics: map[string]int{ "outdated": 1, }, }, { - desc: "single failing node with replica", + desc: "single failing node with replica is not considered modified", primary: node{ name: "primary", state: transactions.VoteFailed, }, - replicas: []string{"replica"}, - subtransactions: 1, - expectedOutdated: []string{"replica"}, + subtransactions: 1, }, { desc: "single erred node with replica", @@ -1632,16 +1634,29 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { state: transactions.VoteCommitted, err: anyErr, }, - replicas: []string{"replica"}, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedOutdated: []string{"replica"}, + replicas: []string{"replica"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"replica"}, expectedMetrics: map[string]int{ "outdated": 1, }, }, { + desc: "single erred node without commit with replica", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + err: anyErr, + }, + replicas: []string{"replica"}, + subtransactions: 1, + expectedPrimaryDirtied: false, + }, + { desc: "single node without transaction with replica", primary: node{ name: "primary", @@ -1664,10 +1679,15 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted}, {name: "s2", state: transactions.VoteCommitted}, }, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedUpdated: []string{"s1", "s2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "updated": 2, + }, }, { desc: "multiple committed nodes with primary err", @@ -1680,10 +1700,12 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted}, {name: "s2", state: transactions.VoteCommitted}, }, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedOutdated: []string{"s1", "s2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2"}, expectedMetrics: map[string]int{ "primary-failed": 2, }, @@ -1698,13 +1720,16 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteCommitted, err: anyErr}, {name: "s2", state: transactions.VoteCommitted}, }, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedUpdated: []string{"s2"}, - expectedOutdated: []string{"s1"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, expectedMetrics: map[string]int{ "node-failed": 1, + "updated": 1, }, }, { @@ -1717,13 +1742,16 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedUpdated: []string{"s2"}, - expectedOutdated: []string{"s1"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, expectedMetrics: map[string]int{ "node-not-committed": 1, + "updated": 1, }, }, { @@ -1736,15 +1764,33 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedOutdated: []string{"s1", "s2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2"}, expectedMetrics: map[string]int{ "primary-not-committed": 2, }, }, { + desc: "failure with no primary votes", + primary: node{ + name: "primary", + state: transactions.VoteFailed, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteFailed}, + {name: "s2", state: transactions.VoteCommitted}, + }, + didVote: map[string]bool{ + "s1": true, + "s2": true, + }, + subtransactions: 1, + }, + { desc: "multiple nodes without subtransactions", primary: node{ name: "primary", @@ -1771,15 +1817,18 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted}, }, - replicas: []string{"r1", "r2"}, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedOutdated: []string{"s1", "r1", "r2"}, - expectedUpdated: []string{"s2"}, + replicas: []string{"r1", "r2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "r1", "r2"}, + expectedUpdated: []string{"s2"}, expectedMetrics: map[string]int{ "node-not-committed": 1, "outdated": 2, + "updated": 1, }, }, { @@ -1792,11 +1841,13 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { {name: "s1", state: transactions.VoteFailed}, {name: "s2", state: transactions.VoteCommitted, err: anyErr}, }, - replicas: []string{"r1", "r2"}, - didCommitAnySubtransaction: true, - subtransactions: 1, - expectedPrimaryDirtied: true, - expectedOutdated: []string{"s1", "s2", "r1", "r2"}, + replicas: []string{"r1", "r2"}, + didVote: map[string]bool{ + "primary": true, + }, + subtransactions: 1, + expectedPrimaryDirtied: true, + expectedOutdated: []string{"s1", "s2", "r1", "r2"}, expectedMetrics: map[string]int{ "node-failed": 1, "node-not-committed": 1, @@ -1823,9 +1874,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { } transaction := mockTransaction{ - nodeStates: states, - subtransactions: tc.subtransactions, - didCommitAnySubtransaction: tc.didCommitAnySubtransaction, + nodeStates: states, + subtransactions: tc.subtransactions, + didVote: tc.didVote, } route := RepositoryMutatorRoute{ diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go index 7636a43eb..87b0ae1b0 100644 --- a/internal/praefect/transactions/subtransaction.go +++ b/internal/praefect/transactions/subtransaction.go @@ -345,3 +345,21 @@ func (t *subtransaction) getResult(node string) (VoteResult, error) { return voter.result, nil } + +func (t *subtransaction) getVote(node string) (*voting.Vote, error) { + t.lock.RLock() + defer t.lock.RUnlock() + + voter, ok := t.votersByNode[node] + if !ok { + return nil, fmt.Errorf("invalid node for transaction: %q", node) + } + + if voter.vote == nil { + return nil, nil + } + + // Return a copy of the vote. + vote := *voter.vote + return &vote, nil +} diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 2476c0b31..9b6a15886 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -57,8 +57,8 @@ type Transaction interface { CountSubtransactions() int // State returns the state of each voter part of the transaction. State() (map[string]VoteResult, error) - // DidCommitAnySubtransaction returns whether the transaction committed at least one subtransaction. - DidCommitAnySubtransaction() bool + // DidVote returns whether the given node has cast a vote. + DidVote(string) bool } // transaction is a session where a set of voters votes on one or more @@ -166,12 +166,12 @@ func (t *transaction) State() (map[string]VoteResult, error) { return results, nil } - // Collect all subtransactions. As they are ordered by reverse recency, we can simply - // overwrite our own results. - for _, subtransaction := range t.subtransactions { - for voter, result := range subtransaction.state() { - results[voter] = result - } + // Collect voter results. Given that all subtransactions are created with all voters + // registered in the transaction, we can simply take results from the last subtransaction. + // Any nodes which didn't yet cast a vote in the last transaction will be in the default + // undecided state. + for voter, result := range t.subtransactions[len(t.subtransactions)-1].state() { + results[voter] = result } return results, nil @@ -186,26 +186,26 @@ func (t *transaction) CountSubtransactions() int { return len(t.subtransactions) } -// DidCommitSubtransaction returns whether the transaction committed at least one subtransaction. -func (t *transaction) DidCommitAnySubtransaction() bool { +// DidVote determines whether the given node did cast a vote. If it's not possible to retrieve the +// vote, then the node by definition didn't cast a vote. +func (t *transaction) DidVote(node string) bool { t.lock.Lock() defer t.lock.Unlock() + // If there are no subtransactions, then no vote could've been cast by the given node. if len(t.subtransactions) == 0 { return false } - // We only need to check the first subtransaction. If it failed, there would - // be no further subtransactions. - for _, result := range t.subtransactions[0].state() { - // It's sufficient to find a single commit in the subtransaction - // to say it was committed. - if result == VoteCommitted { - return true - } + // It's sufficient to take a look at the first transaction. + vote, err := t.subtransactions[0].getVote(node) + if err != nil { + // If it's not possible to retrieve the vote, then we consider the note to not have + // cast a vote. + return false } - return false + return vote != nil } // getOrCreateSubtransaction gets an ongoing subtransaction on which the given diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go index 38828396a..93b636bd3 100644 --- a/internal/praefect/transactions/transaction_test.go +++ b/internal/praefect/transactions/transaction_test.go @@ -24,3 +24,30 @@ func TestTransactionCancellationWithEmptyTransaction(t *testing.T) { require.Error(t, err) require.Equal(t, err, ErrTransactionCanceled) } + +func TestTransaction_DidVote(t *testing.T) { + ctx, cleanup := testhelper.Context() + defer cleanup() + + tx, err := newTransaction(1, []Voter{ + {Name: "v1", Votes: 1}, + {Name: "v2", Votes: 0}, + }, 1) + require.NoError(t, err) + + // An unregistered voter did not vote. + require.False(t, tx.DidVote("unregistered")) + // And neither of the registered ones did cast a vote yet. + require.False(t, tx.DidVote("v1")) + require.False(t, tx.DidVote("v2")) + + // One of both nodes does cast a vote. + require.NoError(t, tx.vote(ctx, "v1", voting.VoteFromData([]byte{}))) + require.True(t, tx.DidVote("v1")) + require.False(t, tx.DidVote("v2")) + + // And now the second node does cast a vote, too. + require.NoError(t, tx.vote(ctx, "v2", voting.VoteFromData([]byte{}))) + require.True(t, tx.DidVote("v1")) + require.True(t, tx.DidVote("v2")) +} |