
gitlab.com/gitlab-org/gitaly.git
author     Sami Hiltunen <shiltunen@gitlab.com>  2020-12-08 13:02:29 +0300
committer  Sami Hiltunen <shiltunen@gitlab.com>  2020-12-16 17:55:57 +0300
commit     68abe53a5e8955452d22f4a75c8b72d5bf86730f (patch)
tree       a4d6d1601726876f0af1cbeaf66ec6816278fe58
parent     bca59804605b4afd0bebacbaa0952c5b4ca16141 (diff)
support repository specific primaries and host assignments in dataloss
Adds support for repository specific primaries and variable replication factor in dataloss. When the 'per_repository' elector is in use, dataloss returns repository specific primaries from the database. Assignments are taken into account if they are set for the repository. If the repository has no assignments, all configured storages are considered assigned, which falls back to Praefect's behavior before assignments were introduced.

As before, dataloss by default only returns read-only repositories. When also fetching partially replicated but writable repositories, only repositories which have at least one outdated assigned node are printed out. An outdated copy of the repository on an unassigned node does not indicate that the repository's replication factor has not been met; it only means there is an extra copy lying around.
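The assignment fallback described above can be illustrated with a small Go sketch. This function is not part of the commit (the commit implements the same rule inside a SQL query in repository_store.go below); the function and variable names are hypothetical.

// effectiveAssignments is an illustrative sketch of the assignment fallback:
// assignments referring to unconfigured storages are ignored, and a repository
// with no remaining assignments falls back to treating every configured storage
// as assigned.
func effectiveAssignments(configured, assigned []string) []string {
	configuredSet := make(map[string]bool, len(configured))
	for _, storage := range configured {
		configuredSet[storage] = true
	}

	// Drop assignments that refer to storages not present in the configuration.
	var kept []string
	for _, storage := range assigned {
		if configuredSet[storage] {
			kept = append(kept, storage)
		}
	}

	// No usable assignments: every configured storage counts as assigned.
	if len(kept) == 0 {
		return configured
	}
	return kept
}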
-rw-r--r--  changelogs/unreleased/smh-dataloss-primaries.yml      |   5
-rw-r--r--  cmd/praefect/subcmd_dataloss_test.go                  |  10
-rw-r--r--  internal/praefect/datastore/repository_store.go       | 146
-rw-r--r--  internal/praefect/datastore/repository_store_mock.go  |  28
-rw-r--r--  internal/praefect/datastore/repository_store_test.go  | 263
-rw-r--r--  internal/praefect/service/info/dataloss.go            |  56
6 files changed, 359 insertions, 149 deletions
diff --git a/changelogs/unreleased/smh-dataloss-primaries.yml b/changelogs/unreleased/smh-dataloss-primaries.yml
new file mode 100644
index 000000000..092a4292d
--- /dev/null
+++ b/changelogs/unreleased/smh-dataloss-primaries.yml
@@ -0,0 +1,5 @@
+---
+title: Support repository specific primaries and host assignments in dataloss
+merge_request: 2890
+author:
+type: changed
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index 9a0b8ddbc..1a41c1766 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -67,11 +67,19 @@ func TestDatalossSubcommand(t *testing.T) {
},
}
- gs := datastore.NewPostgresRepositoryStore(getDB(t), cfg.StorageNames())
+ db := getDB(t)
+
+ gs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
ctx, cancel := testhelper.Context()
defer cancel()
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO shard_primaries (shard_name, node_name, elected_by_praefect, elected_at)
+ VALUES ('virtual-storage-1', 'gitaly-1', 'ignored', now())
+ `)
+ require.NoError(t, err)
+
require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-1", 1))
require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-2", 0))
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 1a25ae1bf..288a4655d 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -3,8 +3,10 @@ package datastore
import (
"context"
"database/sql"
+ "encoding/json"
"errors"
"fmt"
+ "strings"
"github.com/lib/pq"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
@@ -90,11 +92,10 @@ type RepositoryStore interface {
IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error)
// RepositoryExists returns whether the repository exists on a virtual storage.
RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
- // GetOutdatedRepositories finds repositories which are not on the latest generation in the virtual storage. It returns a map
- // with key structure `relative_path-> storage -> generation`, indicating how many changes a storage is missing for a given
- // repository.
- GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error)
-
+ // GetPartiallyReplicatedRepositories returns information on repositories which have an outdated copy on an assigned storage.
+ // By default, repository specific primaries are returned in the results. If useVirtualStoragePrimaries is set, virtual storage's
+ // primary is returned instead for each repository.
+ GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStoragePrimaries bool) ([]OutdatedRepository, error)
// DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's
// record of the invalid repository. If the storage was the only storage with the repository, the repository's
// record on the virtual storage is also deleted.
@@ -458,46 +459,127 @@ AND NOT EXISTS (
return err
}
-func (rs *PostgresRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) {
- // As some storages might be missing records from the table, we do a cross join between the repositories and the
- // configured storages. If a storage is missing an entry, it is considered fully outdated.
- const q = `
-SELECT relative_path, storage, expected_repositories.generation - COALESCE(storage_repositories.generation, -1) AS behind_by
-FROM (
- SELECT virtual_storage, relative_path, unnest($2::text[]) AS storage, MAX(storage_repositories.generation) AS generation
- FROM repositories
- JOIN storage_repositories USING (virtual_storage, relative_path)
- WHERE virtual_storage = $1
- GROUP BY virtual_storage, relative_path
-) AS expected_repositories
-LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
-WHERE COALESCE(storage_repositories.generation, -1) < expected_repositories.generation
-`
- storages, ok := rs.storages[virtualStorage]
+// OutdatedRepositoryStorageDetails represents a storage that contains or should contain a
+// copy of the repository.
+type OutdatedRepositoryStorageDetails struct {
+ // Name of the storage as configured.
+ Name string
+ // BehindBy indicates how many generations the storage's copy of the repository is missing at maximum.
+ BehindBy int
+ // Assigned indicates whether the storage is an assigned host of the repository.
+ Assigned bool
+}
+
+// OutdatedRepository is a repository with one or more outdated assigned storages.
+type OutdatedRepository struct {
+ // RelativePath is the relative path of the repository.
+ RelativePath string
+ // Primary is the current primary of this repository.
+ Primary string
+ // Storages contains information of the repository on each storage that contains the repository
+ // or does not contain the repository but is assigned to host it.
+ Storages []OutdatedRepositoryStorageDetails
+}
+
+func (rs *PostgresRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, useVirtualStoragePrimaries bool) ([]OutdatedRepository, error) {
+ configuredStorages, ok := rs.storages[virtualStorage]
if !ok {
return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage)
}
- rows, err := rs.db.QueryContext(ctx, q, virtualStorage, pq.StringArray(storages))
+ // The query below gets the generations and assignments of every repository
+ // which has one or more outdated assigned nodes. It works as follows:
+ //
+ // 1. First we get all the storages which contain the repository from `storage_repositories`. We
+ // list every copy of the repository as the latest generation could exist on an unassigned
+ // storage.
+ //
+ // 2. We join `repository_assignments` table with fallback behavior in case the repository has no
+ // assignments. A storage is considered assigned if:
+ //
+ // 1. If the repository has no assignments, every configured storage is considered assigned.
+ // 2. If the repository has assignments, the storage needs to be assigned explicitly.
+ // 3. Assignments of unconfigured storages are treated as if they don't exist.
+ //
+ // If none of the assigned storages are outdated, the repository is not considered outdated as
+ // the desired replication factor has been reached.
+ //
+ // 3. We join `repositories` table to filter out any repositories that have been deleted but still
+ // exist on some storages. While the `repository_assignments` has a foreign key on `repositories`
+ // and there can't be any assignments for deleted repositories, this is still needed as long as the
+ // fallback behavior of no assignments is in place.
+ //
+ // 4. Finally we aggregate each repository's information into a single row with a JSON object containing
+ // the information. This allows us to group the output already in the query and makes scanning easier.
+ // We filter out groups which do not have an outdated assigned storage as the replication factor on those
+ // is reached. Status of unassigned storages does not matter as long as they don't contain a later generation
+ // than the assigned ones.
+ //
+ // If virtual storage scoped primaries are used, the primary is instead selected from the `shard_primaries` table.
+ rows, err := rs.db.QueryContext(ctx, `
+SELECT
+ json_build_object (
+ 'RelativePath', relative_path,
+ 'Primary', "primary",
+ 'Storages', json_agg(
+ json_build_object(
+ 'Name', storage,
+ 'BehindBy', behind_by,
+ 'Assigned', assigned
+ )
+ )
+ )
+FROM (
+ SELECT
+ relative_path,
+ CASE WHEN $3
+ THEN shard_primaries.node_name
+ ELSE repositories."primary"
+ END AS "primary",
+ storage,
+ max(storage_repositories.generation) OVER (PARTITION BY virtual_storage, relative_path) - COALESCE(storage_repositories.generation, -1) AS behind_by,
+ repository_assignments.storage IS NOT NULL AS assigned
+ FROM storage_repositories
+ FULL JOIN (
+ SELECT virtual_storage, relative_path, storage
+ FROM repositories
+ CROSS JOIN (SELECT unnest($2::text[]) AS storage) AS configured_storages
+ WHERE (
+ SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = configured_storages.storage) = 1
+ FROM repository_assignments
+ WHERE virtual_storage = repositories.virtual_storage
+ AND relative_path = repositories.relative_path
+ AND storage = ANY($2::text[])
+ )
+ ) AS repository_assignments USING (virtual_storage, relative_path, storage)
+ JOIN repositories USING (virtual_storage, relative_path)
+ LEFT JOIN shard_primaries ON $3 AND shard_name = virtual_storage AND NOT demoted
+ WHERE virtual_storage = $1
+ ORDER BY relative_path, "primary", storage
+) AS outdated_repositories
+GROUP BY relative_path, "primary"
+HAVING max(behind_by) FILTER(WHERE assigned) > 0
+ORDER BY relative_path, "primary"
+ `, virtualStorage, pq.StringArray(configuredStorages), useVirtualStoragePrimaries)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("query: %w", err)
}
defer rows.Close()
- outdated := make(map[string]map[string]int)
+ var outdatedRepos []OutdatedRepository
for rows.Next() {
- var storage, relativePath string
- var difference int
- if err := rows.Scan(&relativePath, &storage, &difference); err != nil {
- return nil, err
+ var repositoryJSON string
+ if err := rows.Scan(&repositoryJSON); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
}
- if outdated[relativePath] == nil {
- outdated[relativePath] = make(map[string]int)
+ var outdatedRepo OutdatedRepository
+ if err := json.NewDecoder(strings.NewReader(repositoryJSON)).Decode(&outdatedRepo); err != nil {
+ return nil, fmt.Errorf("decode json: %w", err)
}
- outdated[relativePath][storage] = difference
+ outdatedRepos = append(outdatedRepos, outdatedRepo)
}
- return outdated, rows.Err()
+ return outdatedRepos, rows.Err()
}
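
As a usage note (not part of the commit), a caller of the new method might look roughly like the sketch below. The helper name, the "virtual-storage-1" argument, and the printing are illustrative assumptions; only the method signature and the OutdatedRepository types come from the diff above.

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// reportOutdated is an illustrative helper: it lists partially replicated
// repositories and the assigned storages that are behind.
func reportOutdated(ctx context.Context, rs datastore.RepositoryStore) error {
	// Passing false requests repository specific primaries instead of the
	// virtual storage scoped primary.
	outdated, err := rs.GetPartiallyReplicatedRepositories(ctx, "virtual-storage-1", false)
	if err != nil {
		return fmt.Errorf("get partially replicated repositories: %w", err)
	}

	for _, repo := range outdated {
		fmt.Printf("%s (primary: %s)\n", repo.RelativePath, repo.Primary)
		for _, storage := range repo.Storages {
			if storage.Assigned && storage.BehindBy > 0 {
				fmt.Printf("  %s is behind by %d changes\n", storage.Name, storage.BehindBy)
			}
		}
	}

	return nil
}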
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 8220d5898..d598a1692 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -5,17 +5,17 @@ import "context"
// MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods
// default to what could be considered success if not set.
type MockRepositoryStore struct {
- GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
- IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
- IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error)
- GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
- SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
- DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
- RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
- GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
- GetOutdatedRepositoriesFunc func(ctx context.Context, virtualStorage string) (map[string]map[string]int, error)
- DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
- RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
+ GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
+ IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
+ IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error)
+ GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
+ SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
+ DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
+ RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
+ GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
+ GetPartiallyReplicatedRepositoriesFunc func(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error)
+ DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
+ RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
}
func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) {
@@ -82,12 +82,12 @@ func (m MockRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtu
return m.GetConsistentSecondariesFunc(ctx, virtualStorage, relativePath, primary)
}
-func (m MockRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) {
- if m.GetOutdatedRepositoriesFunc == nil {
+func (m MockRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error) {
+ if m.GetPartiallyReplicatedRepositoriesFunc == nil {
return nil, nil
}
- return m.GetOutdatedRepositoriesFunc(ctx, virtualStorage)
+ return m.GetPartiallyReplicatedRepositoriesFunc(ctx, virtualStorage, virtualStorageScopedPrimaries)
}
func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index 96f66b728..bd72c4729 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -548,73 +548,210 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
require.False(t, exists)
})
+}
+
+func TestPostgresRepositoryStore_GetPartiallyReplicatedRepositories(t *testing.T) {
+ for _, scope := range []struct {
+ desc string
+ useVirtualStoragePrimaries bool
+ primary string
+ }{
+ {desc: "virtual storage primaries", useVirtualStoragePrimaries: true, primary: "virtual-storage-primary"},
+ {desc: "repository primaries", useVirtualStoragePrimaries: false, primary: "repository-primary"},
+ } {
+ t.Run(scope.desc, func(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ nonExistentRepository bool
+ existingGenerations map[string]int
+ existingAssignments []string
+ storageDetails []OutdatedRepositoryStorageDetails
+ }{
+ {
+ desc: "all up to date without assignments",
+ existingGenerations: map[string]int{"primary": 0, "secondary-1": 0},
+ },
+ {
+ desc: "unconfigured node outdated without assignments",
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 1, "unconfigured": 0},
+ },
+ {
+ desc: "unconfigured node contains the latest",
+ existingGenerations: map[string]int{"primary": 0, "secondary-1": 0, "unconfigured": 1},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ {Name: "unconfigured", BehindBy: 0, Assigned: false},
+ },
+ },
+ {
+ desc: "node has no repository without assignments",
+ existingGenerations: map[string]int{"primary": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ },
+ },
+ {
+ desc: "node has outdated repository without assignments",
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ },
+ },
+ {
+ desc: "node with no repository heavily outdated",
+ existingGenerations: map[string]int{"primary": 10},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 11, Assigned: true},
+ },
+ },
+ {
+ desc: "node with a heavily outdated repository",
+ existingGenerations: map[string]int{"primary": 10, "secondary-1": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 10, Assigned: true},
+ },
+ },
+ {
+ desc: "outdated nodes ignored when repository should not exist",
+ nonExistentRepository: true,
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 0},
+ },
+ {
+ desc: "unassigned node has no repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"primary": 0},
+ },
+ {
+ desc: "unassigned node has an outdated repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 0},
+ },
+ {
+ desc: "assigned node has no repository",
+ existingAssignments: []string{"primary", "secondary-1"},
+ existingGenerations: map[string]int{"primary": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ },
+ },
+ {
+ desc: "assigned node has outdated repository",
+ existingAssignments: []string{"primary", "secondary-1"},
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ },
+ },
+ {
+ desc: "unassigned node contains the latest repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"primary": 0, "secondary-1": 1},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "secondary-1", BehindBy: 0, Assigned: false},
+ },
+ },
+ {
+ desc: "unassigned node contains the only repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"secondary-1": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "secondary-1", BehindBy: 0, Assigned: false},
+ },
+ },
+ {
+ desc: "unassigned unconfigured node contains the only repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"unconfigured": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "unconfigured", BehindBy: 0, Assigned: false},
+ },
+ },
+ {
+ desc: "assigned unconfigured node has no repository",
+ existingAssignments: []string{"primary", "unconfigured"},
+ existingGenerations: map[string]int{"primary": 1},
+ },
+ {
+ desc: "assigned unconfigured node is outdated",
+ existingAssignments: []string{"primary", "unconfigured"},
+ existingGenerations: map[string]int{"primary": 1, "unconfigured": 0},
+ },
+ {
+ desc: "unconfigured node is the only assigned node",
+ existingAssignments: []string{"unconfigured"},
+ existingGenerations: map[string]int{"unconfigured": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ {Name: "unconfigured", BehindBy: 0, Assigned: false},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
- t.Run("GetOutdatedRepositories", func(t *testing.T) {
- t.Run("unknown virtual storage", func(t *testing.T) {
- rs, _ := newStore(t, map[string][]string{})
+ db := getDB(t)
- _, err := rs.GetOutdatedRepositories(ctx, "does not exist")
- require.EqualError(t, err, `unknown virtual storage: "does not exist"`)
- })
+ configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1"}}
- type state map[string]map[string]map[string]struct {
- generation int
- }
+ if !tc.nonExistentRepository {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO repositories (virtual_storage, relative_path, "primary")
+ VALUES ('virtual-storage', 'relative-path', 'repository-primary')
+ `)
+ require.NoError(t, err)
+ }
- type expected map[string]map[string]int
-
- for _, tc := range []struct {
- desc string
- state state
- expected map[string]map[string]int
- }{
- {
- desc: "no records in virtual storage",
- state: state{"virtual-storage-2": {stor: {"repo-1": {generation: 0}}}},
- expected: expected{},
- },
- {
- desc: "storages missing records",
- state: state{vs: {stor: {"repo-1": {generation: 0}}}},
- expected: expected{"repo-1": {"storage-2": 1, "storage-3": 1}},
- },
- {
- desc: "outdated storages",
- state: state{vs: {
- stor: {"repo-1": {generation: 2}},
- "storage-2": {"repo-1": {generation: 1}},
- "storage-3": {"repo-1": {generation: 0}},
- }},
- expected: expected{"repo-1": {"storage-2": 1, "storage-3": 2}},
- },
- {
- desc: "all up to date",
- state: state{vs: {
- stor: {"repo-1": {generation: 3}},
- "storage-2": {"repo-1": {generation: 3}},
- "storage-3": {"repo-1": {generation: 3}},
- }},
- expected: expected{},
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- rs, _ := newStore(t, map[string][]string{vs: {stor, "storage-2", "storage-3"}})
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- for vs, storages := range tc.state {
- for storage, repos := range storages {
- for repo, state := range repos {
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, state.generation))
- }
+ for storage, generation := range tc.existingGenerations {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO storage_repositories VALUES ('virtual-storage', 'relative-path', $1, $2)
+ `, storage, generation)
+ require.NoError(t, err)
}
- }
- outdated, err := rs.GetOutdatedRepositories(ctx, vs)
- require.NoError(t, err)
- require.Equal(t, tc.expected, outdated)
- })
- }
- })
+ for _, storage := range tc.existingAssignments {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1)
+ `, storage)
+ require.NoError(t, err)
+ }
+
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO shard_primaries (shard_name, node_name, elected_by_praefect, elected_at)
+ VALUES ('virtual-storage', 'virtual-storage-primary', 'ignored', now())
+ `)
+ require.NoError(t, err)
+
+ store := NewPostgresRepositoryStore(db, configuredStorages)
+ outdated, err := store.GetPartiallyReplicatedRepositories(ctx, "virtual-storage", scope.useVirtualStoragePrimaries)
+ require.NoError(t, err)
+
+ expected := []OutdatedRepository{
+ {
+ RelativePath: "relative-path",
+ Primary: scope.primary,
+ Storages: tc.storageDetails,
+ },
+ }
+
+ if tc.storageDetails == nil {
+ expected = nil
+ }
+
+ require.Equal(t, expected, outdated)
+ })
+ }
+ })
+ }
}
diff --git a/internal/praefect/service/info/dataloss.go b/internal/praefect/service/info/dataloss.go
index 239133322..122b1d838 100644
--- a/internal/praefect/service/info/dataloss.go
+++ b/internal/praefect/service/info/dataloss.go
@@ -2,69 +2,47 @@ package info
import (
"context"
- "fmt"
- "sort"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
func (s *Server) DatalossCheck(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
- configuredStorages, ok := s.conf.StorageNames()[req.VirtualStorage]
- if !ok {
- return nil, fmt.Errorf("unknown virtual storage: %q", req.VirtualStorage)
- }
-
- shard, err := s.nodeMgr.GetShard(ctx, req.GetVirtualStorage())
- if err != nil {
- return nil, err
- }
-
- outdatedRepos, err := s.rs.GetOutdatedRepositories(ctx, req.GetVirtualStorage())
+ outdatedRepos, err := s.rs.GetPartiallyReplicatedRepositories(
+ ctx, req.GetVirtualStorage(), s.conf.Failover.ElectionStrategy != config.ElectionStrategyPerRepository)
if err != nil {
return nil, err
}
pbRepos := make([]*gitalypb.DatalossCheckResponse_Repository, 0, len(outdatedRepos))
- for relativePath, outdatedStorages := range outdatedRepos {
- readOnly := false
+ for _, outdatedRepo := range outdatedRepos {
+ readOnly := true
- storages := make(map[string]*gitalypb.DatalossCheckResponse_Repository_Storage, len(configuredStorages))
- for _, storage := range configuredStorages {
- storages[storage] = &gitalypb.DatalossCheckResponse_Repository_Storage{
- Name: storage,
- Assigned: true,
+ storages := make([]*gitalypb.DatalossCheckResponse_Repository_Storage, 0, len(outdatedRepo.Storages))
+ for _, storage := range outdatedRepo.Storages {
+ if storage.Name == outdatedRepo.Primary && storage.BehindBy == 0 {
+ readOnly = false
}
- }
- for name, behindBy := range outdatedStorages {
- if name == shard.Primary.GetStorage() {
- readOnly = true
- }
-
- storages[name].BehindBy = int64(behindBy)
+ storages = append(storages, &gitalypb.DatalossCheckResponse_Repository_Storage{
+ Name: storage.Name,
+ BehindBy: int64(storage.BehindBy),
+ Assigned: storage.Assigned,
+ })
}
if !req.IncludePartiallyReplicated && !readOnly {
continue
}
- storagesSlice := make([]*gitalypb.DatalossCheckResponse_Repository_Storage, 0, len(storages))
- for _, storage := range storages {
- storagesSlice = append(storagesSlice, storage)
- }
-
- sort.Slice(storagesSlice, func(i, j int) bool { return storagesSlice[i].Name < storagesSlice[j].Name })
-
pbRepos = append(pbRepos, &gitalypb.DatalossCheckResponse_Repository{
- RelativePath: relativePath,
+ RelativePath: outdatedRepo.RelativePath,
+ Primary: outdatedRepo.Primary,
ReadOnly: readOnly,
- Storages: storagesSlice,
- Primary: shard.Primary.GetStorage(),
+ Storages: storages,
})
}
- sort.Slice(pbRepos, func(i, j int) bool { return pbRepos[i].RelativePath < pbRepos[j].RelativePath })
-
return &gitalypb.DatalossCheckResponse{
Repositories: pbRepos,
}, nil