Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-07-12 11:55:56 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-07-12 11:55:56 +0300
commitc8a29dc9fd507cab8835b2e1152b94a6ac96de35 (patch)
treeb8f171c7273680a45fde27589c3370e5d04011c0
parentfb267fb98bcc515c5bfa9d45ba914c7272a93773 (diff)
parent528e8e926ef90d35ca85601f1eb28a947d63bea3 (diff)
Merge branch 'pks-tx-coordinator-replication-error-handling' into 'master'
coordinator: Only schedule replication for differing error states See merge request gitlab-org/gitaly!3642
-rw-r--r--internal/praefect/coordinator.go85
-rw-r--r--internal/praefect/coordinator_pg_test.go8
-rw-r--r--internal/praefect/coordinator_test.go187
-rw-r--r--internal/praefect/transactions/subtransaction.go18
-rw-r--r--internal/praefect/transactions/transaction.go38
-rw-r--r--internal/praefect/transactions/transaction_test.go27
6 files changed, 227 insertions, 136 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index ee642eb4c..6f2863b1e 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
- "io/ioutil"
"sync"
"time"
@@ -811,46 +810,51 @@ func getUpdatedAndOutdatedSecondaries(
primaryErr := nodeErrors.errByNode[route.Primary.Storage]
- // If there were subtransactions, we only assume some changes were made if one of the subtransactions
- // was committed.
- //
- // If there were no subtransactions, we assume changes were performed only if the primary successfully
- // processed the RPC. This might be an RPC that is not correctly casting votes thus we replicate everywhere.
- //
- // If there were no subtransactions and the primary failed the RPC, we assume no changes have been made and
- // the nodes simply failed before voting.
- primaryDirtied = transaction.DidCommitAnySubtransaction() ||
- (transaction.CountSubtransactions() == 0 && primaryErr == nil)
-
- // If the primary wasn't dirtied, then we never replicate any changes. While this is
- // duplicates logic defined elsewhere, it's probably good enough given that we only talk
- // about metrics here.
- recordReplication := func(reason string, replicationCount int) {
- if primaryDirtied && replicationCount > 0 {
- replicationCountMetric.WithLabelValues(reason).Add(float64(replicationCount))
- }
+ // If there were no subtransactions and the primary failed the RPC, we assume no changes
+ // have been made and the nodes simply failed before voting. We can thus return directly and
+ // notify the caller that the primary is not considered to be dirty.
+ if transaction.CountSubtransactions() == 0 && primaryErr != nil {
+ return false, nil, nil
}
- // Same as above, we discard log entries in case the primary wasn't dirtied.
- logReplication := ctxlogrus.Extract(ctx)
- if !primaryDirtied {
- discardLogger := logrus.New()
- discardLogger.Out = ioutil.Discard
- logReplication = logrus.NewEntry(discardLogger)
+ // If there was a single subtransactions but the primary didn't cast a vote, then it means
+ // that the primary node has dropped out before secondaries were able to commit any changes
+ // to disk. Given that they cannot ever succeed without the primary, no change to disk
+ // should have happened.
+ if transaction.CountSubtransactions() == 1 && !transaction.DidVote(route.Primary.Storage) {
+ return false, nil, nil
+ }
+
+ primaryDirtied = true
+
+ nodesByState := make(map[string][]string)
+ defer func() {
+ ctxlogrus.Extract(ctx).
+ WithField("transaction.primary", route.Primary.Storage).
+ WithField("transaction.secondaries", nodesByState).
+ Info("transactional node states")
+
+ for reason, nodes := range nodesByState {
+ replicationCountMetric.WithLabelValues(reason).Add(float64(len(nodes)))
+ }
+ }()
+
+ markOutdated := func(reason string, nodes []string) {
+ if len(nodes) != 0 {
+ outdated = append(outdated, nodes...)
+ nodesByState[reason] = append(nodesByState[reason], nodes...)
+ }
}
// Replication targets were not added to the transaction, most likely because they are
// either not healthy or out of date. We thus need to make sure to create replication jobs
// for them.
- outdated = append(outdated, route.ReplicationTargets...)
- recordReplication("outdated", len(route.ReplicationTargets))
+ markOutdated("outdated", route.ReplicationTargets)
// If the primary errored, then we need to assume that it has modified on-disk state and
// thus need to replicate those changes to secondaries.
if primaryErr != nil {
- logReplication.WithError(primaryErr).Info("primary failed transaction")
- outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
- recordReplication("primary-failed", len(route.Secondaries))
+ markOutdated("primary-failed", routerNodesToStorages(route.Secondaries))
return
}
@@ -859,9 +863,7 @@ func getUpdatedAndOutdatedSecondaries(
// no changes were done and the nodes hit an error prior to voting. If the primary processed
// the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere.
if transaction.CountSubtransactions() == 0 {
- logReplication.Info("transaction did not create subtransactions")
- outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
- recordReplication("no-votes", len(route.Secondaries))
+ markOutdated("no-votes", routerNodesToStorages(route.Secondaries))
return
}
@@ -869,9 +871,7 @@ func getUpdatedAndOutdatedSecondaries(
// safe route and just replicate to all secondaries.
nodeStates, err := transaction.State()
if err != nil {
- logReplication.WithError(err).Error("could not get transaction state")
- outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
- recordReplication("missing-tx-state", len(route.Secondaries))
+ markOutdated("missing-tx-state", routerNodesToStorages(route.Secondaries))
return
}
@@ -879,11 +879,7 @@ func getUpdatedAndOutdatedSecondaries(
// then we must assume that it dirtied on-disk state. This modified state may not be what we want,
// but it's what we got. So in order to ensure a consistent state, we need to replicate.
if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted {
- if state == transactions.VoteFailed {
- logReplication.Error("transaction: primary failed vote")
- }
- outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
- recordReplication("primary-not-committed", len(route.Secondaries))
+ markOutdated("primary-not-committed", routerNodesToStorages(route.Secondaries))
return
}
@@ -891,18 +887,17 @@ func getUpdatedAndOutdatedSecondaries(
// error and committed, it's considered up to date and thus does not need replication.
for _, secondary := range route.Secondaries {
if nodeErrors.errByNode[secondary.Storage] != nil {
- outdated = append(outdated, secondary.Storage)
- recordReplication("node-failed", 1)
+ markOutdated("node-failed", []string{secondary.Storage})
continue
}
if nodeStates[secondary.Storage] != transactions.VoteCommitted {
- outdated = append(outdated, secondary.Storage)
- recordReplication("node-not-committed", 1)
+ markOutdated("node-not-committed", []string{secondary.Storage})
continue
}
updated = append(updated, secondary.Storage)
+ nodesByState["updated"] = append(nodesByState["updated"], secondary.Storage)
}
return
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 316b05997..d43f2f02c 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -69,11 +69,11 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
},
},
{
- desc: "failing vote should not create replication jobs without committed subtransactions",
+ desc: "failing vote should create replication jobs without committed subtransactions",
nodes: []node{
- {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
- {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
- {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 567029378..d7dcdd369 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1522,9 +1522,9 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
}
type mockTransaction struct {
- nodeStates map[string]transactions.VoteResult
- subtransactions int
- didCommitAnySubtransaction bool
+ nodeStates map[string]transactions.VoteResult
+ subtransactions int
+ didVote map[string]bool
}
func (t mockTransaction) ID() uint64 {
@@ -1535,8 +1535,8 @@ func (t mockTransaction) CountSubtransactions() int {
return t.subtransactions
}
-func (t mockTransaction) DidCommitAnySubtransaction() bool {
- return t.didCommitAnySubtransaction
+func (t mockTransaction) DidVote(node string) bool {
+ return t.didVote[node]
}
func (t mockTransaction) State() (map[string]transactions.VoteResult, error) {
@@ -1556,16 +1556,16 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
anyErr := errors.New("arbitrary error")
for _, tc := range []struct {
- desc string
- primary node
- secondaries []node
- replicas []string
- subtransactions int
- didCommitAnySubtransaction bool
- expectedPrimaryDirtied bool
- expectedOutdated []string
- expectedUpdated []string
- expectedMetrics map[string]int
+ desc string
+ primary node
+ secondaries []node
+ replicas []string
+ subtransactions int
+ didVote map[string]bool
+ expectedPrimaryDirtied bool
+ expectedOutdated []string
+ expectedUpdated []string
+ expectedMetrics map[string]int
}{
{
desc: "single committed node",
@@ -1573,9 +1573,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
name: "primary",
state: transactions.VoteCommitted,
},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
},
{
desc: "single failed node",
@@ -1606,24 +1608,24 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
name: "primary",
state: transactions.VoteCommitted,
},
- replicas: []string{"replica"},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
expectedMetrics: map[string]int{
"outdated": 1,
},
},
{
- desc: "single failing node with replica",
+ desc: "single failing node with replica is not considered modified",
primary: node{
name: "primary",
state: transactions.VoteFailed,
},
- replicas: []string{"replica"},
- subtransactions: 1,
- expectedOutdated: []string{"replica"},
+ subtransactions: 1,
},
{
desc: "single erred node with replica",
@@ -1632,16 +1634,29 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
state: transactions.VoteCommitted,
err: anyErr,
},
- replicas: []string{"replica"},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
expectedMetrics: map[string]int{
"outdated": 1,
},
},
{
+ desc: "single erred node without commit with replica",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ err: anyErr,
+ },
+ replicas: []string{"replica"},
+ subtransactions: 1,
+ expectedPrimaryDirtied: false,
+ },
+ {
desc: "single node without transaction with replica",
primary: node{
name: "primary",
@@ -1664,10 +1679,15 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted},
{name: "s2", state: transactions.VoteCommitted},
},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedUpdated: []string{"s1", "s2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s1", "s2"},
+ expectedMetrics: map[string]int{
+ "updated": 2,
+ },
},
{
desc: "multiple committed nodes with primary err",
@@ -1680,10 +1700,12 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted},
{name: "s2", state: transactions.VoteCommitted},
},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedOutdated: []string{"s1", "s2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
expectedMetrics: map[string]int{
"primary-failed": 2,
},
@@ -1698,13 +1720,16 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted, err: anyErr},
{name: "s2", state: transactions.VoteCommitted},
},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedUpdated: []string{"s2"},
- expectedOutdated: []string{"s1"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
expectedMetrics: map[string]int{
"node-failed": 1,
+ "updated": 1,
},
},
{
@@ -1717,13 +1742,16 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedUpdated: []string{"s2"},
- expectedOutdated: []string{"s1"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
expectedMetrics: map[string]int{
"node-not-committed": 1,
+ "updated": 1,
},
},
{
@@ -1736,15 +1764,33 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedOutdated: []string{"s1", "s2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
expectedMetrics: map[string]int{
"primary-not-committed": 2,
},
},
{
+ desc: "failure with no primary votes",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteFailed,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteFailed},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ didVote: map[string]bool{
+ "s1": true,
+ "s2": true,
+ },
+ subtransactions: 1,
+ },
+ {
desc: "multiple nodes without subtransactions",
primary: node{
name: "primary",
@@ -1771,15 +1817,18 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- replicas: []string{"r1", "r2"},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedOutdated: []string{"s1", "r1", "r2"},
- expectedUpdated: []string{"s2"},
+ replicas: []string{"r1", "r2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "r1", "r2"},
+ expectedUpdated: []string{"s2"},
expectedMetrics: map[string]int{
"node-not-committed": 1,
"outdated": 2,
+ "updated": 1,
},
},
{
@@ -1792,11 +1841,13 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted, err: anyErr},
},
- replicas: []string{"r1", "r2"},
- didCommitAnySubtransaction: true,
- subtransactions: 1,
- expectedPrimaryDirtied: true,
- expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ replicas: []string{"r1", "r2"},
+ didVote: map[string]bool{
+ "primary": true,
+ },
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2", "r1", "r2"},
expectedMetrics: map[string]int{
"node-failed": 1,
"node-not-committed": 1,
@@ -1823,9 +1874,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
transaction := mockTransaction{
- nodeStates: states,
- subtransactions: tc.subtransactions,
- didCommitAnySubtransaction: tc.didCommitAnySubtransaction,
+ nodeStates: states,
+ subtransactions: tc.subtransactions,
+ didVote: tc.didVote,
}
route := RepositoryMutatorRoute{
diff --git a/internal/praefect/transactions/subtransaction.go b/internal/praefect/transactions/subtransaction.go
index 7636a43eb..87b0ae1b0 100644
--- a/internal/praefect/transactions/subtransaction.go
+++ b/internal/praefect/transactions/subtransaction.go
@@ -345,3 +345,21 @@ func (t *subtransaction) getResult(node string) (VoteResult, error) {
return voter.result, nil
}
+
+func (t *subtransaction) getVote(node string) (*voting.Vote, error) {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ voter, ok := t.votersByNode[node]
+ if !ok {
+ return nil, fmt.Errorf("invalid node for transaction: %q", node)
+ }
+
+ if voter.vote == nil {
+ return nil, nil
+ }
+
+ // Return a copy of the vote.
+ vote := *voter.vote
+ return &vote, nil
+}
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 2476c0b31..9b6a15886 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -57,8 +57,8 @@ type Transaction interface {
CountSubtransactions() int
// State returns the state of each voter part of the transaction.
State() (map[string]VoteResult, error)
- // DidCommitAnySubtransaction returns whether the transaction committed at least one subtransaction.
- DidCommitAnySubtransaction() bool
+ // DidVote returns whether the given node has cast a vote.
+ DidVote(string) bool
}
// transaction is a session where a set of voters votes on one or more
@@ -166,12 +166,12 @@ func (t *transaction) State() (map[string]VoteResult, error) {
return results, nil
}
- // Collect all subtransactions. As they are ordered by reverse recency, we can simply
- // overwrite our own results.
- for _, subtransaction := range t.subtransactions {
- for voter, result := range subtransaction.state() {
- results[voter] = result
- }
+ // Collect voter results. Given that all subtransactions are created with all voters
+ // registered in the transaction, we can simply take results from the last subtransaction.
+ // Any nodes which didn't yet cast a vote in the last transaction will be in the default
+ // undecided state.
+ for voter, result := range t.subtransactions[len(t.subtransactions)-1].state() {
+ results[voter] = result
}
return results, nil
@@ -186,26 +186,26 @@ func (t *transaction) CountSubtransactions() int {
return len(t.subtransactions)
}
-// DidCommitSubtransaction returns whether the transaction committed at least one subtransaction.
-func (t *transaction) DidCommitAnySubtransaction() bool {
+// DidVote determines whether the given node did cast a vote. If it's not possible to retrieve the
+// vote, then the node by definition didn't cast a vote.
+func (t *transaction) DidVote(node string) bool {
t.lock.Lock()
defer t.lock.Unlock()
+ // If there are no subtransactions, then no vote could've been cast by the given node.
if len(t.subtransactions) == 0 {
return false
}
- // We only need to check the first subtransaction. If it failed, there would
- // be no further subtransactions.
- for _, result := range t.subtransactions[0].state() {
- // It's sufficient to find a single commit in the subtransaction
- // to say it was committed.
- if result == VoteCommitted {
- return true
- }
+ // It's sufficient to take a look at the first transaction.
+ vote, err := t.subtransactions[0].getVote(node)
+ if err != nil {
+ // If it's not possible to retrieve the vote, then we consider the note to not have
+ // cast a vote.
+ return false
}
- return false
+ return vote != nil
}
// getOrCreateSubtransaction gets an ongoing subtransaction on which the given
diff --git a/internal/praefect/transactions/transaction_test.go b/internal/praefect/transactions/transaction_test.go
index 38828396a..93b636bd3 100644
--- a/internal/praefect/transactions/transaction_test.go
+++ b/internal/praefect/transactions/transaction_test.go
@@ -24,3 +24,30 @@ func TestTransactionCancellationWithEmptyTransaction(t *testing.T) {
require.Error(t, err)
require.Equal(t, err, ErrTransactionCanceled)
}
+
+func TestTransaction_DidVote(t *testing.T) {
+ ctx, cleanup := testhelper.Context()
+ defer cleanup()
+
+ tx, err := newTransaction(1, []Voter{
+ {Name: "v1", Votes: 1},
+ {Name: "v2", Votes: 0},
+ }, 1)
+ require.NoError(t, err)
+
+ // An unregistered voter did not vote.
+ require.False(t, tx.DidVote("unregistered"))
+ // And neither of the registered ones did cast a vote yet.
+ require.False(t, tx.DidVote("v1"))
+ require.False(t, tx.DidVote("v2"))
+
+ // One of both nodes does cast a vote.
+ require.NoError(t, tx.vote(ctx, "v1", voting.VoteFromData([]byte{})))
+ require.True(t, tx.DidVote("v1"))
+ require.False(t, tx.DidVote("v2"))
+
+ // And now the second node does cast a vote, too.
+ require.NoError(t, tx.vote(ctx, "v2", voting.VoteFromData([]byte{})))
+ require.True(t, tx.DidVote("v1"))
+ require.True(t, tx.DidVote("v2"))
+}