diff options
Diffstat (limited to 'internal/praefect/datastore/collector.go')
-rw-r--r-- | internal/praefect/datastore/collector.go | 63 |
1 files changed, 60 insertions, 3 deletions
diff --git a/internal/praefect/datastore/collector.go b/internal/praefect/datastore/collector.go index c5ad39fcd..261fcccea 100644 --- a/internal/praefect/datastore/collector.go +++ b/internal/praefect/datastore/collector.go @@ -17,21 +17,27 @@ var descReadOnlyRepositories = prometheus.NewDesc( nil, ) +type HealthChecker interface { + HealthyNodes() map[string][]string +} + // RepositoryStoreCollector collects metrics from the RepositoryStore. type RepositoryStoreCollector struct { log logrus.FieldLogger db glsql.Querier virtualStorages []string repositoryScoped bool + healthChecker HealthChecker } // NewRepositoryStoreCollector returns a new collector. -func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool) *RepositoryStoreCollector { +func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool, hc HealthChecker) *RepositoryStoreCollector { return &RepositoryStoreCollector{ log: log.WithField("component", "RepositoryStoreCollector"), db: db, virtualStorages: virtualStorages, repositoryScoped: repositoryScoped, + healthChecker: hc, } } @@ -57,7 +63,33 @@ func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric) { // query considers repository specific primaries. Otherwise, virtual storage scoped // primaries are considered. func (c *RepositoryStoreCollector) queryMetrics(ctx context.Context) (map[string]int, error) { + var virtualStorages, storages []string + for virtualStorage, stors := range c.healthChecker.HealthyNodes() { + for _, storage := range stors { + virtualStorages = append(virtualStorages, virtualStorage) + storages = append(storages, storage) + } + } + + // The query differs slightly depending on whether repository specific primaries are enabled or not. + // + // For virtual storage scoped primaries, we fetch the primaries from the `shard_primaries.node_name` + // column. We then check whether the primary storage is on the latest generation. If not, we add + // it to the read-only count. + // + // When used with repository specific primaries, the repository's primary is taken from the + // `repositories.primary` column. To account for possibility of lazy elections, the query does + // not immediately count outdated primaries as read-only. It additionally checks whether the primary + // node is unhealthy. If it is not, then the primary is counted as read-only since the elector would + // not fail over from a healthy primary. If the primary is unhealthy, the elector could fail over to + // another node when the primary is needed. The query checks whethere there is a healthy and assigned + // in-sync node available. If so, the elector would fail over to it on the next request, thus the + // repository would not be in read-only mode. rows, err := c.db.QueryContext(ctx, ` +WITH healthy_storages AS ( + SELECT unnest($3::text[]) AS virtual_storage, unnest($4::text[]) AS storage +) + SELECT virtual_storage, COUNT(*) FROM ( SELECT @@ -77,9 +109,34 @@ JOIN ( GROUP BY virtual_storage, relative_path ) AS latest_generations USING (virtual_storage, relative_path) LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage) -WHERE latest_generation > COALESCE(generation, -1) +WHERE ( + latest_generation > COALESCE(generation, -1) + OR ($2 AND (virtual_storage, storage) NOT IN ( SELECT virtual_storage, storage FROM healthy_storages ))) +AND ( + NOT $2 + OR (virtual_storage, storage) IN ( SELECT virtual_storage, storage FROM healthy_storages ) + OR NOT EXISTS ( + SELECT 1 + FROM storage_repositories AS primary_candidate + JOIN healthy_storages USING (virtual_storage, storage) + WHERE primary_candidate.virtual_storage = repositories.virtual_storage + AND primary_candidate.relative_path = repositories.relative_path + AND primary_candidate.generation = latest_generation + AND ( + SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = primary_candidate.storage) = 1 + FROM repository_assignments + WHERE repository_assignments.virtual_storage = primary_candidate.virtual_storage + AND repository_assignments.relative_path = primary_candidate.relative_path + ) + ) +) GROUP BY virtual_storage - `, pq.StringArray(c.virtualStorages), c.repositoryScoped) + `, + pq.StringArray(c.virtualStorages), + c.repositoryScoped, + pq.StringArray(virtualStorages), + pq.StringArray(storages), + ) if err != nil { return nil, fmt.Errorf("query: %w", err) } |