diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-02 13:36:26 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-02 08:23:55 +0300 |
commit | 7a07c80125584cd55701c27e677299b59c9820dc (patch) | |
tree | a61a5352d1719b552ae50d28545595480684f8c4 | |
parent | ee820fe9b2e6c0575fcde871dec73fb0ddad726b (diff) |
coordinator: Combine node state log messages into a single message
When determining updated and outdated secondaries for transactional
mutators, we write several log messages stating why certain nodes are
considered outdated or updated. Having this information split up across
multiple messages is quite a pain if one wants to get a quick overview
over why nodes are outdated given that one now has to search for
multiple log messages.
Combine these log messages into a single message which has secondary
node states as a its metadata.
(cherry picked from commit 10a91e71f8f0b6130cf40911e325964b49ecf4c2)
-rw-r--r-- | internal/praefect/coordinator.go | 64 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 6 |
2 files changed, 36 insertions, 34 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 682de22e6..711f37e14 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io/ioutil" "sync" "github.com/golang/protobuf/proto" @@ -798,35 +797,41 @@ func getUpdatedAndOutdatedSecondaries( primaryDirtied = transaction.DidCommitAnySubtransaction() || (transaction.CountSubtransactions() == 0 && primaryErr == nil) - // If the primary wasn't dirtied, then we never replicate any changes. While this is - // duplicates logic defined elsewhere, it's probably good enough given that we only talk - // about metrics here. - recordReplication := func(reason string, replicationCount int) { - if primaryDirtied && replicationCount > 0 { - replicationCountMetric.WithLabelValues(reason).Add(float64(replicationCount)) + nodesByState := make(map[string][]string) + defer func() { + // If the primary wasn't dirtied, then we never replicate any changes and thus + // shouldn't log nodes as outdated here. While this is duplicates logic defined + // elsewhere, it's probably good enough given that we only talk about metrics here. + if !primaryDirtied { + return } - } - // Same as above, we discard log entries in case the primary wasn't dirtied. - logReplication := ctxlogrus.Extract(ctx) - if !primaryDirtied { - discardLogger := logrus.New() - discardLogger.Out = ioutil.Discard - logReplication = logrus.NewEntry(discardLogger) + ctxlogrus.Extract(ctx). + WithField("transaction.primary", route.Primary.Storage). + WithField("transaction.secondaries", nodesByState). + Info("transactional node states") + + for reason, nodes := range nodesByState { + replicationCountMetric.WithLabelValues(reason).Add(float64(len(nodes))) + } + }() + + markOutdated := func(reason string, nodes []string) { + if len(nodes) != 0 { + outdated = append(outdated, nodes...) + nodesByState[reason] = append(nodesByState[reason], nodes...) + } } // Replication targets were not added to the transaction, most likely because they are // either not healthy or out of date. We thus need to make sure to create replication jobs // for them. - outdated = append(outdated, route.ReplicationTargets...) - recordReplication("outdated", len(route.ReplicationTargets)) + markOutdated("outdated", route.ReplicationTargets) // If the primary errored, then we need to assume that it has modified on-disk state and // thus need to replicate those changes to secondaries. if primaryErr != nil { - logReplication.WithError(primaryErr).Info("primary failed transaction") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - recordReplication("primary-failed", len(route.Secondaries)) + markOutdated("primary-failed", routerNodesToStorages(route.Secondaries)) return } @@ -835,9 +840,7 @@ func getUpdatedAndOutdatedSecondaries( // no changes were done and the nodes hit an error prior to voting. If the primary processed // the RPC successfully, we assume the RPC is not correctly voting and replicate everywhere. if transaction.CountSubtransactions() == 0 { - logReplication.Info("transaction did not create subtransactions") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - recordReplication("no-votes", len(route.Secondaries)) + markOutdated("no-votes", routerNodesToStorages(route.Secondaries)) return } @@ -845,9 +848,7 @@ func getUpdatedAndOutdatedSecondaries( // safe route and just replicate to all secondaries. nodeStates, err := transaction.State() if err != nil { - logReplication.WithError(err).Error("could not get transaction state") - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - recordReplication("missing-tx-state", len(route.Secondaries)) + markOutdated("missing-tx-state", routerNodesToStorages(route.Secondaries)) return } @@ -855,11 +856,7 @@ func getUpdatedAndOutdatedSecondaries( // then we must assume that it dirtied on-disk state. This modified state may not be what we want, // but it's what we got. So in order to ensure a consistent state, we need to replicate. if state := nodeStates[route.Primary.Storage]; state != transactions.VoteCommitted { - if state == transactions.VoteFailed { - logReplication.Error("transaction: primary failed vote") - } - outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) - recordReplication("primary-not-committed", len(route.Secondaries)) + markOutdated("primary-not-committed", routerNodesToStorages(route.Secondaries)) return } @@ -867,18 +864,17 @@ func getUpdatedAndOutdatedSecondaries( // error and committed, it's considered up to date and thus does not need replication. for _, secondary := range route.Secondaries { if nodeErrors.errByNode[secondary.Storage] != nil { - outdated = append(outdated, secondary.Storage) - recordReplication("node-failed", 1) + markOutdated("node-failed", []string{secondary.Storage}) continue } if nodeStates[secondary.Storage] != transactions.VoteCommitted { - outdated = append(outdated, secondary.Storage) - recordReplication("node-not-committed", 1) + markOutdated("node-not-committed", []string{secondary.Storage}) continue } updated = append(updated, secondary.Storage) + nodesByState["updated"] = append(nodesByState["updated"], secondary.Storage) } return diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 37029da49..5395e182d 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1688,6 +1688,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedUpdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "updated": 2, + }, }, { desc: "multiple committed nodes with primary err", @@ -1725,6 +1728,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedOutdated: []string{"s1"}, expectedMetrics: map[string]int{ "node-failed": 1, + "updated": 1, }, }, { @@ -1744,6 +1748,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedOutdated: []string{"s1"}, expectedMetrics: map[string]int{ "node-not-committed": 1, + "updated": 1, }, }, { @@ -1800,6 +1805,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedMetrics: map[string]int{ "node-not-committed": 1, "outdated": 2, + "updated": 1, }, }, { |