diff options
-rw-r--r-- | internal/praefect/datastore/collector.go | 63 | ||||
-rw-r--r-- | internal/praefect/datastore/collector_test.go | 211 |
2 files changed, 270 insertions, 4 deletions
diff --git a/internal/praefect/datastore/collector.go b/internal/praefect/datastore/collector.go index c5ad39fcd..261fcccea 100644 --- a/internal/praefect/datastore/collector.go +++ b/internal/praefect/datastore/collector.go @@ -17,21 +17,27 @@ var descReadOnlyRepositories = prometheus.NewDesc( nil, ) +type HealthChecker interface { + HealthyNodes() map[string][]string +} + // RepositoryStoreCollector collects metrics from the RepositoryStore. type RepositoryStoreCollector struct { log logrus.FieldLogger db glsql.Querier virtualStorages []string repositoryScoped bool + healthChecker HealthChecker } // NewRepositoryStoreCollector returns a new collector. -func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool) *RepositoryStoreCollector { +func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool, hc HealthChecker) *RepositoryStoreCollector { return &RepositoryStoreCollector{ log: log.WithField("component", "RepositoryStoreCollector"), db: db, virtualStorages: virtualStorages, repositoryScoped: repositoryScoped, + healthChecker: hc, } } @@ -57,7 +63,33 @@ func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric) { // query considers repository specific primaries. Otherwise, virtual storage scoped // primaries are considered. func (c *RepositoryStoreCollector) queryMetrics(ctx context.Context) (map[string]int, error) { + var virtualStorages, storages []string + for virtualStorage, stors := range c.healthChecker.HealthyNodes() { + for _, storage := range stors { + virtualStorages = append(virtualStorages, virtualStorage) + storages = append(storages, storage) + } + } + + // The query differs slightly depending on whether repository specific primaries are enabled or not. + // + // For virtual storage scoped primaries, we fetch the primaries from the `shard_primaries.node_name` + // column. 
We then check whether the primary storage is on the latest generation. If not, we add + it to the read-only count. + // + // When used with repository specific primaries, the repository's primary is taken from the + `repositories.primary` column. To account for possibility of lazy elections, the query does + not immediately count outdated primaries as read-only. It additionally checks whether the primary + node is unhealthy. If it is not, then the primary is counted as read-only since the elector would + not fail over from a healthy primary. If the primary is unhealthy, the elector could fail over to + another node when the primary is needed. The query checks whether there is a healthy and assigned + in-sync node available. If so, the elector would fail over to it on the next request, thus the + repository would not be in read-only mode. rows, err := c.db.QueryContext(ctx, ` +WITH healthy_storages AS ( + SELECT unnest($3::text[]) AS virtual_storage, unnest($4::text[]) AS storage +) + SELECT virtual_storage, COUNT(*) FROM ( SELECT @@ -77,9 +109,34 @@ JOIN ( GROUP BY virtual_storage, relative_path ) AS latest_generations USING (virtual_storage, relative_path) LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage) -WHERE latest_generation > COALESCE(generation, -1) +WHERE ( + latest_generation > COALESCE(generation, -1) + OR ($2 AND (virtual_storage, storage) NOT IN ( SELECT virtual_storage, storage FROM healthy_storages ))) +AND ( + NOT $2 + OR (virtual_storage, storage) IN ( SELECT virtual_storage, storage FROM healthy_storages ) + OR NOT EXISTS ( + SELECT 1 + FROM storage_repositories AS primary_candidate + JOIN healthy_storages USING (virtual_storage, storage) + WHERE primary_candidate.virtual_storage = repositories.virtual_storage + AND primary_candidate.relative_path = repositories.relative_path + AND primary_candidate.generation = latest_generation + AND ( + SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = 
primary_candidate.storage) = 1 + FROM repository_assignments + WHERE repository_assignments.virtual_storage = primary_candidate.virtual_storage + AND repository_assignments.relative_path = primary_candidate.relative_path + ) + ) +) GROUP BY virtual_storage - `, pq.StringArray(c.virtualStorages), c.repositoryScoped) + `, + pq.StringArray(c.virtualStorages), + c.repositoryScoped, + pq.StringArray(virtualStorages), + pq.StringArray(storages), + ) if err != nil { return nil, fmt.Errorf("query: %w", err) } diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go index 3c521dcc4..b09a1ffbd 100644 --- a/internal/praefect/datastore/collector_test.go +++ b/internal/praefect/datastore/collector_test.go @@ -13,6 +13,11 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) +// StaticHealthChecker is a HealthChecker that always reports the statically configured nodes as healthy. +type StaticHealthChecker map[string][]string + +func (hc StaticHealthChecker) HealthyNodes() map[string][]string { return hc } + func TestRepositoryStoreCollector(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() @@ -109,7 +114,7 @@ func TestRepositoryStoreCollector(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - c := NewRepositoryStoreCollector(logrus.New(), virtualStorages, db, tc.repositoryScoped) + c := NewRepositoryStoreCollector(logrus.New(), virtualStorages, db, tc.repositoryScoped, nil) require.NoError(t, testutil.CollectAndCompare(c, strings.NewReader(fmt.Sprintf(` # HELP gitaly_praefect_read_only_repositories Number of repositories in read-only mode within a virtual storage. 
# TYPE gitaly_praefect_read_only_repositories gauge @@ -121,3 +126,207 @@ gitaly_praefect_read_only_repositories{virtual_storage="some-read-only"} %d }) } } + +func TestRepositoryStoreCollector_lazy_failover(t *testing.T) { + for _, tc := range []struct { + desc string + existingGenerations map[string]int + existingAssignments []string + noPrimary bool + healthyNodes StaticHealthChecker + repositoryScopedCount int + virtualStorageScopedCount int + }{ + { + desc: "no records", + }, + { + desc: "outdated secondary", + existingGenerations: map[string]int{ + "primary": 0, + }, + }, + { + desc: "no primary with a candidate", + noPrimary: true, + existingGenerations: map[string]int{ + "secondary": 1, + }, + repositoryScopedCount: 0, + virtualStorageScopedCount: 1, + }, + { + desc: "no primary with no candidate", + noPrimary: true, + healthyNodes: StaticHealthChecker{"virtual-storage": {"primary"}}, + existingGenerations: map[string]int{ + "secondary": 1, + }, + repositoryScopedCount: 1, + virtualStorageScopedCount: 1, + }, + { + desc: "no record unhealthy primary", + healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}}, + existingGenerations: map[string]int{ + "secondary": 0, + }, + repositoryScopedCount: 0, + virtualStorageScopedCount: 1, + }, + { + desc: "no record healthy primary", + existingGenerations: map[string]int{ + "secondary": 0, + }, + repositoryScopedCount: 1, + virtualStorageScopedCount: 1, + }, + { + desc: "outdated unhealthy primary", + healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}}, + existingGenerations: map[string]int{ + "primary": 0, + "secondary": 1, + }, + repositoryScopedCount: 0, + virtualStorageScopedCount: 1, + }, + { + desc: "outdated healthy primary", + existingGenerations: map[string]int{ + "primary": 0, + "secondary": 1, + }, + repositoryScopedCount: 1, + virtualStorageScopedCount: 1, + }, + { + desc: "in-sync unhealthy primary with a candidate", + healthyNodes: StaticHealthChecker{"virtual-storage": 
{"secondary"}}, + existingGenerations: map[string]int{ + "primary": 0, + "secondary": 0, + }, + repositoryScopedCount: 0, + virtualStorageScopedCount: 0, + }, + { + desc: "in-sync unhealthy primary without a candidate", + healthyNodes: StaticHealthChecker{"virtual-storage": {}}, + existingGenerations: map[string]int{ + "primary": 0, + "secondary": 0, + }, + repositoryScopedCount: 1, + virtualStorageScopedCount: 0, + }, + { + desc: "secondary is unassigned", + healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}}, + existingGenerations: map[string]int{ + "primary": 1, + "secondary": 1, + }, + existingAssignments: []string{"primary"}, + repositoryScopedCount: 1, + virtualStorageScopedCount: 0, + }, + { + desc: "failsover to assigned", + healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}}, + existingGenerations: map[string]int{ + "primary": 1, + "secondary": 1, + }, + existingAssignments: []string{"primary", "secondary"}, + repositoryScopedCount: 0, + virtualStorageScopedCount: 0, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + db := getDB(t) + + staticQueries := []string{ + ` + INSERT INTO repositories (virtual_storage, relative_path) + VALUES ('virtual-storage', 'repository') + `, + } + + if !tc.noPrimary { + staticQueries = append(staticQueries, + ` + UPDATE repositories SET "primary" = 'primary' + `, + ` + INSERT INTO shard_primaries (shard_name, node_name, elected_by_praefect, elected_at) + VALUES ('virtual-storage', 'primary', 'ignored', now()) + `, + ) + } + + for _, q := range staticQueries { + _, err := db.ExecContext(ctx, q) + require.NoError(t, err) + } + + for storage, generation := range tc.existingGenerations { + _, err := db.ExecContext(ctx, ` + INSERT INTO storage_repositories (virtual_storage, relative_path, storage, generation) + VALUES ('virtual-storage', 'repository', $1, $2) + `, storage, generation) + require.NoError(t, err) + } + + for _, storage := range 
tc.existingAssignments { + _, err := db.ExecContext(ctx, ` + INSERT INTO repository_assignments (virtual_storage, relative_path, storage) + VALUES ('virtual-storage', 'repository', $1) + `, storage) + require.NoError(t, err) + } + + for _, scope := range []struct { + desc string + repositoryScoped bool + readOnlyCount int + }{ + { + desc: "virtual storage primaries", + repositoryScoped: false, + readOnlyCount: tc.virtualStorageScopedCount, + }, + { + desc: "repository primaries", + repositoryScoped: true, + readOnlyCount: tc.repositoryScopedCount, + }, + } { + t.Run(scope.desc, func(t *testing.T) { + healthyNodes := StaticHealthChecker{"virtual-storage": {"primary", "secondary"}} + if tc.healthyNodes != nil { + healthyNodes = tc.healthyNodes + } + + c := NewRepositoryStoreCollector( + logrus.New(), + []string{"virtual-storage"}, + db, + scope.repositoryScoped, + healthyNodes, + ) + + require.NoError(t, testutil.CollectAndCompare(c, strings.NewReader(fmt.Sprintf(` +# HELP gitaly_praefect_read_only_repositories Number of repositories in read-only mode within a virtual storage. +# TYPE gitaly_praefect_read_only_repositories gauge +gitaly_praefect_read_only_repositories{virtual_storage="virtual-storage"} %d + `, scope.readOnlyCount)))) + }) + } + }) + } +} |