diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-25 13:38:27 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-25 13:38:27 +0300 |
commit | 395f7c52943f9d2d101bcc2265373b2671ca0321 (patch) | |
tree | c15a01fb2c7d86a6cb7f73d67ddf28b83a314479 | |
parent | 0a748a623ffecd910dbc2f8f2ff542df3b067622 (diff) | |
parent | bdf5df69bf32c8e736bd33bc4920d57e5ee3261e (diff) |
Merge branch 'pks-coordinator-tx-replication-metrics' into 'master'
coordinator: Add replication metrics for transactions
See merge request gitlab-org/gitaly!3519
-rw-r--r-- | internal/praefect/coordinator.go | 53 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 50 |
2 files changed, 89 insertions, 14 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 5b2cb72fa..741962107 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -236,13 +236,14 @@ type grpcCall struct { // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { - router Router - txMgr *transactions.Manager - queue datastore.ReplicationEventQueue - rs datastore.RepositoryStore - registry *protoregistry.Registry - conf config.Config - votersMetric *prometheus.HistogramVec + router Router + txMgr *transactions.Manager + queue datastore.ReplicationEventQueue + rs datastore.RepositoryStore + registry *protoregistry.Registry + conf config.Config + votersMetric *prometheus.HistogramVec + txReplicationCountMetric *prometheus.CounterVec } // NewCoordinator returns a new Coordinator that utilizes the provided logger @@ -276,6 +277,13 @@ func NewCoordinator( }, []string{"virtual_storage"}, ), + txReplicationCountMetric: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_praefect_tx_replications_total", + Help: "The number of replication jobs scheduled for transactional RPCs", + }, + []string{"reason"}, + ), } return coordinator @@ -287,6 +295,7 @@ func (c *Coordinator) Describe(descs chan<- *prometheus.Desc) { func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) { c.votersMetric.Collect(metrics) + c.txReplicationCountMetric.Collect(metrics) } func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { @@ -738,7 +747,8 @@ func (c *Coordinator) createTransactionFinalizer( nodeErrors *nodeErrors, ) func() error { return func() error { - primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries( + ctx, route, transaction, nodeErrors, c.txReplicationCountMetric) if 
!primaryDirtied { // If the primary replica was not modified then we don't need to consider the secondaries // outdated. Praefect requires the primary to be always part of the quorum, so no changes @@ -774,15 +784,11 @@ func getUpdatedAndOutdatedSecondaries( route RepositoryMutatorRoute, transaction transactions.Transaction, nodeErrors *nodeErrors, + replicationCountMetric *prometheus.CounterVec, ) (primaryDirtied bool, updated []string, outdated []string) { nodeErrors.Lock() defer nodeErrors.Unlock() - // Replication targets were not added to the transaction, most likely because they are - // either not healthy or out of date. We thus need to make sure to create replication jobs - // for them. - outdated = append(outdated, route.ReplicationTargets...) - primaryErr := nodeErrors.errByNode[route.Primary.Storage] // If there were subtransactions, we only assume some changes were made if one of the subtransactions @@ -796,11 +802,27 @@ func getUpdatedAndOutdatedSecondaries( primaryDirtied = transaction.DidCommitAnySubtransaction() || (transaction.CountSubtransactions() == 0 && primaryErr == nil) + recordReplication := func(reason string, replicationCount int) { + // If the primary wasn't dirtied, then we never replicate any changes. While this + // duplicates logic defined elsewhere, it's probably good enough given that we only + // talk about metrics here. + if primaryDirtied && replicationCount > 0 { + replicationCountMetric.WithLabelValues(reason).Add(float64(replicationCount)) + } + } + + // Replication targets were not added to the transaction, most likely because they are + // either not healthy or out of date. We thus need to make sure to create replication jobs + // for them. + outdated = append(outdated, route.ReplicationTargets...) + recordReplication("outdated", len(route.ReplicationTargets)) + // If the primary errored, then we need to assume that it has modified on-disk state and // thus need to replicate those changes to secondaries. 
if primaryErr != nil { ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + recordReplication("primary-failed", len(route.Secondaries)) return } @@ -811,6 +833,7 @@ func getUpdatedAndOutdatedSecondaries( if transaction.CountSubtransactions() == 0 { ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + recordReplication("no-votes", len(route.Secondaries)) return } @@ -820,6 +843,7 @@ func getUpdatedAndOutdatedSecondaries( if err != nil { ctxlogrus.Extract(ctx).WithError(err).Error("could not get transaction state") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + recordReplication("missing-tx-state", len(route.Secondaries)) return } @@ -831,6 +855,7 @@ func getUpdatedAndOutdatedSecondaries( ctxlogrus.Extract(ctx).Error("transaction: primary failed vote") } outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) 
+ recordReplication("primary-not-committed", len(route.Secondaries)) return } @@ -839,11 +864,13 @@ func getUpdatedAndOutdatedSecondaries( for _, secondary := range route.Secondaries { if nodeErrors.errByNode[secondary.Storage] != nil { outdated = append(outdated, secondary.Storage) + recordReplication("node-failed", 1) continue } if nodeStates[secondary.Storage] != transactions.VoteCommitted { outdated = append(outdated, secondary.Storage) + recordReplication("node-not-committed", 1) continue } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index b0bee67d3..0b3c28b7a 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "strings" "sync" "sync/atomic" "testing" @@ -12,6 +13,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1582,6 +1585,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedPrimaryDirtied bool expectedOutdated []string expectedUpdated []string + expectedMetrics map[string]int }{ { desc: "single committed node", @@ -1627,6 +1631,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, }, { desc: "single failing node with replica", @@ -1650,6 +1657,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, }, { desc: "single node without transaction with replica", @@ -1660,6 +1670,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { 
subtransactions: 0, expectedPrimaryDirtied: true, expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, }, { desc: "multiple committed nodes", @@ -1691,6 +1704,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "primary-failed": 2, + }, }, { desc: "multiple committed nodes with secondary err", @@ -1707,6 +1723,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedPrimaryDirtied: true, expectedUpdated: []string{"s2"}, expectedOutdated: []string{"s1"}, + expectedMetrics: map[string]int{ + "node-failed": 1, + }, }, { desc: "partial success", @@ -1723,6 +1742,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedPrimaryDirtied: true, expectedUpdated: []string{"s2"}, expectedOutdated: []string{"s1"}, + expectedMetrics: map[string]int{ + "node-not-committed": 1, + }, }, { desc: "failure with (impossible) secondary success", @@ -1738,6 +1760,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "primary-not-committed": 2, + }, }, { desc: "multiple nodes without subtransactions", @@ -1752,6 +1777,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 0, expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "no-votes": 2, + }, }, { desc: "multiple nodes with replica and partial failures", @@ -1769,6 +1797,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "r1", "r2"}, expectedUpdated: []string{"s2"}, + expectedMetrics: map[string]int{ + "node-not-committed": 1, + "outdated": 2, + }, }, { desc: "multiple nodes with replica and partial err", @@ -1785,6 +1817,11 @@ func 
TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "s2", "r1", "r2"}, + expectedMetrics: map[string]int{ + "node-failed": 1, + "node-not-committed": 1, + "outdated": 2, + }, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -1823,10 +1860,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { } route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...) - primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + metric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "stub", Help: "help", + }, []string{"reason"}) + + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric) require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied) require.ElementsMatch(t, tc.expectedUpdated, updated) require.ElementsMatch(t, tc.expectedOutdated, outdated) + + expectedMetrics := "# HELP stub help\n# TYPE stub counter\n" + for metric, value := range tc.expectedMetrics { + expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value) + } + + require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics))) }) } } |