Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-05-14 18:22:43 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-02 08:23:54 +0300
commitca4b8f5116d44a38abf94e377918d78e864506cc (patch)
tree1c4b79bc515ce52b3657e09d8b0aa46eda3ea91e
parent97869402ef8847d8bc0bee475303f6e3a625b9f9 (diff)
Consider primary modified only if a subtransaction was committed
Praefect currently considers secondaries to be outdated after every mutator RPC in which they did not successfully commit the transaction. This causes Praefect to consider secondaries outdated even after RPCs that could have clearly not modified the primary, for example when the network connection was down and the request was not even sent to the network. This can cause secondaries to be considered outdated especially in failover scenarios, where there's a brief period when the primary's connection may be down before Praefect's health check realizes it. This commit addresses the situation by only considering the secondaries outdated if there could have been modifications to the primary replica. This is done by checking whether at least one subtranscation was committed during the transaction. As Praefect requires primary to always agree on the vote results, any committed subtransaction indicates the primary has likely performed some changes. If there are no committed subtransactions, then the primary would not have persisted reference updates to the disk. This still doesn't guarantee the primary actually performed the changes, as Praefect simply instructs the Gitalys to commit without waiting for a response. For example, the primary could receive the commit message and immediately crash. In such cases the primary would perform no changes and the secondaries would still be considered outdated. Using the subtransactions to determine whether changes could have been made does reduce the occurrence of this problem in a major way though. As a special case, if a transactional RPC succeeds without any subtransactions, we'll still replicate on every secondary. Every transactional RPC should have voted on success, so this indicates something is very off. For non-transactional RPCs, we still don't know for sure if the primary performed changes or not. As such, we still need to consider the secondaries outdated after failed RPCs. Most of these RPCs do not result an `update` type replication job and would not increment the generation anyway. Some do, which we may want to address in follow-ups. Changelog: fixed (cherry picked from commit d87747c82394e0ba0a2fd09a01e840dd9f6b6d27)
-rw-r--r--internal/praefect/coordinator.go48
-rw-r--r--internal/praefect/coordinator_pg_test.go53
-rw-r--r--internal/praefect/coordinator_test.go111
3 files changed, 145 insertions, 67 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 45d0e29e2..65d365c46 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -732,7 +732,13 @@ func (c *Coordinator) createTransactionFinalizer(
nodeErrors *nodeErrors,
) func() error {
return func() error {
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ if !primaryDirtied {
+ // If the primary replica was not modified then we don't need to consider the secondaries
+ // outdated. Praefect requires the primary to be always part of the quorum, so no changes
+ // to secondaries would be made without primary being in agreement.
+ return nil
+ }
return c.newRequestFinalizer(
ctx, virtualStorage, targetRepo, route.Primary.Storage,
@@ -743,9 +749,11 @@ func (c *Coordinator) createTransactionFinalizer(
// getUpdatedAndOutdatedSecondaries returns all nodes which can be considered up-to-date or outdated
// after the given transaction. A node is considered outdated, if one of the following is true:
//
-// - No subtransactions were created. This really is only a safeguard in case the RPC wasn't aware
-// of transactions and thus failed to correctly assert its state matches across nodes. This is
-// rather pessimistic, as it could also indicate that an RPC simply didn't change anything.
+// - No subtransactions were created and the RPC was successful on the primary. This really is only
+// a safeguard in case the RPC wasn't aware of transactions and thus failed to correctly assert
+// its state matches across nodes. This is rather pessimistic, as it could also indicate that an
+// RPC simply didn't change anything. If the RPC was a failure on the primary and there were no
+// subtransactions, we assume no changes were done and that the nodes failed prior to voting.
//
// - The node failed to be part of the quorum. As a special case, if the primary fails the vote, all
// nodes need to get replication jobs.
@@ -760,7 +768,7 @@ func getUpdatedAndOutdatedSecondaries(
route RepositoryMutatorRoute,
transaction transactions.Transaction,
nodeErrors *nodeErrors,
-) (updated []string, outdated []string) {
+) (primaryDirtied bool, updated []string, outdated []string) {
nodeErrors.Lock()
defer nodeErrors.Unlock()
@@ -769,17 +777,31 @@ func getUpdatedAndOutdatedSecondaries(
// for them.
outdated = append(outdated, route.ReplicationTargets...)
+ primaryErr := nodeErrors.errByNode[route.Primary.Storage]
+
+ // If there were subtransactions, we only assume some changes were made if one of the subtransactions
+ // was committed.
+ //
+ // If there were no subtransactions, we assume changes were performed only if the primary successfully
+ // processed the RPC. This might be an RPC that is not correctly casting votes thus we replicate everywhere.
+ //
+ // If there were no subtransactions and the primary failed the RPC, we assume no changes have been made and
+ // the nodes simply failed before voting.
+ primaryDirtied = transaction.DidCommitAnySubtransaction() ||
+ (transaction.CountSubtransactions() == 0 && primaryErr == nil)
+
// If the primary errored, then we need to assume that it has modified on-disk state and
// thus need to replicate those changes to secondaries.
- if err := nodeErrors.errByNode[route.Primary.Storage]; err != nil {
- ctxlogrus.Extract(ctx).WithError(err).Info("primary failed transaction")
+ if primaryErr != nil {
+ ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
return
}
- // If no subtransaction happened, then the called RPC may not be aware of transactions at
- // all. We thus need to assume it changed repository state and need to create replication
- // jobs.
+ // If no subtransaction happened, then the called RPC may not be aware of transactions or
+ // the nodes failed before casting any votes. If the primary failed the RPC, we assume
+ // no changes were done and the nodes hit an error prior to voting. If the primary processed
+ // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere.
if transaction.CountSubtransactions() == 0 {
ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
outdated = append(outdated, routerNodesToStorages(route.Secondaries)...)
@@ -795,9 +817,9 @@ func getUpdatedAndOutdatedSecondaries(
return
}
- // If the primary node did not commit the transaction, then we must assume that it dirtied
- // on-disk state. This modified state may not be what we want, but it's what we got. So in
- // order to ensure a consistent state, we need to replicate.
+ // If the primary node did not commit the transaction but there were some subtransactions committed,
+ // then we must assume that it dirtied on-disk state. This modified state may not be what we want,
+ // but it's what we got. So in order to ensure a consistent state, we need to replicate.
if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted {
if state == transactions.VoteFailed {
ctxlogrus.Extract(ctx).Error("transaction: primary failed vote")
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 80a383517..df5dec438 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -69,11 +69,19 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
},
},
{
- desc: "failing vote should create replication jobs",
+ desc: "failing vote should not create replication jobs without committed subtransactions",
nodes: []node{
- {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
- {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
- {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "qux", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "bar", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 0},
+ },
+ },
+ {
+ desc: "failing vote should create replication jobs with committed subtransaction",
+ nodes: []node{
+ {primary: true, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "foo", shouldSucceed: false}}, shouldGetRepl: false, shouldParticipate: true, expectedGeneration: 1},
+ {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "qux", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
+ {primary: false, subtransactions: subtransactions{{vote: "foo", shouldSucceed: true}, {vote: "bar", shouldSucceed: false}}, shouldGetRepl: true, shouldParticipate: true, expectedGeneration: 0},
},
},
{
@@ -108,20 +116,41 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
},
},
{
- // If the transaction didn't receive any votes at all, we need to assume
- // that the RPC wasn't aware of transactions and thus need to schedule
- // replication jobs.
- desc: "unstarted transaction should create replication jobs",
+ // All transactional RPCs are expected to cast vote if they are successful. If they don't, something is wrong
+ // and we should replicate to the secondaries to be sure.
+ desc: "unstarted transaction creates replication jobs if the primary is successful",
nodes: []node{
{primary: true, shouldGetRepl: false, expectedGeneration: 1},
{primary: false, shouldGetRepl: true, expectedGeneration: 0},
},
},
{
- // If the transaction didn't receive any votes at all, we need to assume
- // that the RPC wasn't aware of transactions and thus need to schedule
- // replication jobs.
- desc: "unstarted transaction should create replication jobs for outdated node",
+ // If the RPC fails without any subtransactions, the Gitalys would not have performed any changes yet.
+ // We don't have to consider the secondaries outdated.
+ desc: "unstarted transaction doesn't create replication jobs if the primary fails",
+ primaryFails: true,
+ nodes: []node{
+ {primary: true, expectedGeneration: 0},
+ {primary: false, expectedGeneration: 0},
+ },
+ },
+ {
+ // If there were no subtransactions and the RPC failed, the primary should not have performed any changes.
+ // We don't need to schedule replication jobs to replication targets either as they'd have jobs
+ // already scheduled by the earlier RPC that made them outdated or by the reconciler.
+ desc: "unstarted transaction should not create replication jobs for outdated node if the primary fails",
+ primaryFails: true,
+ nodes: []node{
+ {primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: false, generation: 1, expectedGeneration: 1},
+ {primary: false, shouldGetRepl: false, generation: 0, expectedGeneration: 0},
+ {primary: false, shouldGetRepl: false, generation: datastore.GenerationUnknown, expectedGeneration: datastore.GenerationUnknown},
+ },
+ },
+ {
+ // If there were no subtransactions and the primary did not fail, we should schedule replication jobs to every secondary.
+ // All transactional RPCs are expected to vote if they are successful.
+ desc: "unstarted transaction should create replication jobs for outdated node if the primary succeeds",
nodes: []node{
{primary: true, shouldGetRepl: false, generation: 1, expectedGeneration: 2},
{primary: false, shouldGetRepl: true, generation: 1, expectedGeneration: 1},
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index f9812fe3e..fd8f1e0ea 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1573,13 +1573,15 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
anyErr := errors.New("arbitrary error")
for _, tc := range []struct {
- desc string
- primary node
- secondaries []node
- replicas []string
- subtransactions int
- expectedOutdated []string
- expectedUpdated []string
+ desc string
+ primary node
+ secondaries []node
+ replicas []string
+ subtransactions int
+ didCommitAnySubtransaction bool
+ expectedPrimaryDirtied bool
+ expectedOutdated []string
+ expectedUpdated []string
}{
{
desc: "single committed node",
@@ -1587,7 +1589,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
name: "primary",
state: transactions.VoteCommitted,
},
- subtransactions: 1,
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
},
{
desc: "single failed node",
@@ -1609,7 +1613,8 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
primary: node{
name: "primary",
},
- subtransactions: 0,
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
},
{
desc: "single successful node with replica",
@@ -1617,9 +1622,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
name: "primary",
state: transactions.VoteCommitted,
},
- replicas: []string{"replica"},
- subtransactions: 1,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
},
{
desc: "single failing node with replica",
@@ -1638,18 +1645,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
state: transactions.VoteCommitted,
err: anyErr,
},
- replicas: []string{"replica"},
- subtransactions: 1,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
},
{
desc: "single node without transaction with replica",
primary: node{
name: "primary",
},
- replicas: []string{"replica"},
- subtransactions: 0,
- expectedOutdated: []string{"replica"},
+ replicas: []string{"replica"},
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"replica"},
},
{
desc: "multiple committed nodes",
@@ -1661,8 +1671,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s1", "s2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s1", "s2"},
},
{
desc: "multiple committed nodes with primary err",
@@ -1675,8 +1687,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
},
{
desc: "multiple committed nodes with secondary err",
@@ -1688,9 +1702,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteCommitted, err: anyErr},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s2"},
- expectedOutdated: []string{"s1"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
},
{
desc: "partial success",
@@ -1702,9 +1718,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedUpdated: []string{"s2"},
- expectedOutdated: []string{"s1"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
},
{
desc: "failure with (impossible) secondary success",
@@ -1716,8 +1734,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
},
{
desc: "multiple nodes without subtransactions",
@@ -1729,8 +1749,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- subtransactions: 0,
- expectedOutdated: []string{"s1", "s2"},
+ subtransactions: 0,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2"},
},
{
desc: "multiple nodes with replica and partial failures",
@@ -1742,10 +1763,12 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted},
},
- replicas: []string{"r1", "r2"},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "r1", "r2"},
- expectedUpdated: []string{"s2"},
+ replicas: []string{"r1", "r2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "r1", "r2"},
+ expectedUpdated: []string{"s2"},
},
{
desc: "multiple nodes with replica and partial err",
@@ -1757,9 +1780,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
{name: "s1", state: transactions.VoteFailed},
{name: "s2", state: transactions.VoteCommitted, err: anyErr},
},
- replicas: []string{"r1", "r2"},
- subtransactions: 1,
- expectedOutdated: []string{"s1", "s2", "r1", "r2"},
+ replicas: []string{"r1", "r2"},
+ didCommitAnySubtransaction: true,
+ subtransactions: 1,
+ expectedPrimaryDirtied: true,
+ expectedOutdated: []string{"s1", "s2", "r1", "r2"},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -1781,8 +1806,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
transaction := mockTransaction{
- nodeStates: states,
- subtransactions: tc.subtransactions,
+ nodeStates: states,
+ subtransactions: tc.subtransactions,
+ didCommitAnySubtransaction: tc.didCommitAnySubtransaction,
}
route := RepositoryMutatorRoute{
@@ -1797,7 +1823,8 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
}
route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
- updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors)
+ require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied)
require.ElementsMatch(t, tc.expectedUpdated, updated)
require.ElementsMatch(t, tc.expectedOutdated, outdated)
})