Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Patrick Steinhardt <psteinhardt@gitlab.com> 2021-02-18 15:17:20 +0300
committer: Patrick Steinhardt <psteinhardt@gitlab.com> 2021-02-23 12:01:52 +0300
commit: fceeed17ecd86f9a958de085e206d49d1a240e56 (patch)
tree: 9343430c33dc20a5abef9da3be72e00bc2c6b9b0
parent: 34e8d9bb7d57af2cc298321c92aa207a91523c6e (diff)
praefect: Extract function to determine updated/outdated nodes
The transactional request finalizer is quite hard to follow. This has led to multiple issues with the current implementation where it doesn't correctly handle some nodes and won't create replication jobs when there's an error. As a preparation for refactorings, this commit pulls out the code to determine outdated and up-to-date nodes into its own standalone function and adds a bunch of tests. No functional change is expected from this commit.
-rw-r--r-- internal/praefect/coordinator.go 101
-rw-r--r-- internal/praefect/coordinator_test.go 195
2 files changed, 251 insertions, 45 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 3356fbf54..716b5b38d 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -657,66 +657,77 @@ func (c *Coordinator) createTransactionFinalizer(
params datastore.Params,
) func() error {
return func() error {
- nodeStates, err := transaction.State()
+ updated, outdated, err := getUpdatedAndOutdatedSecondaries(ctx, route, transaction)
if err != nil {
return err
}
- // If no subtransaction happened, then the called RPC may not be aware of
- // transactions at all. We thus need to assume it changed repository state
- // and need to create replication jobs.
- if transaction.CountSubtransactions() == 0 {
- ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
+ return c.newRequestFinalizer(
+ ctx, virtualStorage, targetRepo, route.Primary.Storage,
+ updated, outdated, change, params)()
+ }
+}
- secondaries := make([]string, 0, len(nodeStates))
- for secondary := range nodeStates {
- if secondary == route.Primary.Storage {
- continue
- }
- secondaries = append(secondaries, secondary)
- }
+func getUpdatedAndOutdatedSecondaries(
+ ctx context.Context,
+ route RepositoryMutatorRoute,
+ transaction transactions.Transaction,
+) ([]string, []string, error) {
+ nodeStates, err := transaction.State()
+ if err != nil {
+ return nil, nil, err
+ }
- return c.newRequestFinalizer(
- ctx, virtualStorage, targetRepo, route.Primary.Storage,
- nil, secondaries, change, params)()
- }
+ // If no subtransaction happened, then the called RPC may not be aware of
+ // transactions at all. We thus need to assume it changed repository state
+ // and need to create replication jobs.
+ if transaction.CountSubtransactions() == 0 {
+ ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions")
- // If the primary node failed the transaction, then
- // there's no sense in trying to replicate from primary
- // to secondaries.
- if nodeStates[route.Primary.Storage] != transactions.VoteCommitted {
- // If the transaction was gracefully stopped, then we don't want to return
- // an explicit error here as it indicates an error somewhere else which
- // already got returned.
- if nodeStates[route.Primary.Storage] == transactions.VoteStopped {
- return nil
+ secondaries := make([]string, 0, len(nodeStates))
+ for secondary := range nodeStates {
+ if secondary == route.Primary.Storage {
+ continue
}
- return fmt.Errorf("transaction: primary failed vote")
+ secondaries = append(secondaries, secondary)
}
- delete(nodeStates, route.Primary.Storage)
-
- updatedSecondaries := make([]string, 0, len(nodeStates))
- var outdatedSecondaries []string
- for node, state := range nodeStates {
- if state == transactions.VoteCommitted {
- updatedSecondaries = append(updatedSecondaries, node)
- continue
- }
+ return nil, secondaries, nil
+ }
- outdatedSecondaries = append(outdatedSecondaries, node)
+ // If the primary node failed the transaction, then
+ // there's no sense in trying to replicate from primary
+ // to secondaries.
+ if nodeStates[route.Primary.Storage] != transactions.VoteCommitted {
+ // If the transaction was gracefully stopped, then we don't want to return
+ // an explicit error here as it indicates an error somewhere else which
+ // already got returned.
+ if nodeStates[route.Primary.Storage] == transactions.VoteStopped {
+ return nil, nil, nil
}
+ return nil, nil, fmt.Errorf("transaction: primary failed vote")
+ }
+ delete(nodeStates, route.Primary.Storage)
- // Replication targets were not added to the transaction, most
- // likely because they are either not healthy or out of date.
- // We thus need to make sure to create replication jobs for
- // them.
- outdatedSecondaries = append(outdatedSecondaries, route.ReplicationTargets...)
+ updatedSecondaries := make([]string, 0, len(nodeStates))
+ var outdatedSecondaries []string
- return c.newRequestFinalizer(
- ctx, virtualStorage, targetRepo, route.Primary.Storage,
- updatedSecondaries, outdatedSecondaries, change, params)()
+ for node, state := range nodeStates {
+ if state == transactions.VoteCommitted {
+ updatedSecondaries = append(updatedSecondaries, node)
+ continue
+ }
+
+ outdatedSecondaries = append(outdatedSecondaries, node)
}
+
+ // Replication targets were not added to the transaction, most
+ // likely because they are either not healthy or out of date.
+ // We thus need to make sure to create replication jobs for
+ // them.
+ outdatedSecondaries = append(outdatedSecondaries, route.ReplicationTargets...)
+
+ return updatedSecondaries, outdatedSecondaries, nil
}
func routerNodesToStorages(nodes []RouterNode) []string {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 659521207..df52ff50d 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1461,3 +1461,198 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
})
}
}
+
+type mockTransaction struct {
+ nodeStates map[string]transactions.VoteResult
+ subtransactions int
+}
+
+func (t mockTransaction) ID() uint64 {
+ return 0
+}
+
+func (t mockTransaction) CountSubtransactions() int {
+ return t.subtransactions
+}
+
+func (t mockTransaction) State() (map[string]transactions.VoteResult, error) {
+ return t.nodeStates, nil
+}
+
+func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
+ type node struct {
+ name string
+ state transactions.VoteResult
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for _, tc := range []struct {
+ desc string
+ primary node
+ secondaries []node
+ replicas []string
+ subtransactions int
+ expectedErr error
+ expectedOutdated []string
+ expectedUpdated []string
+ }{
+ {
+ desc: "single committed node",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ },
+ subtransactions: 1,
+ },
+ {
+ desc: "single failed node",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteFailed,
+ },
+ subtransactions: 1,
+ expectedErr: errors.New("transaction: primary failed vote"),
+ },
+ {
+ desc: "single node without subtransactions",
+ primary: node{
+ name: "primary",
+ },
+ subtransactions: 0,
+ },
+ {
+ desc: "single successful node with replica",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ },
+ replicas: []string{"replica"},
+ subtransactions: 1,
+ expectedOutdated: []string{"replica"},
+ },
+ {
+ desc: "single failing node with replica",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteFailed,
+ },
+ replicas: []string{"replica"},
+ subtransactions: 1,
+ expectedErr: errors.New("transaction: primary failed vote"),
+ },
+ {
+ desc: "single node without transaction with replica",
+ primary: node{
+ name: "primary",
+ },
+ replicas: []string{"replica"},
+ subtransactions: 0,
+ },
+ {
+ desc: "multiple committed nodes",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteCommitted},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ subtransactions: 1,
+ expectedUpdated: []string{"s1", "s2"},
+ },
+ {
+ desc: "partial success",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteFailed},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ subtransactions: 1,
+ expectedUpdated: []string{"s2"},
+ expectedOutdated: []string{"s1"},
+ },
+ {
+ desc: "failure with (impossible) secondary success",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteFailed,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteFailed},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ subtransactions: 1,
+ expectedErr: errors.New("transaction: primary failed vote"),
+ },
+ {
+ desc: "multiple nodes without subtransactions",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteFailed,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteFailed},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ subtransactions: 0,
+ expectedOutdated: []string{"s1", "s2"},
+ },
+ {
+ desc: "multiple nodes with replica and partial failures",
+ primary: node{
+ name: "primary",
+ state: transactions.VoteCommitted,
+ },
+ secondaries: []node{
+ {name: "s1", state: transactions.VoteFailed},
+ {name: "s2", state: transactions.VoteCommitted},
+ },
+ replicas: []string{"r1", "r2"},
+ subtransactions: 1,
+ expectedOutdated: []string{"s1", "r1", "r2"},
+ expectedUpdated: []string{"s2"},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ nodes := append(tc.secondaries, tc.primary)
+ voters := make([]transactions.Voter, len(nodes))
+ states := make(map[string]transactions.VoteResult)
+
+ for i, node := range nodes {
+ voters[i] = transactions.Voter{
+ Name: node.name,
+ Votes: 1,
+ }
+ states[node.name] = node.state
+ }
+
+ transaction := mockTransaction{
+ nodeStates: states,
+ subtransactions: tc.subtransactions,
+ }
+
+ route := RepositoryMutatorRoute{
+ Primary: RouterNode{
+ Storage: tc.primary.name,
+ },
+ }
+ for _, secondary := range tc.secondaries {
+ route.Secondaries = append(route.Secondaries, RouterNode{
+ Storage: secondary.name,
+ })
+ }
+ route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
+
+ updated, outdated, err := getUpdatedAndOutdatedSecondaries(ctx, route, transaction)
+ require.Equal(t, tc.expectedErr, err)
+ require.ElementsMatch(t, tc.expectedUpdated, updated)
+ require.ElementsMatch(t, tc.expectedOutdated, outdated)
+ })
+ }
+}