diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-12-16 18:11:53 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-12-16 18:11:53 +0300 |
commit | 2dbc86a6542d5dc335ade8ecfd2dad39d6bd055f (patch) | |
tree | 7e94fb2946e00f3461088a94f15e0c0f927b9faf | |
parent | 08eb13cb73031d05578ce27d8db0c80d7427df33 (diff) | |
parent | b34f04265a3d3b811cc5c4e80e2b2bf323bce380 (diff) |
Merge branch 'smh-dataloss-primaries' into 'master'
Support repository specific primaries and host assignments in dataloss
Closes #3301 and #3199
See merge request gitlab-org/gitaly!2890
-rw-r--r-- | changelogs/unreleased/smh-dataloss-primaries.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dataloss_test.go | 262 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 146 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 28 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 263 | ||||
-rw-r--r-- | internal/praefect/service/info/dataloss.go | 56 |
6 files changed, 498 insertions, 262 deletions
diff --git a/changelogs/unreleased/smh-dataloss-primaries.yml b/changelogs/unreleased/smh-dataloss-primaries.yml new file mode 100644 index 000000000..092a4292d --- /dev/null +++ b/changelogs/unreleased/smh-dataloss-primaries.yml @@ -0,0 +1,5 @@ +--- +title: Support repository specific primaries and host assignments in dataloss +merge_request: 2890 +author: +type: changed diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index 9a0b8ddbc..14ae43a57 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -4,13 +4,13 @@ package main import ( "bytes" + "fmt" "testing" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" - "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -28,158 +28,192 @@ func registerPraefectInfoServer(impl gitalypb.PraefectInfoServiceServer) svcRegi } func TestDatalossSubcommand(t *testing.T) { - mgr := &nodes.MockManager{ - GetShardFunc: func(vs string) (nodes.Shard, error) { - var primary string - switch vs { - case "virtual-storage-1": - primary = "gitaly-1" - case "virtual-storage-2": - primary = "gitaly-4" - default: - t.Error("unexpected virtual storage") - } - - return nodes.Shard{Primary: &nodes.MockNode{ - GetStorageMethod: func() string { - return primary - }, - }}, nil - }, - } - - cfg := config.Config{ - VirtualStorages: []*config.VirtualStorage{ - { - Name: "virtual-storage-1", - Nodes: []*config.Node{ - {Storage: "gitaly-1"}, - {Storage: "gitaly-2"}, - {Storage: "gitaly-3"}, - }, + for _, scope := range []struct { + desc string + electionStrategy config.ElectionStrategy + primaries map[string]string + }{ + { + desc: "sql elector", + electionStrategy: config.ElectionStrategySQL, + primaries: map[string]string{ + "repository-1": "gitaly-1", + "repository-2": "gitaly-1", }, - { - Name: "virtual-storage-2", - Nodes: []*config.Node{ - {Storage: "gitaly-4"}, - }, + }, + { + desc: "per_repository elector", + electionStrategy: config.ElectionStrategyPerRepository, + primaries: map[string]string{ + "repository-1": "gitaly-1", + "repository-2": "gitaly-3", }, }, - } + } { + t.Run(scope.desc, func(t *testing.T) { + cfg := config.Config{ + Failover: config.Failover{ElectionStrategy: scope.electionStrategy}, + VirtualStorages: []*config.VirtualStorage{ + { + Name: "virtual-storage-1", + Nodes: []*config.Node{ + {Storage: "gitaly-1"}, + {Storage: "gitaly-2"}, + {Storage: "gitaly-3"}, + }, + }, + { + Name: "virtual-storage-2", + Nodes: []*config.Node{ + {Storage: "gitaly-4"}, + }, + }, + }, + } - gs := datastore.NewPostgresRepositoryStore(getDB(t), cfg.StorageNames()) + db := getDB(t) + gs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) - ctx, cancel := testhelper.Context() - defer cancel() + ctx, cancel := testhelper.Context() + defer cancel() - require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-1", 1)) - require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-2", 0)) + for _, q := range []string{ + ` + INSERT INTO repositories (virtual_storage, relative_path, "primary") + VALUES + ('virtual-storage-1', 'repository-1', 'gitaly-1'), + ('virtual-storage-1', 'repository-2', 'gitaly-3') + `, + ` + INSERT INTO repository_assignments (virtual_storage, relative_path, storage) + VALUES + ('virtual-storage-1', 'repository-1', 'gitaly-1'), + ('virtual-storage-1', 'repository-1', 'gitaly-2'), + ('virtual-storage-1', 'repository-2', 'gitaly-1'), + ('virtual-storage-1', 'repository-2', 'gitaly-3') + `, + ` + INSERT INTO shard_primaries (shard_name, node_name, elected_by_praefect, elected_at) + VALUES ('virtual-storage-1', 'gitaly-1', 'ignored', now()) + `, + } { + _, err := db.ExecContext(ctx, q) + require.NoError(t, err) + } - require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-2", 0)) - require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0)) + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-1", 1)) + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-2", 0)) + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-3", 0)) - ln, clean := listenAndServe(t, []svcRegistrar{ - registerPraefectInfoServer(info.NewServer(mgr, cfg, nil, gs, nil))}) - defer clean() - for _, tc := range []struct { - desc string - args []string - virtualStorages []*config.VirtualStorage - output string - error error - }{ - { - desc: "positional arguments", - args: []string{"-virtual-storage=virtual-storage-1", "positional-arg"}, - error: unexpectedPositionalArgsError{Command: "dataloss"}, - }, - { - desc: "data loss with read-only repositories", - args: []string{"-virtual-storage=virtual-storage-1"}, output: `Virtual storage: virtual-storage-1 + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-2", 1)) + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0)) + + ln, clean := listenAndServe(t, []svcRegistrar{ + registerPraefectInfoServer(info.NewServer(nil, cfg, nil, gs, nil))}) + defer clean() + for _, tc := range []struct { + desc string + args []string + virtualStorages []*config.VirtualStorage + output string + error error + }{ + { + desc: "positional arguments", + args: []string{"-virtual-storage=virtual-storage-1", "positional-arg"}, + error: unexpectedPositionalArgsError{Command: "dataloss"}, + }, + { + desc: "data loss with read-only repositories", + args: []string{"-virtual-storage=virtual-storage-1"}, + output: fmt.Sprintf(`Virtual storage: virtual-storage-1 Outdated repositories: repository-2 (read-only): - Primary: gitaly-1 + Primary: %s In-Sync Storages: - gitaly-2, assigned host - gitaly-3, assigned host + gitaly-2 Outdated Storages: - gitaly-1 is behind by 1 change or less, assigned host -`, - }, - { - desc: "data loss with partially replicated repositories", - args: []string{"-virtual-storage=virtual-storage-1", "-partially-replicated"}, output: `Virtual storage: virtual-storage-1 + gitaly-1 is behind by 2 changes or less, assigned host + gitaly-3 is behind by 1 change or less, assigned host +`, scope.primaries["repository-2"]), + }, + { + desc: "data loss with partially replicated repositories", + args: []string{"-virtual-storage=virtual-storage-1", "-partially-replicated"}, + output: fmt.Sprintf(`Virtual storage: virtual-storage-1 Outdated repositories: repository-1 (writable): - Primary: gitaly-1 + Primary: %s In-Sync Storages: gitaly-1, assigned host Outdated Storages: gitaly-2 is behind by 1 change or less, assigned host - gitaly-3 is behind by 2 changes or less, assigned host + gitaly-3 is behind by 1 change or less repository-2 (read-only): - Primary: gitaly-1 + Primary: %s In-Sync Storages: - gitaly-2, assigned host - gitaly-3, assigned host + gitaly-2 Outdated Storages: - gitaly-1 is behind by 1 change or less, assigned host -`, - }, - { - desc: "multiple virtual storages with read-only repositories", - virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}}, - output: `Virtual storage: virtual-storage-1 + gitaly-1 is behind by 2 changes or less, assigned host + gitaly-3 is behind by 1 change or less, assigned host +`, scope.primaries["repository-1"], scope.primaries["repository-2"]), + }, + { + desc: "multiple virtual storages with read-only repositories", + virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}}, + output: fmt.Sprintf(`Virtual storage: virtual-storage-1 Outdated repositories: repository-2 (read-only): - Primary: gitaly-1 + Primary: %s In-Sync Storages: - gitaly-2, assigned host - gitaly-3, assigned host + gitaly-2 Outdated Storages: - gitaly-1 is behind by 1 change or less, assigned host + gitaly-1 is behind by 2 changes or less, assigned host + gitaly-3 is behind by 1 change or less, assigned host Virtual storage: virtual-storage-2 All repositories are writable! -`, - }, - { - desc: "multiple virtual storages with partially replicated repositories", - args: []string{"-partially-replicated"}, - virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}}, - output: `Virtual storage: virtual-storage-1 +`, scope.primaries["repository-2"]), + }, + { + desc: "multiple virtual storages with partially replicated repositories", + args: []string{"-partially-replicated"}, + virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}}, + output: fmt.Sprintf(`Virtual storage: virtual-storage-1 Outdated repositories: repository-1 (writable): - Primary: gitaly-1 + Primary: %s In-Sync Storages: gitaly-1, assigned host Outdated Storages: gitaly-2 is behind by 1 change or less, assigned host - gitaly-3 is behind by 2 changes or less, assigned host + gitaly-3 is behind by 1 change or less repository-2 (read-only): - Primary: gitaly-1 + Primary: %s In-Sync Storages: - gitaly-2, assigned host - gitaly-3, assigned host + gitaly-2 Outdated Storages: - gitaly-1 is behind by 1 change or less, assigned host + gitaly-1 is behind by 2 changes or less, assigned host + gitaly-3 is behind by 1 change or less, assigned host Virtual storage: virtual-storage-2 All repositories are up to date! -`, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - cmd := newDatalossSubcommand() - output := &bytes.Buffer{} - cmd.output = output +`, scope.primaries["repository-1"], scope.primaries["repository-2"]), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + cmd := newDatalossSubcommand() + output := &bytes.Buffer{} + cmd.output = output - fs := cmd.FlagSet() - require.NoError(t, fs.Parse(tc.args)) - err := cmd.Exec(fs, config.Config{ - VirtualStorages: tc.virtualStorages, - SocketPath: ln.Addr().String(), - }) - require.Equal(t, tc.error, err, err) - require.Equal(t, tc.output, output.String()) + fs := cmd.FlagSet() + require.NoError(t, fs.Parse(tc.args)) + err := cmd.Exec(fs, config.Config{ + VirtualStorages: tc.virtualStorages, + SocketPath: ln.Addr().String(), + }) + require.Equal(t, tc.error, err, err) + require.Equal(t, tc.output, output.String()) + }) + } }) } } diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 1a25ae1bf..288a4655d 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -3,8 +3,10 @@ package datastore import ( "context" "database/sql" + "encoding/json" "errors" "fmt" + "strings" "github.com/lib/pq" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" @@ -90,11 +92,10 @@ type RepositoryStore interface { IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) // RepositoryExists returns whether the repository exists on a virtual storage. RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) - // GetOutdatedRepositories finds repositories which are not on the latest generation in the virtual storage. It returns a map - // with key structure `relative_path-> storage -> generation`, indicating how many changes a storage is missing for a given - // repository. - GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) - + // GetPartiallyReplicatedRepositories returns information on repositories which have an outdated copy on an assigned storage. + // By default, repository specific primaries are returned in the results. If useVirtualStoragePrimaries is set, virtual storage's + // primary is returned instead for each repository. + GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStoragePrimaries bool) ([]OutdatedRepository, error) // DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's // record of the invalid repository. If the storage was the only storage with the repository, the repository's // record on the virtual storage is also deleted. @@ -458,46 +459,127 @@ AND NOT EXISTS ( return err } -func (rs *PostgresRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) { - // As some storages might be missing records from the table, we do a cross join between the repositories and the - // configured storages. If a storage is missing an entry, it is considered fully outdated. - const q = ` -SELECT relative_path, storage, expected_repositories.generation - COALESCE(storage_repositories.generation, -1) AS behind_by -FROM ( - SELECT virtual_storage, relative_path, unnest($2::text[]) AS storage, MAX(storage_repositories.generation) AS generation - FROM repositories - JOIN storage_repositories USING (virtual_storage, relative_path) - WHERE virtual_storage = $1 - GROUP BY virtual_storage, relative_path -) AS expected_repositories -LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage) -WHERE COALESCE(storage_repositories.generation, -1) < expected_repositories.generation -` - storages, ok := rs.storages[virtualStorage] +// OutdatedRepositoryStorageDetails represents a storage that contains or should contain a +// copy of the repository. +type OutdatedRepositoryStorageDetails struct { + // Name of the storage as configured. + Name string + // BehindBy indicates how many generations the storage's copy of the repository is missing at maximum. + BehindBy int + // Assigned indicates whether the storage is an assigned host of the repository. + Assigned bool +} + +// OutdatedRepository is a repository with one or more outdated assigned storages. +type OutdatedRepository struct { + // RelativePath is the relative path of the repository. + RelativePath string + // Primary is the current primary of this repository. + Primary string + // Storages contains information of the repository on each storage that contains the repository + // or does not contain the repository but is assigned to host it. + Storages []OutdatedRepositoryStorageDetails +} + +func (rs *PostgresRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, useVirtualStoragePrimaries bool) ([]OutdatedRepository, error) { + configuredStorages, ok := rs.storages[virtualStorage] if !ok { return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage) } - rows, err := rs.db.QueryContext(ctx, q, virtualStorage, pq.StringArray(storages)) + // The query below gets the generations and assignments of every repository + // which has one or more outdated assigned nodes. It works as follows: + // + // 1. First we get all the storages which contain the repository from `storage_repositories`. We + // list every copy of the repository as the latest generation could exist on an unassigned + // storage. + // + // 2. We join `repository_assignments` table with fallback behavior in case the repository has no + // assignments. A storage is considered assigned if: + // + // 1. If the repository has no assignments, every configured storage is considered assigned. + // 2. If the repository has assignments, the storage needs to be assigned explicitly. + // 3. Assignments of unconfigured storages are treated as if they don't exist. + // + // If none of the assigned storages are outdated, the repository is not considered outdated as + // the desired replication factor has been reached. + // + // 3. We join `repositories` table to filter out any repositories that have been deleted but still + // exist on some storages. While the `repository_assignments` has a foreign key on `repositories` + // and there can't be any assignments for deleted repositories, this is still needed as long as the + // fallback behavior of no assignments is in place. + // + // 4. Finally we aggregate each repository's information in to a single row with a JSON object containing + // the information. This allows us to group the output already in the query and makes scanning easier + // We filter out groups which do not have an outdated assigned storage as the replication factor on those + // is reached. Status of unassigned storages does not matter as long as they don't contain a later generation + // than the assigned ones. + // + // If virtual storage scoped primaries are used, the primary is instead selected from the `shard_primaries` table. + rows, err := rs.db.QueryContext(ctx, ` +SELECT + json_build_object ( + 'RelativePath', relative_path, + 'Primary', "primary", + 'Storages', json_agg( + json_build_object( + 'Name', storage, + 'BehindBy', behind_by, + 'Assigned', assigned + ) + ) + ) +FROM ( + SELECT + relative_path, + CASE WHEN $3 + THEN shard_primaries.node_name + ELSE repositories."primary" + END AS "primary", + storage, + max(storage_repositories.generation) OVER (PARTITION BY virtual_storage, relative_path) - COALESCE(storage_repositories.generation, -1) AS behind_by, + repository_assignments.storage IS NOT NULL AS assigned + FROM storage_repositories + FULL JOIN ( + SELECT virtual_storage, relative_path, storage + FROM repositories + CROSS JOIN (SELECT unnest($2::text[]) AS storage) AS configured_storages + WHERE ( + SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = configured_storages.storage) = 1 + FROM repository_assignments + WHERE virtual_storage = repositories.virtual_storage + AND relative_path = repositories.relative_path + AND storage = ANY($2::text[]) + ) + ) AS repository_assignments USING (virtual_storage, relative_path, storage) + JOIN repositories USING (virtual_storage, relative_path) + LEFT JOIN shard_primaries ON $3 AND shard_name = virtual_storage AND NOT demoted + WHERE virtual_storage = $1 + ORDER BY relative_path, "primary", storage +) AS outdated_repositories +GROUP BY relative_path, "primary" +HAVING max(behind_by) FILTER(WHERE assigned) > 0 +ORDER BY relative_path, "primary" + `, virtualStorage, pq.StringArray(configuredStorages), useVirtualStoragePrimaries) if err != nil { - return nil, err + return nil, fmt.Errorf("query: %w", err) } defer rows.Close() - outdated := make(map[string]map[string]int) + var outdatedRepos []OutdatedRepository for rows.Next() { - var storage, relativePath string - var difference int - if err := rows.Scan(&relativePath, &storage, &difference); err != nil { - return nil, err + var repositoryJSON string + if err := rows.Scan(&repositoryJSON); err != nil { + return nil, fmt.Errorf("scan: %w", err) } - if outdated[relativePath] == nil { - outdated[relativePath] = make(map[string]int) + var outdatedRepo OutdatedRepository + if err := json.NewDecoder(strings.NewReader(repositoryJSON)).Decode(&outdatedRepo); err != nil { + return nil, fmt.Errorf("decode json: %w", err) } - outdated[relativePath][storage] = difference + outdatedRepos = append(outdatedRepos, outdatedRepo) } - return outdated, rows.Err() + return outdatedRepos, rows.Err() } diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 8220d5898..d598a1692 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -5,17 +5,17 @@ import "context" // MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods // default to what could be considered success if not set. type MockRepositoryStore struct { - GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) - IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error - IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) - GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) - SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error - DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error - RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error - GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) - GetOutdatedRepositoriesFunc func(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) - DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error - RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) + GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) + IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error + IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) + GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) + SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error + DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error + GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) + GetPartiallyReplicatedRepositoriesFunc func(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error) + DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) } func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) { @@ -82,12 +82,12 @@ func (m MockRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtu return m.GetConsistentSecondariesFunc(ctx, virtualStorage, relativePath, primary) } -func (m MockRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) { - if m.GetOutdatedRepositoriesFunc == nil { +func (m MockRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error) { + if m.GetPartiallyReplicatedRepositoriesFunc == nil { return nil, nil } - return m.GetOutdatedRepositoriesFunc(ctx, virtualStorage) + return m.GetPartiallyReplicatedRepositoriesFunc(ctx, virtualStorage, virtualStorageScopedPrimaries) } func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 96f66b728..bd72c4729 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -548,73 +548,210 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.False(t, exists) }) +} + +func TestPostgresRepositoryStore_GetPartiallyReplicatedRepositories(t *testing.T) { + for _, scope := range []struct { + desc string + useVirtualStoragePrimaries bool + primary string + }{ + {desc: "virtual storage primaries", useVirtualStoragePrimaries: true, primary: "virtual-storage-primary"}, + {desc: "repository primaries", useVirtualStoragePrimaries: false, primary: "repository-primary"}, + } { + t.Run(scope.desc, func(t *testing.T) { + for _, tc := range []struct { + desc string + nonExistentRepository bool + existingGenerations map[string]int + existingAssignments []string + storageDetails []OutdatedRepositoryStorageDetails + }{ + { + desc: "all up to date without assignments", + existingGenerations: map[string]int{"primary": 0, "secondary-1": 0}, + }, + { + desc: "unconfigured node outdated without assignments", + existingGenerations: map[string]int{"primary": 1, "secondary-1": 1, "unconfigured": 0}, + }, + { + desc: "unconfigured node contains the latest", + existingGenerations: map[string]int{"primary": 0, "secondary-1": 0, "unconfigured": 1}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 1, Assigned: true}, + {Name: "secondary-1", BehindBy: 1, Assigned: true}, + {Name: "unconfigured", BehindBy: 0, Assigned: false}, + }, + }, + { + desc: "node has no repository without assignments", + existingGenerations: map[string]int{"primary": 0}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 0, Assigned: true}, + {Name: "secondary-1", BehindBy: 1, Assigned: true}, + }, + }, + { + desc: "node has outdated repository without assignments", + existingGenerations: map[string]int{"primary": 1, "secondary-1": 0}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 0, Assigned: true}, + {Name: "secondary-1", BehindBy: 1, Assigned: true}, + }, + }, + { + desc: "node with no repository heavily outdated", + existingGenerations: map[string]int{"primary": 10}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 0, Assigned: true}, + {Name: "secondary-1", BehindBy: 11, Assigned: true}, + }, + }, + { + desc: "node with a heavily outdated repository", + existingGenerations: map[string]int{"primary": 10, "secondary-1": 0}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 0, Assigned: true}, + {Name: "secondary-1", BehindBy: 10, Assigned: true}, + }, + }, + { + desc: "outdated nodes ignored when repository should not exist", + nonExistentRepository: true, + existingGenerations: map[string]int{"primary": 1, "secondary-1": 0}, + }, + { + desc: "unassigned node has no repository", + existingAssignments: []string{"primary"}, + existingGenerations: map[string]int{"primary": 0}, + }, + { + desc: "unassigned node has an outdated repository", + existingAssignments: []string{"primary"}, + existingGenerations: map[string]int{"primary": 1, "secondary-1": 0}, + }, + { + desc: "assigned node has no repository", + existingAssignments: []string{"primary", "secondary-1"}, + existingGenerations: map[string]int{"primary": 0}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 0, Assigned: true}, + {Name: "secondary-1", BehindBy: 1, Assigned: true}, + }, + }, + { + desc: "assigned node has outdated repository", + existingAssignments: []string{"primary", "secondary-1"}, + existingGenerations: map[string]int{"primary": 1, "secondary-1": 0}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 0, Assigned: true}, + {Name: "secondary-1", BehindBy: 1, Assigned: true}, + }, + }, + { + desc: "unassigned node contains the latest repository", + existingAssignments: []string{"primary"}, + existingGenerations: map[string]int{"primary": 0, "secondary-1": 1}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 1, Assigned: true}, + {Name: "secondary-1", BehindBy: 0, Assigned: false}, + }, + }, + { + desc: "unassigned node contains the only repository", + existingAssignments: []string{"primary"}, + existingGenerations: map[string]int{"secondary-1": 0}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 1, Assigned: true}, + {Name: "secondary-1", BehindBy: 0, Assigned: false}, + }, + }, + { + desc: "unassigned unconfigured node contains the only repository", + existingAssignments: []string{"primary"}, + existingGenerations: map[string]int{"unconfigured": 0}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 1, Assigned: true}, + {Name: "unconfigured", BehindBy: 0, Assigned: false}, + }, + }, + { + desc: "assigned unconfigured node has no repository", + existingAssignments: []string{"primary", "unconfigured"}, + existingGenerations: map[string]int{"primary": 1}, + }, + { + desc: "assigned unconfigured node is outdated", + existingAssignments: []string{"primary", "unconfigured"}, + existingGenerations: map[string]int{"primary": 1, "unconfigured": 0}, + }, + { + desc: "unconfigured node is the only assigned node", + existingAssignments: []string{"unconfigured"}, + existingGenerations: map[string]int{"unconfigured": 0}, + storageDetails: []OutdatedRepositoryStorageDetails{ + {Name: "primary", BehindBy: 1, Assigned: true}, + {Name: "secondary-1", BehindBy: 1, Assigned: true}, + {Name: "unconfigured", BehindBy: 0, Assigned: false}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() - t.Run("GetOutdatedRepositories", func(t *testing.T) { - t.Run("unknown virtual storage", func(t *testing.T) { - rs, _ := newStore(t, map[string][]string{}) + db := getDB(t) - _, err := rs.GetOutdatedRepositories(ctx, "does not exist") - require.EqualError(t, err, `unknown virtual storage: "does not exist"`) - }) + configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1"}} - type state map[string]map[string]map[string]struct { - generation int - } + if !tc.nonExistentRepository { + _, err := db.ExecContext(ctx, ` + INSERT INTO repositories (virtual_storage, relative_path, "primary") + VALUES ('virtual-storage', 'relative-path', 'repository-primary') + `) + require.NoError(t, err) + } - type expected map[string]map[string]int - - for _, tc := range []struct { - desc string - state state - expected map[string]map[string]int - }{ - { - desc: "no records in virtual storage", - state: state{"virtual-storage-2": {stor: {"repo-1": {generation: 0}}}}, - expected: expected{}, - }, - { - desc: "storages missing records", - state: state{vs: {stor: {"repo-1": {generation: 0}}}}, - expected: expected{"repo-1": {"storage-2": 1, "storage-3": 1}}, - }, - { - desc: "outdated storages", - state: state{vs: { - stor: {"repo-1": {generation: 2}}, - "storage-2": {"repo-1": {generation: 1}}, - "storage-3": {"repo-1": {generation: 0}}, - }}, - expected: expected{"repo-1": {"storage-2": 1, "storage-3": 2}}, - }, - { - desc: "all up to date", - state: state{vs: { - stor: {"repo-1": {generation: 3}}, - "storage-2": {"repo-1": {generation: 3}}, - "storage-3": {"repo-1": {generation: 3}}, - }}, - expected: expected{}, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - rs, _ := newStore(t, map[string][]string{vs: {stor, "storage-2", "storage-3"}}) - - ctx, cancel := testhelper.Context() - defer cancel() - - for vs, storages := range tc.state { - for storage, repos := range storages { - for repo, state := range repos { - require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, state.generation)) - } + for storage, generation := range tc.existingGenerations { + _, err := db.ExecContext(ctx, ` + INSERT INTO storage_repositories VALUES ('virtual-storage', 'relative-path', $1, $2) + `, storage, generation) + require.NoError(t, err) } - } - outdated, err := rs.GetOutdatedRepositories(ctx, vs) - require.NoError(t, err) - require.Equal(t, tc.expected, outdated) - }) - } - }) + for _, storage := range tc.existingAssignments { + _, err := db.ExecContext(ctx, ` + INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1) + `, storage) + require.NoError(t, err) + } + + _, err := db.ExecContext(ctx, ` + INSERT INTO shard_primaries (shard_name, node_name, elected_by_praefect, elected_at) + VALUES ('virtual-storage', 'virtual-storage-primary', 'ignored', now()) + `) + require.NoError(t, err) + + store := NewPostgresRepositoryStore(db, configuredStorages) + outdated, err := store.GetPartiallyReplicatedRepositories(ctx, "virtual-storage", scope.useVirtualStoragePrimaries) + require.NoError(t, err) + + expected := []OutdatedRepository{ + { + RelativePath: "relative-path", + Primary: scope.primary, + Storages: tc.storageDetails, + }, + } + + if tc.storageDetails == nil { + expected = nil + } + + require.Equal(t, expected, outdated) + }) + } + }) + } } diff --git a/internal/praefect/service/info/dataloss.go b/internal/praefect/service/info/dataloss.go index 239133322..122b1d838 100644 --- a/internal/praefect/service/info/dataloss.go +++ b/internal/praefect/service/info/dataloss.go @@ -2,69 +2,47 @@ package info import ( "context" - "fmt" - "sort" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) func (s *Server) DatalossCheck(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { - configuredStorages, ok := s.conf.StorageNames()[req.VirtualStorage] - if !ok { - return nil, fmt.Errorf("unknown virtual storage: %q", req.VirtualStorage) - } - - shard, err := s.nodeMgr.GetShard(ctx, req.GetVirtualStorage()) - if err != nil { - return nil, err - } - - outdatedRepos, err := s.rs.GetOutdatedRepositories(ctx, req.GetVirtualStorage()) + outdatedRepos, err := s.rs.GetPartiallyReplicatedRepositories( + ctx, req.GetVirtualStorage(), s.conf.Failover.ElectionStrategy != config.ElectionStrategyPerRepository) if err != nil { return nil, err } pbRepos := make([]*gitalypb.DatalossCheckResponse_Repository, 0, len(outdatedRepos)) - for relativePath, outdatedStorages := range outdatedRepos { - readOnly := false + for _, outdatedRepo := range outdatedRepos { + readOnly := true - storages := make(map[string]*gitalypb.DatalossCheckResponse_Repository_Storage, len(configuredStorages)) - for _, storage := range configuredStorages { - storages[storage] = &gitalypb.DatalossCheckResponse_Repository_Storage{ - Name: storage, - Assigned: true, + storages := make([]*gitalypb.DatalossCheckResponse_Repository_Storage, 0, len(outdatedRepo.Storages)) + for _, storage := range outdatedRepo.Storages { + if storage.Name == outdatedRepo.Primary && storage.BehindBy == 0 { + readOnly = false } - } - for name, behindBy := range outdatedStorages { - if name == shard.Primary.GetStorage() { - readOnly = true - } - - storages[name].BehindBy = int64(behindBy) + storages = append(storages, &gitalypb.DatalossCheckResponse_Repository_Storage{ + Name: storage.Name, + BehindBy: int64(storage.BehindBy), + Assigned: storage.Assigned, + }) } if !req.IncludePartiallyReplicated && !readOnly { continue } - storagesSlice := make([]*gitalypb.DatalossCheckResponse_Repository_Storage, 0, len(storages)) - for _, storage := range storages { - storagesSlice = append(storagesSlice, storage) - } - - sort.Slice(storagesSlice, func(i, j int) bool { return storagesSlice[i].Name < storagesSlice[j].Name }) - pbRepos = append(pbRepos, &gitalypb.DatalossCheckResponse_Repository{ - RelativePath: relativePath, + RelativePath: outdatedRepo.RelativePath, + Primary: outdatedRepo.Primary, ReadOnly: readOnly, - Storages: storagesSlice, - Primary: shard.Primary.GetStorage(), + Storages: storages, }) } - sort.Slice(pbRepos, func(i, j int) bool { return pbRepos[i].RelativePath < pbRepos[j].RelativePath }) - return &gitalypb.DatalossCheckResponse{ Repositories: pbRepos, }, nil |