diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-18 15:17:20 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-23 12:01:52 +0300 |
commit | fceeed17ecd86f9a958de085e206d49d1a240e56 (patch) | |
tree | 9343430c33dc20a5abef9da3be72e00bc2c6b9b0 | |
parent | 34e8d9bb7d57af2cc298321c92aa207a91523c6e (diff) |
praefect: Extract function to determine updated/outdated nodes
The transactional request finalizer is quite hard to follow. This has
led to multiple issues with the current implementation where it doesn't
correctly handle some nodes and won't create replication jobs when
there's an error.
As a preparation for refactorings, this commit pulls out the code to
determine outdated and up-to-date nodes into its own standalone
function and adds a bunch of tests.
No functional change is expected from this commit.
-rw-r--r-- | internal/praefect/coordinator.go | 101 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 195 |
2 files changed, 251 insertions, 45 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 3356fbf54..716b5b38d 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -657,66 +657,77 @@ func (c *Coordinator) createTransactionFinalizer( params datastore.Params, ) func() error { return func() error { - nodeStates, err := transaction.State() + updated, outdated, err := getUpdatedAndOutdatedSecondaries(ctx, route, transaction) if err != nil { return err } - // If no subtransaction happened, then the called RPC may not be aware of - // transactions at all. We thus need to assume it changed repository state - // and need to create replication jobs. - if transaction.CountSubtransactions() == 0 { - ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions") + return c.newRequestFinalizer( + ctx, virtualStorage, targetRepo, route.Primary.Storage, + updated, outdated, change, params)() + } +} - secondaries := make([]string, 0, len(nodeStates)) - for secondary := range nodeStates { - if secondary == route.Primary.Storage { - continue - } - secondaries = append(secondaries, secondary) - } +func getUpdatedAndOutdatedSecondaries( + ctx context.Context, + route RepositoryMutatorRoute, + transaction transactions.Transaction, +) ([]string, []string, error) { + nodeStates, err := transaction.State() + if err != nil { + return nil, nil, err + } - return c.newRequestFinalizer( - ctx, virtualStorage, targetRepo, route.Primary.Storage, - nil, secondaries, change, params)() - } + // If no subtransaction happened, then the called RPC may not be aware of + // transactions at all. We thus need to assume it changed repository state + // and need to create replication jobs. + if transaction.CountSubtransactions() == 0 { + ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions") - // If the primary node failed the transaction, then - // there's no sense in trying to replicate from primary - // to secondaries. - if nodeStates[route.Primary.Storage] != transactions.VoteCommitted { - // If the transaction was gracefully stopped, then we don't want to return - // an explicit error here as it indicates an error somewhere else which - // already got returned. - if nodeStates[route.Primary.Storage] == transactions.VoteStopped { - return nil + secondaries := make([]string, 0, len(nodeStates)) + for secondary := range nodeStates { + if secondary == route.Primary.Storage { + continue } - return fmt.Errorf("transaction: primary failed vote") + secondaries = append(secondaries, secondary) } - delete(nodeStates, route.Primary.Storage) - - updatedSecondaries := make([]string, 0, len(nodeStates)) - var outdatedSecondaries []string - for node, state := range nodeStates { - if state == transactions.VoteCommitted { - updatedSecondaries = append(updatedSecondaries, node) - continue - } + return nil, secondaries, nil + } - outdatedSecondaries = append(outdatedSecondaries, node) + // If the primary node failed the transaction, then + // there's no sense in trying to replicate from primary + // to secondaries. + if nodeStates[route.Primary.Storage] != transactions.VoteCommitted { + // If the transaction was gracefully stopped, then we don't want to return + // an explicit error here as it indicates an error somewhere else which + // already got returned. + if nodeStates[route.Primary.Storage] == transactions.VoteStopped { + return nil, nil, nil } + return nil, nil, fmt.Errorf("transaction: primary failed vote") + } + delete(nodeStates, route.Primary.Storage) - // Replication targets were not added to the transaction, most - // likely because they are either not healthy or out of date. - // We thus need to make sure to create replication jobs for - // them. - outdatedSecondaries = append(outdatedSecondaries, route.ReplicationTargets...) + updatedSecondaries := make([]string, 0, len(nodeStates)) + var outdatedSecondaries []string - return c.newRequestFinalizer( - ctx, virtualStorage, targetRepo, route.Primary.Storage, - updatedSecondaries, outdatedSecondaries, change, params)() + for node, state := range nodeStates { + if state == transactions.VoteCommitted { + updatedSecondaries = append(updatedSecondaries, node) + continue + } + + outdatedSecondaries = append(outdatedSecondaries, node) } + + // Replication targets were not added to the transaction, most + // likely because they are either not healthy or out of date. + // We thus need to make sure to create replication jobs for + // them. + outdatedSecondaries = append(outdatedSecondaries, route.ReplicationTargets...) + + return updatedSecondaries, outdatedSecondaries, nil } func routerNodesToStorages(nodes []RouterNode) []string { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 659521207..df52ff50d 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1461,3 +1461,198 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }) } } + +type mockTransaction struct { + nodeStates map[string]transactions.VoteResult + subtransactions int +} + +func (t mockTransaction) ID() uint64 { + return 0 +} + +func (t mockTransaction) CountSubtransactions() int { + return t.subtransactions +} + +func (t mockTransaction) State() (map[string]transactions.VoteResult, error) { + return t.nodeStates, nil +} + +func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { + type node struct { + name string + state transactions.VoteResult + } + + ctx, cancel := testhelper.Context() + defer cancel() + + for _, tc := range []struct { + desc string + primary node + secondaries []node + replicas []string + subtransactions int + expectedErr error + expectedOutdated []string + expectedUpdated []string + }{ + { + desc: "single committed node", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + }, + subtransactions: 1, + }, + { + desc: "single failed node", + primary: node{ + name: "primary", + state: transactions.VoteFailed, + }, + subtransactions: 1, + expectedErr: errors.New("transaction: primary failed vote"), + }, + { + desc: "single node without subtransactions", + primary: node{ + name: "primary", + }, + subtransactions: 0, + }, + { + desc: "single successful node with replica", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + }, + replicas: []string{"replica"}, + subtransactions: 1, + expectedOutdated: []string{"replica"}, + }, + { + desc: "single failing node with replica", + primary: node{ + name: "primary", + state: transactions.VoteFailed, + }, + replicas: []string{"replica"}, + subtransactions: 1, + expectedErr: errors.New("transaction: primary failed vote"), + }, + { + desc: "single node without transaction with replica", + primary: node{ + name: "primary", + }, + replicas: []string{"replica"}, + subtransactions: 0, + }, + { + desc: "multiple committed nodes", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteCommitted}, + {name: "s2", state: transactions.VoteCommitted}, + }, + subtransactions: 1, + expectedUpdated: []string{"s1", "s2"}, + }, + { + desc: "partial success", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteFailed}, + {name: "s2", state: transactions.VoteCommitted}, + }, + subtransactions: 1, + expectedUpdated: []string{"s2"}, + expectedOutdated: []string{"s1"}, + }, + { + desc: "failure with (impossible) secondary success", + primary: node{ + name: "primary", + state: transactions.VoteFailed, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteFailed}, + {name: "s2", state: transactions.VoteCommitted}, + }, + subtransactions: 1, + expectedErr: errors.New("transaction: primary failed vote"), + }, + { + desc: "multiple nodes without subtransactions", + primary: node{ + name: "primary", + state: transactions.VoteFailed, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteFailed}, + {name: "s2", state: transactions.VoteCommitted}, + }, + subtransactions: 0, + expectedOutdated: []string{"s1", "s2"}, + }, + { + desc: "multiple nodes with replica and partial failures", + primary: node{ + name: "primary", + state: transactions.VoteCommitted, + }, + secondaries: []node{ + {name: "s1", state: transactions.VoteFailed}, + {name: "s2", state: transactions.VoteCommitted}, + }, + replicas: []string{"r1", "r2"}, + subtransactions: 1, + expectedOutdated: []string{"s1", "r1", "r2"}, + expectedUpdated: []string{"s2"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + nodes := append(tc.secondaries, tc.primary) + voters := make([]transactions.Voter, len(nodes)) + states := make(map[string]transactions.VoteResult) + + for i, node := range nodes { + voters[i] = transactions.Voter{ + Name: node.name, + Votes: 1, + } + states[node.name] = node.state + } + + transaction := mockTransaction{ + nodeStates: states, + subtransactions: tc.subtransactions, + } + + route := RepositoryMutatorRoute{ + Primary: RouterNode{ + Storage: tc.primary.name, + }, + } + for _, secondary := range tc.secondaries { + route.Secondaries = append(route.Secondaries, RouterNode{ + Storage: secondary.name, + }) + } + route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...) + + updated, outdated, err := getUpdatedAndOutdatedSecondaries(ctx, route, transaction) + require.Equal(t, tc.expectedErr, err) + require.ElementsMatch(t, tc.expectedUpdated, updated) + require.ElementsMatch(t, tc.expectedOutdated, outdated) + }) + } +} |