Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/datastore/collector.go')
-rw-r--r--internal/praefect/datastore/collector.go63
1 files changed, 60 insertions, 3 deletions
diff --git a/internal/praefect/datastore/collector.go b/internal/praefect/datastore/collector.go
index c5ad39fcd..261fcccea 100644
--- a/internal/praefect/datastore/collector.go
+++ b/internal/praefect/datastore/collector.go
@@ -17,21 +17,27 @@ var descReadOnlyRepositories = prometheus.NewDesc(
nil,
)
+type HealthChecker interface {
+ HealthyNodes() map[string][]string
+}
+
// RepositoryStoreCollector collects metrics from the RepositoryStore.
type RepositoryStoreCollector struct {
log logrus.FieldLogger
db glsql.Querier
virtualStorages []string
repositoryScoped bool
+ healthChecker HealthChecker
}
// NewRepositoryStoreCollector returns a new collector.
-func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool) *RepositoryStoreCollector {
+func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool, hc HealthChecker) *RepositoryStoreCollector {
return &RepositoryStoreCollector{
log: log.WithField("component", "RepositoryStoreCollector"),
db: db,
virtualStorages: virtualStorages,
repositoryScoped: repositoryScoped,
+ healthChecker: hc,
}
}
@@ -57,7 +63,33 @@ func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric) {
// query considers repository specific primaries. Otherwise, virtual storage scoped
// primaries are considered.
func (c *RepositoryStoreCollector) queryMetrics(ctx context.Context) (map[string]int, error) {
+ var virtualStorages, storages []string
+ for virtualStorage, stors := range c.healthChecker.HealthyNodes() {
+ for _, storage := range stors {
+ virtualStorages = append(virtualStorages, virtualStorage)
+ storages = append(storages, storage)
+ }
+ }
+
+ // The query differs slightly depending on whether repository specific primaries are enabled or not.
+ //
+ // For virtual storage scoped primaries, we fetch the primaries from the `shard_primaries.node_name`
+ // column. We then check whether the primary storage is on the latest generation. If not, we add
+ // it to the read-only count.
+ //
+ // When used with repository specific primaries, the repository's primary is taken from the
+ // `repositories.primary` column. To account for possibility of lazy elections, the query does
+ // not immediately count outdated primaries as read-only. It additionally checks whether the primary
+ // node is unhealthy. If it is not, then the primary is counted as read-only since the elector would
+ // not fail over from a healthy primary. If the primary is unhealthy, the elector could fail over to
+ // another node when the primary is needed. The query checks whethere there is a healthy and assigned
+ // in-sync node available. If so, the elector would fail over to it on the next request, thus the
+ // repository would not be in read-only mode.
rows, err := c.db.QueryContext(ctx, `
+WITH healthy_storages AS (
+ SELECT unnest($3::text[]) AS virtual_storage, unnest($4::text[]) AS storage
+)
+
SELECT virtual_storage, COUNT(*)
FROM (
SELECT
@@ -77,9 +109,34 @@ JOIN (
GROUP BY virtual_storage, relative_path
) AS latest_generations USING (virtual_storage, relative_path)
LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
-WHERE latest_generation > COALESCE(generation, -1)
+WHERE (
+ latest_generation > COALESCE(generation, -1)
+ OR ($2 AND (virtual_storage, storage) NOT IN ( SELECT virtual_storage, storage FROM healthy_storages )))
+AND (
+ NOT $2
+ OR (virtual_storage, storage) IN ( SELECT virtual_storage, storage FROM healthy_storages )
+ OR NOT EXISTS (
+ SELECT 1
+ FROM storage_repositories AS primary_candidate
+ JOIN healthy_storages USING (virtual_storage, storage)
+ WHERE primary_candidate.virtual_storage = repositories.virtual_storage
+ AND primary_candidate.relative_path = repositories.relative_path
+ AND primary_candidate.generation = latest_generation
+ AND (
+ SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = primary_candidate.storage) = 1
+ FROM repository_assignments
+ WHERE repository_assignments.virtual_storage = primary_candidate.virtual_storage
+ AND repository_assignments.relative_path = primary_candidate.relative_path
+ )
+ )
+)
GROUP BY virtual_storage
- `, pq.StringArray(c.virtualStorages), c.repositoryScoped)
+ `,
+ pq.StringArray(c.virtualStorages),
+ c.repositoryScoped,
+ pq.StringArray(virtualStorages),
+ pq.StringArray(storages),
+ )
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}