diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-05-20 11:38:52 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-05-21 17:29:45 +0300 |
commit | bdf5df69bf32c8e736bd33bc4920d57e5ee3261e (patch) | |
tree | 76fbff124b4707d537e3114305b407965af7f224 | |
parent | 30e482604548b0d923f13b11dbc5aa63fdc289f9 (diff) |
coordinator: Add replication metrics for transactions
Without scraping through the logs, it's currently hard to determine how
many replication jobs we're queueing for transactional RPC and what the
reasons for those jobs are. To improve the situation, this commit adds a
new metric which tracks the reason why replication jobs are queued to
improve visibility.
Changelog: added
-rw-r--r-- | internal/praefect/coordinator.go | 43 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 50 |
2 files changed, 84 insertions, 9 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 75288add4..04800f89c 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -237,13 +237,14 @@ type grpcCall struct { // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { - router Router - txMgr *transactions.Manager - queue datastore.ReplicationEventQueue - rs datastore.RepositoryStore - registry *protoregistry.Registry - conf config.Config - votersMetric *prometheus.HistogramVec + router Router + txMgr *transactions.Manager + queue datastore.ReplicationEventQueue + rs datastore.RepositoryStore + registry *protoregistry.Registry + conf config.Config + votersMetric *prometheus.HistogramVec + txReplicationCountMetric *prometheus.CounterVec } // NewCoordinator returns a new Coordinator that utilizes the provided logger @@ -277,6 +278,13 @@ func NewCoordinator( }, []string{"virtual_storage"}, ), + txReplicationCountMetric: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_praefect_tx_replications_total", + Help: "The number of replication jobs scheduled for transactional RPCs", + }, + []string{"reason"}, + ), } return coordinator @@ -288,6 +296,7 @@ func (c *Coordinator) Describe(descs chan<- *prometheus.Desc) { func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) { c.votersMetric.Collect(metrics) + c.txReplicationCountMetric.Collect(metrics) } func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { @@ -739,7 +748,8 @@ func (c *Coordinator) createTransactionFinalizer( nodeErrors *nodeErrors, ) func() error { return func() error { - primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries( + ctx, route, transaction, nodeErrors, c.txReplicationCountMetric) if !primaryDirtied { // If the primary replica was not modified then we don't need to consider the secondaries // outdated. Praefect requires the primary to be always part of the quorum, so no changes @@ -775,6 +785,7 @@ func getUpdatedAndOutdatedSecondaries( route RepositoryMutatorRoute, transaction transactions.Transaction, nodeErrors *nodeErrors, + replicationCountMetric *prometheus.CounterVec, ) (primaryDirtied bool, updated []string, outdated []string) { nodeErrors.Lock() defer nodeErrors.Unlock() @@ -792,16 +803,27 @@ func getUpdatedAndOutdatedSecondaries( primaryDirtied = transaction.DidCommitAnySubtransaction() || (transaction.CountSubtransactions() == 0 && primaryErr == nil) + recordReplication := func(reason string, replicationCount int) { + // If the primary wasn't dirtied, then we never replicate any changes. While this is + // duplicates logic defined elsewhere, it's probably good enough given that we only + // talk about metrics here. + if primaryDirtied && replicationCount > 0 { + replicationCountMetric.WithLabelValues(reason).Add(float64(replicationCount)) + } + } + // Replication targets were not added to the transaction, most likely because they are // either not healthy or out of date. We thus need to make sure to create replication jobs // for them. outdated = append(outdated, route.ReplicationTargets...) + recordReplication("outdated", len(route.ReplicationTargets)) // If the primary errored, then we need to assume that it has modified on-disk state and // thus need to replicate those changes to secondaries. if primaryErr != nil { ctxlogrus.Extract(ctx).WithError(primaryErr).Info("primary failed transaction") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + recordReplication("primary-failed", len(route.Secondaries)) return } @@ -812,6 +834,7 @@ func getUpdatedAndOutdatedSecondaries( if transaction.CountSubtransactions() == 0 { ctxlogrus.Extract(ctx).Info("transaction did not create subtransactions") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + recordReplication("no-votes", len(route.Secondaries)) return } @@ -821,6 +844,7 @@ func getUpdatedAndOutdatedSecondaries( if err != nil { ctxlogrus.Extract(ctx).WithError(err).Error("could not get transaction state") outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + recordReplication("missing-tx-state", len(route.Secondaries)) return } @@ -832,6 +856,7 @@ func getUpdatedAndOutdatedSecondaries( ctxlogrus.Extract(ctx).Error("transaction: primary failed vote") } outdated = append(outdated, routerNodesToStorages(route.Secondaries)...) + recordReplication("primary-not-committed", len(route.Secondaries)) return } @@ -840,11 +865,13 @@ func getUpdatedAndOutdatedSecondaries( for _, secondary := range route.Secondaries { if nodeErrors.errByNode[secondary.Storage] != nil { outdated = append(outdated, secondary.Storage) + recordReplication("node-failed", 1) continue } if nodeStates[secondary.Storage] != transactions.VoteCommitted { outdated = append(outdated, secondary.Storage) + recordReplication("node-not-committed", 1) continue } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index b0bee67d3..0b3c28b7a 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "strings" "sync" "sync/atomic" "testing" @@ -12,6 +13,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1582,6 +1585,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedPrimaryDirtied bool expectedOutdated []string expectedUpdated []string + expectedMetrics map[string]int }{ { desc: "single committed node", @@ -1627,6 +1631,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, }, { desc: "single failing node with replica", @@ -1650,6 +1657,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, }, { desc: "single node without transaction with replica", @@ -1660,6 +1670,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 0, expectedPrimaryDirtied: true, expectedOutdated: []string{"replica"}, + expectedMetrics: map[string]int{ + "outdated": 1, + }, }, { desc: "multiple committed nodes", @@ -1691,6 +1704,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "primary-failed": 2, + }, }, { desc: "multiple committed nodes with secondary err", @@ -1707,6 +1723,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedPrimaryDirtied: true, expectedUpdated: []string{"s2"}, expectedOutdated: []string{"s1"}, + expectedMetrics: map[string]int{ + "node-failed": 1, + }, }, { desc: "partial success", @@ -1723,6 +1742,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedPrimaryDirtied: true, expectedUpdated: []string{"s2"}, expectedOutdated: []string{"s1"}, + expectedMetrics: map[string]int{ + "node-not-committed": 1, + }, }, { desc: "failure with (impossible) secondary success", @@ -1738,6 +1760,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "primary-not-committed": 2, + }, }, { desc: "multiple nodes without subtransactions", @@ -1752,6 +1777,9 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 0, expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "s2"}, + expectedMetrics: map[string]int{ + "no-votes": 2, + }, }, { desc: "multiple nodes with replica and partial failures", @@ -1769,6 +1797,10 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "r1", "r2"}, expectedUpdated: []string{"s2"}, + expectedMetrics: map[string]int{ + "node-not-committed": 1, + "outdated": 2, + }, }, { desc: "multiple nodes with replica and partial err", @@ -1785,6 +1817,11 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { subtransactions: 1, expectedPrimaryDirtied: true, expectedOutdated: []string{"s1", "s2", "r1", "r2"}, + expectedMetrics: map[string]int{ + "node-failed": 1, + "node-not-committed": 1, + "outdated": 2, + }, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -1823,10 +1860,21 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { } route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...) - primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors) + metric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "stub", Help: "help", + }, []string{"reason"}) + + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric) require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied) require.ElementsMatch(t, tc.expectedUpdated, updated) require.ElementsMatch(t, tc.expectedOutdated, outdated) + + expectedMetrics := "# HELP stub help\n# TYPE stub counter\n" + for metric, value := range tc.expectedMetrics { + expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value) + } + + require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics))) }) } } |