Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/praefect/datastore/collector.go63
-rw-r--r--internal/praefect/datastore/collector_test.go211
2 files changed, 270 insertions, 4 deletions
diff --git a/internal/praefect/datastore/collector.go b/internal/praefect/datastore/collector.go
index c5ad39fcd..261fcccea 100644
--- a/internal/praefect/datastore/collector.go
+++ b/internal/praefect/datastore/collector.go
@@ -17,21 +17,27 @@ var descReadOnlyRepositories = prometheus.NewDesc(
nil,
)
+type HealthChecker interface {
+ HealthyNodes() map[string][]string
+}
+
// RepositoryStoreCollector collects metrics from the RepositoryStore.
type RepositoryStoreCollector struct {
log logrus.FieldLogger
db glsql.Querier
virtualStorages []string
repositoryScoped bool
+ healthChecker HealthChecker
}
// NewRepositoryStoreCollector returns a new collector.
-func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool) *RepositoryStoreCollector {
+func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool, hc HealthChecker) *RepositoryStoreCollector {
return &RepositoryStoreCollector{
log: log.WithField("component", "RepositoryStoreCollector"),
db: db,
virtualStorages: virtualStorages,
repositoryScoped: repositoryScoped,
+ healthChecker: hc,
}
}
@@ -57,7 +63,33 @@ func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric) {
// query considers repository specific primaries. Otherwise, virtual storage scoped
// primaries are considered.
func (c *RepositoryStoreCollector) queryMetrics(ctx context.Context) (map[string]int, error) {
+ var virtualStorages, storages []string
+ for virtualStorage, stors := range c.healthChecker.HealthyNodes() {
+ for _, storage := range stors {
+ virtualStorages = append(virtualStorages, virtualStorage)
+ storages = append(storages, storage)
+ }
+ }
+
+ // The query differs slightly depending on whether repository specific primaries are enabled or not.
+ //
+ // For virtual storage scoped primaries, we fetch the primaries from the `shard_primaries.node_name`
+ // column. We then check whether the primary storage is on the latest generation. If not, we add
+ // it to the read-only count.
+ //
+ // When used with repository specific primaries, the repository's primary is taken from the
+ // `repositories.primary` column. To account for the possibility of lazy elections, the query does
+ // not immediately count outdated primaries as read-only. It additionally checks whether the primary
+ // node is unhealthy. If it is not, then the primary is counted as read-only since the elector would
+ // not fail over from a healthy primary. If the primary is unhealthy, the elector could fail over to
+ // another node when the primary is needed. The query checks whether there is a healthy and assigned
+ // in-sync node available. If so, the elector would fail over to it on the next request, thus the
+ // repository would not be in read-only mode.
rows, err := c.db.QueryContext(ctx, `
+WITH healthy_storages AS (
+ SELECT unnest($3::text[]) AS virtual_storage, unnest($4::text[]) AS storage
+)
+
SELECT virtual_storage, COUNT(*)
FROM (
SELECT
@@ -77,9 +109,34 @@ JOIN (
GROUP BY virtual_storage, relative_path
) AS latest_generations USING (virtual_storage, relative_path)
LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
-WHERE latest_generation > COALESCE(generation, -1)
+WHERE (
+ latest_generation > COALESCE(generation, -1)
+ OR ($2 AND (virtual_storage, storage) NOT IN ( SELECT virtual_storage, storage FROM healthy_storages )))
+AND (
+ NOT $2
+ OR (virtual_storage, storage) IN ( SELECT virtual_storage, storage FROM healthy_storages )
+ OR NOT EXISTS (
+ SELECT 1
+ FROM storage_repositories AS primary_candidate
+ JOIN healthy_storages USING (virtual_storage, storage)
+ WHERE primary_candidate.virtual_storage = repositories.virtual_storage
+ AND primary_candidate.relative_path = repositories.relative_path
+ AND primary_candidate.generation = latest_generation
+ AND (
+ SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = primary_candidate.storage) = 1
+ FROM repository_assignments
+ WHERE repository_assignments.virtual_storage = primary_candidate.virtual_storage
+ AND repository_assignments.relative_path = primary_candidate.relative_path
+ )
+ )
+)
GROUP BY virtual_storage
- `, pq.StringArray(c.virtualStorages), c.repositoryScoped)
+ `,
+ pq.StringArray(c.virtualStorages),
+ c.repositoryScoped,
+ pq.StringArray(virtualStorages),
+ pq.StringArray(storages),
+ )
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index 3c521dcc4..b09a1ffbd 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -13,6 +13,11 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
+// StaticHealthChecker is a HealthChecker that always reports the statically configured set of healthy nodes.
+type StaticHealthChecker map[string][]string
+
+func (hc StaticHealthChecker) HealthyNodes() map[string][]string { return hc }
+
func TestRepositoryStoreCollector(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
@@ -109,7 +114,7 @@ func TestRepositoryStoreCollector(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- c := NewRepositoryStoreCollector(logrus.New(), virtualStorages, db, tc.repositoryScoped)
+ c := NewRepositoryStoreCollector(logrus.New(), virtualStorages, db, tc.repositoryScoped, nil)
require.NoError(t, testutil.CollectAndCompare(c, strings.NewReader(fmt.Sprintf(`
# HELP gitaly_praefect_read_only_repositories Number of repositories in read-only mode within a virtual storage.
# TYPE gitaly_praefect_read_only_repositories gauge
@@ -121,3 +126,207 @@ gitaly_praefect_read_only_repositories{virtual_storage="some-read-only"} %d
})
}
}
+
+func TestRepositoryStoreCollector_lazy_failover(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ existingGenerations map[string]int
+ existingAssignments []string
+ noPrimary bool
+ healthyNodes StaticHealthChecker
+ repositoryScopedCount int
+ virtualStorageScopedCount int
+ }{
+ {
+ desc: "no records",
+ },
+ {
+ desc: "outdated secondary",
+ existingGenerations: map[string]int{
+ "primary": 0,
+ },
+ },
+ {
+ desc: "no primary with a candidate",
+ noPrimary: true,
+ existingGenerations: map[string]int{
+ "secondary": 1,
+ },
+ repositoryScopedCount: 0,
+ virtualStorageScopedCount: 1,
+ },
+ {
+ desc: "no primary with no candidate",
+ noPrimary: true,
+ healthyNodes: StaticHealthChecker{"virtual-storage": {"primary"}},
+ existingGenerations: map[string]int{
+ "secondary": 1,
+ },
+ repositoryScopedCount: 1,
+ virtualStorageScopedCount: 1,
+ },
+ {
+ desc: "no record unhealthy primary",
+ healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}},
+ existingGenerations: map[string]int{
+ "secondary": 0,
+ },
+ repositoryScopedCount: 0,
+ virtualStorageScopedCount: 1,
+ },
+ {
+ desc: "no record healthy primary",
+ existingGenerations: map[string]int{
+ "secondary": 0,
+ },
+ repositoryScopedCount: 1,
+ virtualStorageScopedCount: 1,
+ },
+ {
+ desc: "outdated unhealthy primary",
+ healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}},
+ existingGenerations: map[string]int{
+ "primary": 0,
+ "secondary": 1,
+ },
+ repositoryScopedCount: 0,
+ virtualStorageScopedCount: 1,
+ },
+ {
+ desc: "outdated healthy primary",
+ existingGenerations: map[string]int{
+ "primary": 0,
+ "secondary": 1,
+ },
+ repositoryScopedCount: 1,
+ virtualStorageScopedCount: 1,
+ },
+ {
+ desc: "in-sync unhealthy primary with a candidate",
+ healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}},
+ existingGenerations: map[string]int{
+ "primary": 0,
+ "secondary": 0,
+ },
+ repositoryScopedCount: 0,
+ virtualStorageScopedCount: 0,
+ },
+ {
+ desc: "in-sync unhealthy primary without a candidate",
+ healthyNodes: StaticHealthChecker{"virtual-storage": {}},
+ existingGenerations: map[string]int{
+ "primary": 0,
+ "secondary": 0,
+ },
+ repositoryScopedCount: 1,
+ virtualStorageScopedCount: 0,
+ },
+ {
+ desc: "secondary is unassigned",
+ healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}},
+ existingGenerations: map[string]int{
+ "primary": 1,
+ "secondary": 1,
+ },
+ existingAssignments: []string{"primary"},
+ repositoryScopedCount: 1,
+ virtualStorageScopedCount: 0,
+ },
+ {
+ desc: "failsover to assigned",
+ healthyNodes: StaticHealthChecker{"virtual-storage": {"secondary"}},
+ existingGenerations: map[string]int{
+ "primary": 1,
+ "secondary": 1,
+ },
+ existingAssignments: []string{"primary", "secondary"},
+ repositoryScopedCount: 0,
+ virtualStorageScopedCount: 0,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := getDB(t)
+
+ staticQueries := []string{
+ `
+ INSERT INTO repositories (virtual_storage, relative_path)
+ VALUES ('virtual-storage', 'repository')
+ `,
+ }
+
+ if !tc.noPrimary {
+ staticQueries = append(staticQueries,
+ `
+ UPDATE repositories SET "primary" = 'primary'
+ `,
+ `
+ INSERT INTO shard_primaries (shard_name, node_name, elected_by_praefect, elected_at)
+ VALUES ('virtual-storage', 'primary', 'ignored', now())
+ `,
+ )
+ }
+
+ for _, q := range staticQueries {
+ _, err := db.ExecContext(ctx, q)
+ require.NoError(t, err)
+ }
+
+ for storage, generation := range tc.existingGenerations {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO storage_repositories (virtual_storage, relative_path, storage, generation)
+ VALUES ('virtual-storage', 'repository', $1, $2)
+ `, storage, generation)
+ require.NoError(t, err)
+ }
+
+ for _, storage := range tc.existingAssignments {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO repository_assignments (virtual_storage, relative_path, storage)
+ VALUES ('virtual-storage', 'repository', $1)
+ `, storage)
+ require.NoError(t, err)
+ }
+
+ for _, scope := range []struct {
+ desc string
+ repositoryScoped bool
+ readOnlyCount int
+ }{
+ {
+ desc: "virtual storage primaries",
+ repositoryScoped: false,
+ readOnlyCount: tc.virtualStorageScopedCount,
+ },
+ {
+ desc: "repository primaries",
+ repositoryScoped: true,
+ readOnlyCount: tc.repositoryScopedCount,
+ },
+ } {
+ t.Run(scope.desc, func(t *testing.T) {
+ healthyNodes := StaticHealthChecker{"virtual-storage": {"primary", "secondary"}}
+ if tc.healthyNodes != nil {
+ healthyNodes = tc.healthyNodes
+ }
+
+ c := NewRepositoryStoreCollector(
+ logrus.New(),
+ []string{"virtual-storage"},
+ db,
+ scope.repositoryScoped,
+ healthyNodes,
+ )
+
+ require.NoError(t, testutil.CollectAndCompare(c, strings.NewReader(fmt.Sprintf(`
+# HELP gitaly_praefect_read_only_repositories Number of repositories in read-only mode within a virtual storage.
+# TYPE gitaly_praefect_read_only_repositories gauge
+gitaly_praefect_read_only_repositories{virtual_storage="virtual-storage"} %d
+ `, scope.readOnlyCount))))
+ })
+ }
+ })
+ }
+}