Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-12-16 18:11:53 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-12-16 18:11:53 +0300
commit2dbc86a6542d5dc335ade8ecfd2dad39d6bd055f (patch)
tree7e94fb2946e00f3461088a94f15e0c0f927b9faf
parent08eb13cb73031d05578ce27d8db0c80d7427df33 (diff)
parentb34f04265a3d3b811cc5c4e80e2b2bf323bce380 (diff)
Merge branch 'smh-dataloss-primaries' into 'master'
Support repository specific primaries and host assignments in dataloss Closes #3301 and #3199 See merge request gitlab-org/gitaly!2890
-rw-r--r--changelogs/unreleased/smh-dataloss-primaries.yml5
-rw-r--r--cmd/praefect/subcmd_dataloss_test.go262
-rw-r--r--internal/praefect/datastore/repository_store.go146
-rw-r--r--internal/praefect/datastore/repository_store_mock.go28
-rw-r--r--internal/praefect/datastore/repository_store_test.go263
-rw-r--r--internal/praefect/service/info/dataloss.go56
6 files changed, 498 insertions, 262 deletions
diff --git a/changelogs/unreleased/smh-dataloss-primaries.yml b/changelogs/unreleased/smh-dataloss-primaries.yml
new file mode 100644
index 000000000..092a4292d
--- /dev/null
+++ b/changelogs/unreleased/smh-dataloss-primaries.yml
@@ -0,0 +1,5 @@
+---
+title: Support repository specific primaries and host assignments in dataloss
+merge_request: 2890
+author:
+type: changed
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index 9a0b8ddbc..14ae43a57 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -4,13 +4,13 @@ package main
import (
"bytes"
+ "fmt"
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -28,158 +28,192 @@ func registerPraefectInfoServer(impl gitalypb.PraefectInfoServiceServer) svcRegi
}
func TestDatalossSubcommand(t *testing.T) {
- mgr := &nodes.MockManager{
- GetShardFunc: func(vs string) (nodes.Shard, error) {
- var primary string
- switch vs {
- case "virtual-storage-1":
- primary = "gitaly-1"
- case "virtual-storage-2":
- primary = "gitaly-4"
- default:
- t.Error("unexpected virtual storage")
- }
-
- return nodes.Shard{Primary: &nodes.MockNode{
- GetStorageMethod: func() string {
- return primary
- },
- }}, nil
- },
- }
-
- cfg := config.Config{
- VirtualStorages: []*config.VirtualStorage{
- {
- Name: "virtual-storage-1",
- Nodes: []*config.Node{
- {Storage: "gitaly-1"},
- {Storage: "gitaly-2"},
- {Storage: "gitaly-3"},
- },
+ for _, scope := range []struct {
+ desc string
+ electionStrategy config.ElectionStrategy
+ primaries map[string]string
+ }{
+ {
+ desc: "sql elector",
+ electionStrategy: config.ElectionStrategySQL,
+ primaries: map[string]string{
+ "repository-1": "gitaly-1",
+ "repository-2": "gitaly-1",
},
- {
- Name: "virtual-storage-2",
- Nodes: []*config.Node{
- {Storage: "gitaly-4"},
- },
+ },
+ {
+ desc: "per_repository elector",
+ electionStrategy: config.ElectionStrategyPerRepository,
+ primaries: map[string]string{
+ "repository-1": "gitaly-1",
+ "repository-2": "gitaly-3",
},
},
- }
+ } {
+ t.Run(scope.desc, func(t *testing.T) {
+ cfg := config.Config{
+ Failover: config.Failover{ElectionStrategy: scope.electionStrategy},
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "virtual-storage-1",
+ Nodes: []*config.Node{
+ {Storage: "gitaly-1"},
+ {Storage: "gitaly-2"},
+ {Storage: "gitaly-3"},
+ },
+ },
+ {
+ Name: "virtual-storage-2",
+ Nodes: []*config.Node{
+ {Storage: "gitaly-4"},
+ },
+ },
+ },
+ }
- gs := datastore.NewPostgresRepositoryStore(getDB(t), cfg.StorageNames())
+ db := getDB(t)
+ gs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
- ctx, cancel := testhelper.Context()
- defer cancel()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-1", 1))
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-2", 0))
+ for _, q := range []string{
+ `
+ INSERT INTO repositories (virtual_storage, relative_path, "primary")
+ VALUES
+ ('virtual-storage-1', 'repository-1', 'gitaly-1'),
+ ('virtual-storage-1', 'repository-2', 'gitaly-3')
+ `,
+ `
+ INSERT INTO repository_assignments (virtual_storage, relative_path, storage)
+ VALUES
+ ('virtual-storage-1', 'repository-1', 'gitaly-1'),
+ ('virtual-storage-1', 'repository-1', 'gitaly-2'),
+ ('virtual-storage-1', 'repository-2', 'gitaly-1'),
+ ('virtual-storage-1', 'repository-2', 'gitaly-3')
+ `,
+ `
+ INSERT INTO shard_primaries (shard_name, node_name, elected_by_praefect, elected_at)
+ VALUES ('virtual-storage-1', 'gitaly-1', 'ignored', now())
+ `,
+ } {
+ _, err := db.ExecContext(ctx, q)
+ require.NoError(t, err)
+ }
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-2", 0))
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0))
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-1", 1))
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-2", 0))
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-3", 0))
- ln, clean := listenAndServe(t, []svcRegistrar{
- registerPraefectInfoServer(info.NewServer(mgr, cfg, nil, gs, nil))})
- defer clean()
- for _, tc := range []struct {
- desc string
- args []string
- virtualStorages []*config.VirtualStorage
- output string
- error error
- }{
- {
- desc: "positional arguments",
- args: []string{"-virtual-storage=virtual-storage-1", "positional-arg"},
- error: unexpectedPositionalArgsError{Command: "dataloss"},
- },
- {
- desc: "data loss with read-only repositories",
- args: []string{"-virtual-storage=virtual-storage-1"}, output: `Virtual storage: virtual-storage-1
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-2", 1))
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0))
+
+ ln, clean := listenAndServe(t, []svcRegistrar{
+ registerPraefectInfoServer(info.NewServer(nil, cfg, nil, gs, nil))})
+ defer clean()
+ for _, tc := range []struct {
+ desc string
+ args []string
+ virtualStorages []*config.VirtualStorage
+ output string
+ error error
+ }{
+ {
+ desc: "positional arguments",
+ args: []string{"-virtual-storage=virtual-storage-1", "positional-arg"},
+ error: unexpectedPositionalArgsError{Command: "dataloss"},
+ },
+ {
+ desc: "data loss with read-only repositories",
+ args: []string{"-virtual-storage=virtual-storage-1"},
+ output: fmt.Sprintf(`Virtual storage: virtual-storage-1
Outdated repositories:
repository-2 (read-only):
- Primary: gitaly-1
+ Primary: %s
In-Sync Storages:
- gitaly-2, assigned host
- gitaly-3, assigned host
+ gitaly-2
Outdated Storages:
- gitaly-1 is behind by 1 change or less, assigned host
-`,
- },
- {
- desc: "data loss with partially replicated repositories",
- args: []string{"-virtual-storage=virtual-storage-1", "-partially-replicated"}, output: `Virtual storage: virtual-storage-1
+ gitaly-1 is behind by 2 changes or less, assigned host
+ gitaly-3 is behind by 1 change or less, assigned host
+`, scope.primaries["repository-2"]),
+ },
+ {
+ desc: "data loss with partially replicated repositories",
+ args: []string{"-virtual-storage=virtual-storage-1", "-partially-replicated"},
+ output: fmt.Sprintf(`Virtual storage: virtual-storage-1
Outdated repositories:
repository-1 (writable):
- Primary: gitaly-1
+ Primary: %s
In-Sync Storages:
gitaly-1, assigned host
Outdated Storages:
gitaly-2 is behind by 1 change or less, assigned host
- gitaly-3 is behind by 2 changes or less, assigned host
+ gitaly-3 is behind by 1 change or less
repository-2 (read-only):
- Primary: gitaly-1
+ Primary: %s
In-Sync Storages:
- gitaly-2, assigned host
- gitaly-3, assigned host
+ gitaly-2
Outdated Storages:
- gitaly-1 is behind by 1 change or less, assigned host
-`,
- },
- {
- desc: "multiple virtual storages with read-only repositories",
- virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}},
- output: `Virtual storage: virtual-storage-1
+ gitaly-1 is behind by 2 changes or less, assigned host
+ gitaly-3 is behind by 1 change or less, assigned host
+`, scope.primaries["repository-1"], scope.primaries["repository-2"]),
+ },
+ {
+ desc: "multiple virtual storages with read-only repositories",
+ virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}},
+ output: fmt.Sprintf(`Virtual storage: virtual-storage-1
Outdated repositories:
repository-2 (read-only):
- Primary: gitaly-1
+ Primary: %s
In-Sync Storages:
- gitaly-2, assigned host
- gitaly-3, assigned host
+ gitaly-2
Outdated Storages:
- gitaly-1 is behind by 1 change or less, assigned host
+ gitaly-1 is behind by 2 changes or less, assigned host
+ gitaly-3 is behind by 1 change or less, assigned host
Virtual storage: virtual-storage-2
All repositories are writable!
-`,
- },
- {
- desc: "multiple virtual storages with partially replicated repositories",
- args: []string{"-partially-replicated"},
- virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}},
- output: `Virtual storage: virtual-storage-1
+`, scope.primaries["repository-2"]),
+ },
+ {
+ desc: "multiple virtual storages with partially replicated repositories",
+ args: []string{"-partially-replicated"},
+ virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}},
+ output: fmt.Sprintf(`Virtual storage: virtual-storage-1
Outdated repositories:
repository-1 (writable):
- Primary: gitaly-1
+ Primary: %s
In-Sync Storages:
gitaly-1, assigned host
Outdated Storages:
gitaly-2 is behind by 1 change or less, assigned host
- gitaly-3 is behind by 2 changes or less, assigned host
+ gitaly-3 is behind by 1 change or less
repository-2 (read-only):
- Primary: gitaly-1
+ Primary: %s
In-Sync Storages:
- gitaly-2, assigned host
- gitaly-3, assigned host
+ gitaly-2
Outdated Storages:
- gitaly-1 is behind by 1 change or less, assigned host
+ gitaly-1 is behind by 2 changes or less, assigned host
+ gitaly-3 is behind by 1 change or less, assigned host
Virtual storage: virtual-storage-2
All repositories are up to date!
-`,
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- cmd := newDatalossSubcommand()
- output := &bytes.Buffer{}
- cmd.output = output
+`, scope.primaries["repository-1"], scope.primaries["repository-2"]),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ cmd := newDatalossSubcommand()
+ output := &bytes.Buffer{}
+ cmd.output = output
- fs := cmd.FlagSet()
- require.NoError(t, fs.Parse(tc.args))
- err := cmd.Exec(fs, config.Config{
- VirtualStorages: tc.virtualStorages,
- SocketPath: ln.Addr().String(),
- })
- require.Equal(t, tc.error, err, err)
- require.Equal(t, tc.output, output.String())
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse(tc.args))
+ err := cmd.Exec(fs, config.Config{
+ VirtualStorages: tc.virtualStorages,
+ SocketPath: ln.Addr().String(),
+ })
+ require.Equal(t, tc.error, err, err)
+ require.Equal(t, tc.output, output.String())
+ })
+ }
})
}
}
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 1a25ae1bf..288a4655d 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -3,8 +3,10 @@ package datastore
import (
"context"
"database/sql"
+ "encoding/json"
"errors"
"fmt"
+ "strings"
"github.com/lib/pq"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
@@ -90,11 +92,10 @@ type RepositoryStore interface {
IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error)
// RepositoryExists returns whether the repository exists on a virtual storage.
RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
- // GetOutdatedRepositories finds repositories which are not on the latest generation in the virtual storage. It returns a map
- // with key structure `relative_path-> storage -> generation`, indicating how many changes a storage is missing for a given
- // repository.
- GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error)
-
+ // GetPartiallyReplicatedRepositories returns information on repositories which have an outdated copy on an assigned storage.
+ // By default, repository specific primaries are returned in the results. If useVirtualStoragePrimaries is set, virtual storage's
+ // primary is returned instead for each repository.
+ GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStoragePrimaries bool) ([]OutdatedRepository, error)
// DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's
// record of the invalid repository. If the storage was the only storage with the repository, the repository's
// record on the virtual storage is also deleted.
@@ -458,46 +459,127 @@ AND NOT EXISTS (
return err
}
-func (rs *PostgresRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) {
- // As some storages might be missing records from the table, we do a cross join between the repositories and the
- // configured storages. If a storage is missing an entry, it is considered fully outdated.
- const q = `
-SELECT relative_path, storage, expected_repositories.generation - COALESCE(storage_repositories.generation, -1) AS behind_by
-FROM (
- SELECT virtual_storage, relative_path, unnest($2::text[]) AS storage, MAX(storage_repositories.generation) AS generation
- FROM repositories
- JOIN storage_repositories USING (virtual_storage, relative_path)
- WHERE virtual_storage = $1
- GROUP BY virtual_storage, relative_path
-) AS expected_repositories
-LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
-WHERE COALESCE(storage_repositories.generation, -1) < expected_repositories.generation
-`
- storages, ok := rs.storages[virtualStorage]
+// OutdatedRepositoryStorageDetails represents a storage that contains or should contain a
+// copy of the repository.
+type OutdatedRepositoryStorageDetails struct {
+ // Name of the storage as configured.
+ Name string
+ // BehindBy indicates how many generations the storage's copy of the repository is missing at maximum.
+ BehindBy int
+ // Assigned indicates whether the storage is an assigned host of the repository.
+ Assigned bool
+}
+
+// OutdatedRepository is a repository with one or more outdated assigned storages.
+type OutdatedRepository struct {
+ // RelativePath is the relative path of the repository.
+ RelativePath string
+ // Primary is the current primary of this repository.
+ Primary string
+ // Storages contains information of the repository on each storage that contains the repository
+ // or does not contain the repository but is assigned to host it.
+ Storages []OutdatedRepositoryStorageDetails
+}
+
+func (rs *PostgresRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, useVirtualStoragePrimaries bool) ([]OutdatedRepository, error) {
+ configuredStorages, ok := rs.storages[virtualStorage]
if !ok {
return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage)
}
- rows, err := rs.db.QueryContext(ctx, q, virtualStorage, pq.StringArray(storages))
+ // The query below gets the generations and assignments of every repository
+ // which has one or more outdated assigned nodes. It works as follows:
+ //
+ // 1. First we get all the storages which contain the repository from `storage_repositories`. We
+ // list every copy of the repository as the latest generation could exist on an unassigned
+ // storage.
+ //
+ // 2. We join `repository_assignments` table with fallback behavior in case the repository has no
+ // assignments. A storage is considered assigned if:
+ //
+ // 1. If the repository has no assignments, every configured storage is considered assigned.
+ // 2. If the repository has assignments, the storage needs to be assigned explicitly.
+ // 3. Assignments of unconfigured storages are treated as if they don't exist.
+ //
+ // If none of the assigned storages are outdated, the repository is not considered outdated as
+ // the desired replication factor has been reached.
+ //
+ // 3. We join `repositories` table to filter out any repositories that have been deleted but still
+ // exist on some storages. While the `repository_assignments` has a foreign key on `repositories`
+ // and there can't be any assignments for deleted repositories, this is still needed as long as the
+ // fallback behavior of no assignments is in place.
+ //
+ // 4. Finally we aggregate each repository's information in to a single row with a JSON object containing
+ // the information. This allows us to group the output already in the query and makes scanning easier
+ // We filter out groups which do not have an outdated assigned storage as the replication factor on those
+ // is reached. Status of unassigned storages does not matter as long as they don't contain a later generation
+ // than the assigned ones.
+ //
+ // If virtual storage scoped primaries are used, the primary is instead selected from the `shard_primaries` table.
+ rows, err := rs.db.QueryContext(ctx, `
+SELECT
+ json_build_object (
+ 'RelativePath', relative_path,
+ 'Primary', "primary",
+ 'Storages', json_agg(
+ json_build_object(
+ 'Name', storage,
+ 'BehindBy', behind_by,
+ 'Assigned', assigned
+ )
+ )
+ )
+FROM (
+ SELECT
+ relative_path,
+ CASE WHEN $3
+ THEN shard_primaries.node_name
+ ELSE repositories."primary"
+ END AS "primary",
+ storage,
+ max(storage_repositories.generation) OVER (PARTITION BY virtual_storage, relative_path) - COALESCE(storage_repositories.generation, -1) AS behind_by,
+ repository_assignments.storage IS NOT NULL AS assigned
+ FROM storage_repositories
+ FULL JOIN (
+ SELECT virtual_storage, relative_path, storage
+ FROM repositories
+ CROSS JOIN (SELECT unnest($2::text[]) AS storage) AS configured_storages
+ WHERE (
+ SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = configured_storages.storage) = 1
+ FROM repository_assignments
+ WHERE virtual_storage = repositories.virtual_storage
+ AND relative_path = repositories.relative_path
+ AND storage = ANY($2::text[])
+ )
+ ) AS repository_assignments USING (virtual_storage, relative_path, storage)
+ JOIN repositories USING (virtual_storage, relative_path)
+ LEFT JOIN shard_primaries ON $3 AND shard_name = virtual_storage AND NOT demoted
+ WHERE virtual_storage = $1
+ ORDER BY relative_path, "primary", storage
+) AS outdated_repositories
+GROUP BY relative_path, "primary"
+HAVING max(behind_by) FILTER(WHERE assigned) > 0
+ORDER BY relative_path, "primary"
+ `, virtualStorage, pq.StringArray(configuredStorages), useVirtualStoragePrimaries)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("query: %w", err)
}
defer rows.Close()
- outdated := make(map[string]map[string]int)
+ var outdatedRepos []OutdatedRepository
for rows.Next() {
- var storage, relativePath string
- var difference int
- if err := rows.Scan(&relativePath, &storage, &difference); err != nil {
- return nil, err
+ var repositoryJSON string
+ if err := rows.Scan(&repositoryJSON); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
}
- if outdated[relativePath] == nil {
- outdated[relativePath] = make(map[string]int)
+ var outdatedRepo OutdatedRepository
+ if err := json.NewDecoder(strings.NewReader(repositoryJSON)).Decode(&outdatedRepo); err != nil {
+ return nil, fmt.Errorf("decode json: %w", err)
}
- outdated[relativePath][storage] = difference
+ outdatedRepos = append(outdatedRepos, outdatedRepo)
}
- return outdated, rows.Err()
+ return outdatedRepos, rows.Err()
}
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 8220d5898..d598a1692 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -5,17 +5,17 @@ import "context"
// MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods
// default to what could be considered success if not set.
type MockRepositoryStore struct {
- GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
- IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
- IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error)
- GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
- SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
- DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
- RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
- GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
- GetOutdatedRepositoriesFunc func(ctx context.Context, virtualStorage string) (map[string]map[string]int, error)
- DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
- RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
+ GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
+ IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
+ IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error)
+ GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
+ SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
+ DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
+ RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
+ GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
+ GetPartiallyReplicatedRepositoriesFunc func(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error)
+ DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
+ RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
}
func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) {
@@ -82,12 +82,12 @@ func (m MockRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtu
return m.GetConsistentSecondariesFunc(ctx, virtualStorage, relativePath, primary)
}
-func (m MockRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) {
- if m.GetOutdatedRepositoriesFunc == nil {
+func (m MockRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error) {
+ if m.GetPartiallyReplicatedRepositoriesFunc == nil {
return nil, nil
}
- return m.GetOutdatedRepositoriesFunc(ctx, virtualStorage)
+ return m.GetPartiallyReplicatedRepositoriesFunc(ctx, virtualStorage, virtualStorageScopedPrimaries)
}
func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index 96f66b728..bd72c4729 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -548,73 +548,210 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
require.False(t, exists)
})
+}
+
+func TestPostgresRepositoryStore_GetPartiallyReplicatedRepositories(t *testing.T) {
+ for _, scope := range []struct {
+ desc string
+ useVirtualStoragePrimaries bool
+ primary string
+ }{
+ {desc: "virtual storage primaries", useVirtualStoragePrimaries: true, primary: "virtual-storage-primary"},
+ {desc: "repository primaries", useVirtualStoragePrimaries: false, primary: "repository-primary"},
+ } {
+ t.Run(scope.desc, func(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ nonExistentRepository bool
+ existingGenerations map[string]int
+ existingAssignments []string
+ storageDetails []OutdatedRepositoryStorageDetails
+ }{
+ {
+ desc: "all up to date without assignments",
+ existingGenerations: map[string]int{"primary": 0, "secondary-1": 0},
+ },
+ {
+ desc: "unconfigured node outdated without assignments",
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 1, "unconfigured": 0},
+ },
+ {
+ desc: "unconfigured node contains the latest",
+ existingGenerations: map[string]int{"primary": 0, "secondary-1": 0, "unconfigured": 1},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ {Name: "unconfigured", BehindBy: 0, Assigned: false},
+ },
+ },
+ {
+ desc: "node has no repository without assignments",
+ existingGenerations: map[string]int{"primary": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ },
+ },
+ {
+ desc: "node has outdated repository without assignments",
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ },
+ },
+ {
+ desc: "node with no repository heavily outdated",
+ existingGenerations: map[string]int{"primary": 10},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 11, Assigned: true},
+ },
+ },
+ {
+ desc: "node with a heavily outdated repository",
+ existingGenerations: map[string]int{"primary": 10, "secondary-1": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 10, Assigned: true},
+ },
+ },
+ {
+ desc: "outdated nodes ignored when repository should not exist",
+ nonExistentRepository: true,
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 0},
+ },
+ {
+ desc: "unassigned node has no repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"primary": 0},
+ },
+ {
+ desc: "unassigned node has an outdated repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 0},
+ },
+ {
+ desc: "assigned node has no repository",
+ existingAssignments: []string{"primary", "secondary-1"},
+ existingGenerations: map[string]int{"primary": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ },
+ },
+ {
+ desc: "assigned node has outdated repository",
+ existingAssignments: []string{"primary", "secondary-1"},
+ existingGenerations: map[string]int{"primary": 1, "secondary-1": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 0, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ },
+ },
+ {
+ desc: "unassigned node contains the latest repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"primary": 0, "secondary-1": 1},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "secondary-1", BehindBy: 0, Assigned: false},
+ },
+ },
+ {
+ desc: "unassigned node contains the only repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"secondary-1": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "secondary-1", BehindBy: 0, Assigned: false},
+ },
+ },
+ {
+ desc: "unassigned unconfigured node contains the only repository",
+ existingAssignments: []string{"primary"},
+ existingGenerations: map[string]int{"unconfigured": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "unconfigured", BehindBy: 0, Assigned: false},
+ },
+ },
+ {
+ desc: "assigned unconfigured node has no repository",
+ existingAssignments: []string{"primary", "unconfigured"},
+ existingGenerations: map[string]int{"primary": 1},
+ },
+ {
+ desc: "assigned unconfigured node is outdated",
+ existingAssignments: []string{"primary", "unconfigured"},
+ existingGenerations: map[string]int{"primary": 1, "unconfigured": 0},
+ },
+ {
+ desc: "unconfigured node is the only assigned node",
+ existingAssignments: []string{"unconfigured"},
+ existingGenerations: map[string]int{"unconfigured": 0},
+ storageDetails: []OutdatedRepositoryStorageDetails{
+ {Name: "primary", BehindBy: 1, Assigned: true},
+ {Name: "secondary-1", BehindBy: 1, Assigned: true},
+ {Name: "unconfigured", BehindBy: 0, Assigned: false},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
- t.Run("GetOutdatedRepositories", func(t *testing.T) {
- t.Run("unknown virtual storage", func(t *testing.T) {
- rs, _ := newStore(t, map[string][]string{})
+ db := getDB(t)
- _, err := rs.GetOutdatedRepositories(ctx, "does not exist")
- require.EqualError(t, err, `unknown virtual storage: "does not exist"`)
- })
+ configuredStorages := map[string][]string{"virtual-storage": {"primary", "secondary-1"}}
- type state map[string]map[string]map[string]struct {
- generation int
- }
+ if !tc.nonExistentRepository {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO repositories (virtual_storage, relative_path, "primary")
+ VALUES ('virtual-storage', 'relative-path', 'repository-primary')
+ `)
+ require.NoError(t, err)
+ }
- type expected map[string]map[string]int
-
- for _, tc := range []struct {
- desc string
- state state
- expected map[string]map[string]int
- }{
- {
- desc: "no records in virtual storage",
- state: state{"virtual-storage-2": {stor: {"repo-1": {generation: 0}}}},
- expected: expected{},
- },
- {
- desc: "storages missing records",
- state: state{vs: {stor: {"repo-1": {generation: 0}}}},
- expected: expected{"repo-1": {"storage-2": 1, "storage-3": 1}},
- },
- {
- desc: "outdated storages",
- state: state{vs: {
- stor: {"repo-1": {generation: 2}},
- "storage-2": {"repo-1": {generation: 1}},
- "storage-3": {"repo-1": {generation: 0}},
- }},
- expected: expected{"repo-1": {"storage-2": 1, "storage-3": 2}},
- },
- {
- desc: "all up to date",
- state: state{vs: {
- stor: {"repo-1": {generation: 3}},
- "storage-2": {"repo-1": {generation: 3}},
- "storage-3": {"repo-1": {generation: 3}},
- }},
- expected: expected{},
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- rs, _ := newStore(t, map[string][]string{vs: {stor, "storage-2", "storage-3"}})
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- for vs, storages := range tc.state {
- for storage, repos := range storages {
- for repo, state := range repos {
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, state.generation))
- }
+ for storage, generation := range tc.existingGenerations {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO storage_repositories VALUES ('virtual-storage', 'relative-path', $1, $2)
+ `, storage, generation)
+ require.NoError(t, err)
}
- }
- outdated, err := rs.GetOutdatedRepositories(ctx, vs)
- require.NoError(t, err)
- require.Equal(t, tc.expected, outdated)
- })
- }
- })
+ for _, storage := range tc.existingAssignments {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1)
+ `, storage)
+ require.NoError(t, err)
+ }
+
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO shard_primaries (shard_name, node_name, elected_by_praefect, elected_at)
+ VALUES ('virtual-storage', 'virtual-storage-primary', 'ignored', now())
+ `)
+ require.NoError(t, err)
+
+ store := NewPostgresRepositoryStore(db, configuredStorages)
+ outdated, err := store.GetPartiallyReplicatedRepositories(ctx, "virtual-storage", scope.useVirtualStoragePrimaries)
+ require.NoError(t, err)
+
+ expected := []OutdatedRepository{
+ {
+ RelativePath: "relative-path",
+ Primary: scope.primary,
+ Storages: tc.storageDetails,
+ },
+ }
+
+ if tc.storageDetails == nil {
+ expected = nil
+ }
+
+ require.Equal(t, expected, outdated)
+ })
+ }
+ })
+ }
}
diff --git a/internal/praefect/service/info/dataloss.go b/internal/praefect/service/info/dataloss.go
index 239133322..122b1d838 100644
--- a/internal/praefect/service/info/dataloss.go
+++ b/internal/praefect/service/info/dataloss.go
@@ -2,69 +2,47 @@ package info
import (
"context"
- "fmt"
- "sort"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
func (s *Server) DatalossCheck(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
- configuredStorages, ok := s.conf.StorageNames()[req.VirtualStorage]
- if !ok {
- return nil, fmt.Errorf("unknown virtual storage: %q", req.VirtualStorage)
- }
-
- shard, err := s.nodeMgr.GetShard(ctx, req.GetVirtualStorage())
- if err != nil {
- return nil, err
- }
-
- outdatedRepos, err := s.rs.GetOutdatedRepositories(ctx, req.GetVirtualStorage())
+ outdatedRepos, err := s.rs.GetPartiallyReplicatedRepositories(
+ ctx, req.GetVirtualStorage(), s.conf.Failover.ElectionStrategy != config.ElectionStrategyPerRepository)
if err != nil {
return nil, err
}
pbRepos := make([]*gitalypb.DatalossCheckResponse_Repository, 0, len(outdatedRepos))
- for relativePath, outdatedStorages := range outdatedRepos {
- readOnly := false
+ for _, outdatedRepo := range outdatedRepos {
+ readOnly := true
- storages := make(map[string]*gitalypb.DatalossCheckResponse_Repository_Storage, len(configuredStorages))
- for _, storage := range configuredStorages {
- storages[storage] = &gitalypb.DatalossCheckResponse_Repository_Storage{
- Name: storage,
- Assigned: true,
+ storages := make([]*gitalypb.DatalossCheckResponse_Repository_Storage, 0, len(outdatedRepo.Storages))
+ for _, storage := range outdatedRepo.Storages {
+ if storage.Name == outdatedRepo.Primary && storage.BehindBy == 0 {
+ readOnly = false
}
- }
- for name, behindBy := range outdatedStorages {
- if name == shard.Primary.GetStorage() {
- readOnly = true
- }
-
- storages[name].BehindBy = int64(behindBy)
+ storages = append(storages, &gitalypb.DatalossCheckResponse_Repository_Storage{
+ Name: storage.Name,
+ BehindBy: int64(storage.BehindBy),
+ Assigned: storage.Assigned,
+ })
}
if !req.IncludePartiallyReplicated && !readOnly {
continue
}
- storagesSlice := make([]*gitalypb.DatalossCheckResponse_Repository_Storage, 0, len(storages))
- for _, storage := range storages {
- storagesSlice = append(storagesSlice, storage)
- }
-
- sort.Slice(storagesSlice, func(i, j int) bool { return storagesSlice[i].Name < storagesSlice[j].Name })
-
pbRepos = append(pbRepos, &gitalypb.DatalossCheckResponse_Repository{
- RelativePath: relativePath,
+ RelativePath: outdatedRepo.RelativePath,
+ Primary: outdatedRepo.Primary,
ReadOnly: readOnly,
- Storages: storagesSlice,
- Primary: shard.Primary.GetStorage(),
+ Storages: storages,
})
}
- sort.Slice(pbRepos, func(i, j int) bool { return pbRepos[i].RelativePath < pbRepos[j].RelativePath })
-
return &gitalypb.DatalossCheckResponse{
Repositories: pbRepos,
}, nil