gitlab.com/gitlab-org/gitaly.git
author    Sami Hiltunen <shiltunen@gitlab.com>    2022-04-08 17:41:30 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>    2022-04-21 17:37:59 +0300
commit    c62590b2d2b4651504edc00ed5293bc7a971d527 (patch)
tree      7d64d08cb0a7fcd555d55b044c63e7d820bb1b6d
parent    d2db9c1aadedb1318c2b9c76f0a0425eb3d72062 (diff)

Collect verification queue depth metrics (mh-verification-metrics)
The verification worker already collects metrics on how many jobs it is
processing, but Praefect is still lacking a metric for the overall
verification queue depth, which would be very helpful in determining how
much work is left before verification completes. This commit adds a
collector for the verification queue depth. The metric is collected for
each storage separately and differentiates between replicas that are
unverified and replicas that have been verified but whose verification
has since expired.
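For illustration, the new gauge is labelled by virtual storage, storage and verification status, so a scrape produces series along the following lines. The label values and counts below are made up; only the metric name and help text come from this commit:

    # HELP gitaly_praefect_verification_queue_depth Number of replicas pending verification.
    # TYPE gitaly_praefect_verification_queue_depth gauge
    gitaly_praefect_verification_queue_depth{status="expired",storage="gitaly-1",virtual_storage="default"} 3
    gitaly_praefect_verification_queue_depth{status="unverified",storage="gitaly-1",virtual_storage="default"} 12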
-rw-r--r--  cmd/praefect/main.go                           24
-rw-r--r--  internal/praefect/datastore/collector.go       94
-rw-r--r--  internal/praefect/datastore/collector_test.go  53
3 files changed, 163 insertions, 8 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 5a7989db9..ab28bdea0 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -438,12 +438,20 @@ func run(
)
metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl)
if db != nil {
- repositoryStoreCollector := datastore.NewRepositoryStoreCollector(
- logger,
- conf.VirtualStorageNames(),
- db,
- conf.Prometheus.ScrapeTimeout)
- queueDepthCollector := datastore.NewQueueDepthCollector(logger, db, conf.Prometheus.ScrapeTimeout)
+ dbMetricCollectors := []prometheus.Collector{
+ datastore.NewRepositoryStoreCollector(logger, conf.VirtualStorageNames(), db, conf.Prometheus.ScrapeTimeout),
+ datastore.NewQueueDepthCollector(logger, db, conf.Prometheus.ScrapeTimeout),
+ }
+
+ if conf.BackgroundVerification.VerificationInterval > 0 {
+ dbMetricCollectors = append(dbMetricCollectors, datastore.NewVerificationQueueDepthCollector(
+ logger,
+ db,
+ conf.Prometheus.ScrapeTimeout,
+ conf.BackgroundVerification.VerificationInterval,
+ conf.StorageNames(),
+ ))
+ }
// Eventually, database-related metrics will always be exported via a separate
// endpoint such that it's possible to set a different scraping interval and thus to
@@ -451,9 +459,9 @@ func run(
// standard and once for the database-specific endpoint. This is done to ensure a
// transitory period where deployments can be moved to the new endpoint without
// causing breakage if they still use the old endpoint.
- dbPromRegistry.MustRegister(repositoryStoreCollector, queueDepthCollector)
+ dbPromRegistry.MustRegister(dbMetricCollectors...)
if !conf.PrometheusExcludeDatabaseFromDefaultMetrics {
- promreg.MustRegister(repositoryStoreCollector, queueDepthCollector)
+ promreg.MustRegister(dbMetricCollectors...)
}
}
promreg.MustRegister(metricsCollectors...)
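The registration change above boils down to collecting the database-backed collectors into a slice, optionally appending the verification collector, and registering the slice on both registries. A minimal, self-contained sketch of that pattern follows; the placeholder gauge vectors, flag and registry names stand in for the Gitaly-specific pieces and are not part of this commit:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Stand-in for conf.BackgroundVerification.VerificationInterval > 0.
	backgroundVerificationEnabled := true

	// Placeholder collectors standing in for the repository store and
	// replication queue depth collectors.
	dbMetricCollectors := []prometheus.Collector{
		prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "example_repository_store_metric", Help: "placeholder"}, []string{"virtual_storage"}),
		prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "example_queue_depth_metric", Help: "placeholder"}, []string{"virtual_storage"}),
	}

	// Only add the verification queue depth collector when background
	// verification is enabled.
	if backgroundVerificationEnabled {
		dbMetricCollectors = append(dbMetricCollectors, prometheus.NewGaugeVec(
			prometheus.GaugeOpts{Name: "example_verification_queue_depth", Help: "placeholder"},
			[]string{"virtual_storage", "storage", "status"},
		))
	}

	dbPromRegistry := prometheus.NewRegistry()
	promreg := prometheus.NewRegistry()

	// Register the collectors on the database-specific registry and, for the
	// transition period, also on the default one.
	dbPromRegistry.MustRegister(dbMetricCollectors...)
	promreg.MustRegister(dbMetricCollectors...)

	fmt.Println("registered", len(dbMetricCollectors), "database metric collectors")
}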
diff --git a/internal/praefect/datastore/collector.go b/internal/praefect/datastore/collector.go
index 0b93bd760..cd9bbfc28 100644
--- a/internal/praefect/datastore/collector.go
+++ b/internal/praefect/datastore/collector.go
@@ -178,3 +178,97 @@ GROUP BY job->>'virtual_storage', job->>'target_node_storage', state
q.log.WithError(err).Error("failed to iterate over rows for queue depth metrics")
}
}
+
+const (
+ statusUnverified = "unverified"
+ statusExpired = "expired"
+)
+
+// VerificationQueueDepthCollector collects the verification queue depth metric from the database.
+type VerificationQueueDepthCollector struct {
+ log logrus.FieldLogger
+ timeout time.Duration
+ db glsql.Querier
+ verificationInterval time.Duration
+ verificationQueueDepth *prometheus.GaugeVec
+}
+
+// NewVerificationQueueDepthCollector returns a new VerificationQueueDepthCollector.
+func NewVerificationQueueDepthCollector(log logrus.FieldLogger, db glsql.Querier, timeout, verificationInterval time.Duration, configuredStorages map[string][]string) *VerificationQueueDepthCollector {
+ v := &VerificationQueueDepthCollector{
+ log: log.WithField("component", "verification_queue_depth_collector"),
+ timeout: timeout,
+ db: db,
+ verificationInterval: verificationInterval,
+ verificationQueueDepth: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "gitaly_praefect_verification_queue_depth",
+ Help: "Number of replicas pending verification.",
+ }, []string{"virtual_storage", "storage", "status"}),
+ }
+
+ // pre-warm metrics to produce output even for storages which have an empty queue.
+ for virtualStorage, storages := range configuredStorages {
+ for _, storage := range storages {
+ for _, status := range []string{statusUnverified, statusExpired} {
+ v.verificationQueueDepth.WithLabelValues(virtualStorage, storage, status)
+ }
+ }
+ }
+
+ return v
+}
+
+// Describe describes the collected metrics to Prometheus.
+func (c *VerificationQueueDepthCollector) Describe(ch chan<- *prometheus.Desc) {
+ c.verificationQueueDepth.Describe(ch)
+}
+
+// Collect collects the verification queue depth metric from the database.
+func (c *VerificationQueueDepthCollector) Collect(ch chan<- prometheus.Metric) {
+ ctx, cancel := context.WithTimeout(context.TODO(), c.timeout)
+ defer cancel()
+
+ rows, err := c.db.QueryContext(ctx, `
+SELECT
+ repositories.virtual_storage,
+ storage,
+ COUNT(*) FILTER (WHERE verified_at IS NULL),
+ COUNT(*) FILTER (WHERE verified_at IS NOT NULL)
+FROM repositories
+JOIN storage_repositories USING (repository_id)
+WHERE verified_at IS NULL
+OR verified_at < now() - $1 * '1 microsecond'::interval
+GROUP BY repositories.virtual_storage, storage
+`, c.verificationInterval.Microseconds())
+ if err != nil {
+ c.log.WithError(err).Error("failed to query verification queue depth metric")
+ return
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var virtualStorage, storage string
+ var unverified, expired float64
+
+ if err := rows.Scan(&virtualStorage, &storage, &unverified, &expired); err != nil {
+ c.log.WithError(err).Error("failed to scan verification queue depth row")
+ return
+ }
+
+ for _, metric := range []struct {
+ status string
+ value float64
+ }{
+ {status: statusUnverified, value: unverified},
+ {status: statusExpired, value: expired},
+ } {
+ c.verificationQueueDepth.WithLabelValues(virtualStorage, storage, metric.status).Set(metric.value)
+ }
+ }
+
+ if err := rows.Err(); err != nil {
+ c.log.WithError(err).Error("failed to read verification queue depth rows")
+ }
+
+ c.verificationQueueDepth.Collect(ch)
+}
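To make the collector's role concrete, here is a rough sketch of how it could be wired to its own scrape endpoint. Everything not present in the diff is assumed for illustration: the DSN, the database driver, the port, the intervals and the storage layout. The datastore package is internal, so this only compiles from within the Gitaly module, and it relies on *sql.DB satisfying glsql.Querier (the collector only calls QueryContext):

package main

import (
	"database/sql"
	"log"
	"net/http"
	"time"

	_ "github.com/lib/pq" // illustrative driver choice
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/sirupsen/logrus"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
)

func main() {
	// Illustrative DSN; a real deployment takes this from the Praefect config.
	db, err := sql.Open("postgres", "postgres://praefect@localhost/praefect_production?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	collector := datastore.NewVerificationQueueDepthCollector(
		logrus.New(),
		db,
		10*time.Second, // scrape timeout (assumed value)
		7*24*time.Hour, // verification interval: replicas verified longer ago than this count as expired (assumed value)
		map[string][]string{"default": {"gitaly-1", "gitaly-2", "gitaly-3"}},
	)

	registry := prometheus.NewRegistry()
	registry.MustRegister(collector)

	// Serve the database metrics on their own endpoint, mirroring the
	// separate dbPromRegistry used in cmd/praefect/main.go.
	http.Handle("/db_metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9999", nil))
}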
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index 3091c8321..c2e733268 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -324,3 +324,56 @@ gitaly_praefect_replication_queue_depth{state="ready",target_node="storage-1",vi
gitaly_praefect_replication_queue_depth{state="ready",target_node="storage-4",virtual_storage="praefect-1"} %d
`, 1, 1, readyJobs-1, readyJobs-1))))
}
+
+func TestVerificationQueueDepthCollector(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ tx := testdb.New(t).Begin(t)
+ defer tx.Rollback(t)
+
+ rs := NewPostgresRepositoryStore(tx, nil)
+ require.NoError(t,
+ rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "replica-path-1", "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false),
+ )
+ require.NoError(t,
+ rs.CreateRepository(ctx, 2, "virtual-storage-1", "relative-path-2", "replica-path-2", "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false),
+ )
+ require.NoError(t,
+ rs.CreateRepository(ctx, 3, "virtual-storage-2", "relative-path-1", "replica-path-3", "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false),
+ )
+
+ _, err := tx.ExecContext(ctx, `
+UPDATE storage_repositories
+SET verified_at = CASE
+ WHEN storage = 'gitaly-2' THEN now() - '30 seconds'::interval
+ ELSE now() - '30 seconds'::interval - '1 microsecond'::interval
+END
+WHERE virtual_storage = 'virtual-storage-1' AND storage != 'gitaly-1'
+ `)
+ require.NoError(t, err)
+
+ logger, hook := test.NewNullLogger()
+ require.NoError(t, testutil.CollectAndCompare(
+ NewVerificationQueueDepthCollector(logrus.NewEntry(logger), tx, time.Minute, 30*time.Second, map[string][]string{
+ "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
+ "virtual-storage-2": {"gitaly-1", "gitaly-2", "gitaly-3"},
+ }),
+ strings.NewReader(`
+# HELP gitaly_praefect_verification_queue_depth Number of replicas pending verification.
+# TYPE gitaly_praefect_verification_queue_depth gauge
+gitaly_praefect_verification_queue_depth{status="expired",storage="gitaly-1",virtual_storage="virtual-storage-1"} 0
+gitaly_praefect_verification_queue_depth{status="expired",storage="gitaly-1",virtual_storage="virtual-storage-2"} 0
+gitaly_praefect_verification_queue_depth{status="expired",storage="gitaly-2",virtual_storage="virtual-storage-1"} 0
+gitaly_praefect_verification_queue_depth{status="expired",storage="gitaly-2",virtual_storage="virtual-storage-2"} 0
+gitaly_praefect_verification_queue_depth{status="expired",storage="gitaly-3",virtual_storage="virtual-storage-1"} 2
+gitaly_praefect_verification_queue_depth{status="expired",storage="gitaly-3",virtual_storage="virtual-storage-2"} 0
+gitaly_praefect_verification_queue_depth{status="unverified",storage="gitaly-1",virtual_storage="virtual-storage-1"} 2
+gitaly_praefect_verification_queue_depth{status="unverified",storage="gitaly-1",virtual_storage="virtual-storage-2"} 1
+gitaly_praefect_verification_queue_depth{status="unverified",storage="gitaly-2",virtual_storage="virtual-storage-1"} 0
+gitaly_praefect_verification_queue_depth{status="unverified",storage="gitaly-2",virtual_storage="virtual-storage-2"} 1
+gitaly_praefect_verification_queue_depth{status="unverified",storage="gitaly-3",virtual_storage="virtual-storage-1"} 0
+gitaly_praefect_verification_queue_depth{status="unverified",storage="gitaly-3",virtual_storage="virtual-storage-2"} 1
+ `),
+ ))
+ require.Empty(t, hook.AllEntries())
+}
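For reference, the expected numbers follow directly from the fixture: the UPDATE only touches secondary replicas (storage != 'gitaly-1') in virtual-storage-1, so the primaries there and every replica of virtual-storage-2 keep verified_at as NULL and are reported as unverified. With a 30-second verification interval, gitaly-2's replicas were verified exactly 30 seconds ago and fail the strict verified_at < now() - interval comparison, while gitaly-3's replicas are one microsecond older and are therefore counted as expired. Because the UPDATE and the collector's query run in the same transaction, now() is frozen for both statements, which is what makes the exact boundary deterministic.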