Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-09-21 17:45:14 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-09-21 20:02:15 +0300
commitd05d1609151e62db12e5db51bc1dc473a61f620b (patch)
tree061f5622ff28cea5808cef0b1601a719cf79b771
parenta15f2b2393b0567f64b708de9aa3c04f4a9b7b33 (diff)
Allow null generations in repositories table
'repositories' table's records are used to keep record of repositories that should exist and as repository specific locks for accessing 'storage_repositories' table. The repository specific locking was implemented by incrementing the generation column before modifying the entries in the 'storage_repositories' table. Furthermore, the generation column is used as an easy way to access the latest generation number. These uses prevent adding other columns to the table, as inserting a record forces one to also set a generation number from unrelated code. To make it viable to add other repository specific columns to the table, this commit stops using the 'repositories.generation' as an easy way to get the current expected generation and instead gets it by getting the maximum value from the 'storage_repositories' table. To make it possible to add other columns without affecting the generation, NOT NULL constraint is dropped from the column and queries have been updated to handle the case. The column is still kept around and updated in the writes to keep backwards compatibility with earlier versions. The column can be dropped later once every repository is guaranteed to have a record in the table by replacing the increment operation with a select for update. Behavior should be semantically the same. Only difference is in GetConsistentSecondaries which was only checking the secondaries match the primary's generation rather than that they are on the latest generation. This was changed as we can't conistency of the repository unless it is on the very latest generation.
-rw-r--r--internal/praefect/coordinator_test.go1
-rw-r--r--internal/praefect/datastore/migrations/20200921154417_repositories_nullable_generation.go13
-rw-r--r--internal/praefect/datastore/repository_memory.go41
-rw-r--r--internal/praefect/datastore/repository_memory_test.go102
-rw-r--r--internal/praefect/datastore/repository_postgres.go99
-rw-r--r--internal/praefect/datastore/repository_postgres_test.go9
6 files changed, 121 insertions, 144 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 61354f6df..01c5603f0 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -76,6 +76,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
defer cancel()
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+ require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, "latest", 1))
require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, 1))
if tc.readOnly {
require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, 0))
diff --git a/internal/praefect/datastore/migrations/20200921154417_repositories_nullable_generation.go b/internal/praefect/datastore/migrations/20200921154417_repositories_nullable_generation.go
new file mode 100644
index 000000000..08ee6816c
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200921154417_repositories_nullable_generation.go
@@ -0,0 +1,13 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20200921154417_repositories_nullable_generation",
+ Up: []string{`ALTER TABLE repositories ALTER COLUMN generation DROP NOT NULL`},
+ Down: []string{},
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/repository_memory.go b/internal/praefect/datastore/repository_memory.go
index 0d175fb8f..cb4216f7b 100644
--- a/internal/praefect/datastore/repository_memory.go
+++ b/internal/praefect/datastore/repository_memory.go
@@ -51,9 +51,9 @@ func (s storages) storages(virtualStorage string) ([]string, error) {
return storages, nil
}
-// virtualStorageStates represents the virtual storage's view of what state the repositories should be in.
-// It structured as virtual-storage->relative_path->generation.
-type virtualStorageState map[string]map[string]int
+// virtualStorageStates represents the virtual storage's view of which repositories should exist.
+// It's structured as virtual-storage->relative_path.
+type virtualStorageState map[string]map[string]struct{}
// storageState contains individual storage's repository states.
// It structured as virtual-storage->relative_path->storage->generation.
@@ -144,7 +144,7 @@ func (m *MemoryRepositoryStore) RenameRepository(ctx context.Context, virtualSto
if latestGen != GenerationUnknown {
m.deleteRepository(virtualStorage, relativePath)
- m.setRepositoryGeneration(virtualStorage, newRelativePath, latestGen)
+ m.createRepository(virtualStorage, newRelativePath)
}
if storageGen != GenerationUnknown {
@@ -192,7 +192,7 @@ func (m *MemoryRepositoryStore) GetConsistentSecondaries(ctx context.Context, vi
return nil, err
}
- expectedGen := m.getStorageGeneration(virtualStorage, relativePath, primary)
+ expectedGen := m.getRepositoryGeneration(virtualStorage, relativePath)
if expectedGen == GenerationUnknown {
return nil, nil
}
@@ -237,7 +237,9 @@ func (m *MemoryRepositoryStore) GetOutdatedRepositories(ctx context.Context, vir
return outdatedRepos, nil
}
- for relativePath, latestGeneration := range repositories {
+ for relativePath := range repositories {
+ latestGeneration := m.getRepositoryGeneration(virtualStorage, relativePath)
+
for _, storage := range storages {
if gen := m.getStorageGeneration(virtualStorage, relativePath, storage); gen < latestGeneration {
if outdatedRepos[relativePath] == nil {
@@ -260,7 +262,8 @@ func (m *MemoryRepositoryStore) CountReadOnlyRepositories(ctx context.Context, v
for vs, primary := range vsPrimaries {
vsReadOnly[vs] = 0
relativePaths := m.virtualStorageState[vs]
- for relativePath, expectedGeneration := range relativePaths {
+ for relativePath := range relativePaths {
+ expectedGeneration := m.getRepositoryGeneration(vs, relativePath)
actualGeneration := m.getStorageGeneration(vs, relativePath, primary)
if actualGeneration < expectedGeneration {
vsReadOnly[vs]++
@@ -272,12 +275,19 @@ func (m *MemoryRepositoryStore) CountReadOnlyRepositories(ctx context.Context, v
}
func (m *MemoryRepositoryStore) getRepositoryGeneration(virtualStorage, relativePath string) int {
- gen, ok := m.virtualStorageState[virtualStorage][relativePath]
+ generations, ok := m.storageState[virtualStorage][relativePath]
if !ok {
return GenerationUnknown
}
- return gen
+ max := GenerationUnknown
+ for _, generation := range generations {
+ if generation > max {
+ max = generation
+ }
+ }
+
+ return max
}
func (m *MemoryRepositoryStore) getStorageGeneration(virtualStorage, relativePath, storage string) int {
@@ -318,21 +328,16 @@ func (m *MemoryRepositoryStore) deleteStorageRepository(virtualStorage, relative
}
func (m *MemoryRepositoryStore) setGeneration(virtualStorage, relativePath, storage string, generation int) {
- m.setRepositoryGeneration(virtualStorage, relativePath, generation)
+ m.createRepository(virtualStorage, relativePath)
m.setStorageGeneration(virtualStorage, relativePath, storage, generation)
}
-func (m *MemoryRepositoryStore) setRepositoryGeneration(virtualStorage, relativePath string, generation int) {
- current := m.getRepositoryGeneration(virtualStorage, relativePath)
- if generation <= current {
- return
- }
-
+func (m *MemoryRepositoryStore) createRepository(virtualStorage, relativePath string) {
if m.virtualStorageState[virtualStorage] == nil {
- m.virtualStorageState[virtualStorage] = make(map[string]int)
+ m.virtualStorageState[virtualStorage] = make(map[string]struct{})
}
- m.virtualStorageState[virtualStorage][relativePath] = generation
+ m.virtualStorageState[virtualStorage][relativePath] = struct{}{}
}
func (m *MemoryRepositoryStore) setStorageGeneration(virtualStorage, relativePath, storage string, generation int) {
diff --git a/internal/praefect/datastore/repository_memory_test.go b/internal/praefect/datastore/repository_memory_test.go
index 160797f4d..161b63d4d 100644
--- a/internal/praefect/datastore/repository_memory_test.go
+++ b/internal/praefect/datastore/repository_memory_test.go
@@ -40,7 +40,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": 0,
+ "repository-1": struct{}{},
},
},
storageState{
@@ -61,7 +61,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": 1,
+ "repository-1": struct{}{},
},
},
storageState{
@@ -83,7 +83,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": 1,
+ "repository-1": struct{}{},
},
},
storageState{
@@ -102,7 +102,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": 2,
+ "repository-1": struct{}{},
},
},
storageState{
@@ -127,7 +127,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": 1,
+ "repository-1": struct{}{},
},
},
storageState{
@@ -148,7 +148,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": 1,
+ "repository-1": struct{}{},
},
},
storageState{
@@ -160,60 +160,6 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
},
)
})
-
- t.Run("increments stays monotonic", func(t *testing.T) {
- rs, requireState := newStore(t, nil)
-
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1))
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0))
- requireState(t, ctx,
- virtualStorageState{
- "virtual-storage-1": {
- "repository-1": 1,
- },
- },
- storageState{
- "virtual-storage-1": {
- "repository-1": {
- "storage-1": 0,
- },
- },
- },
- )
-
- require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, stor, nil))
- requireState(t, ctx,
- virtualStorageState{
- "virtual-storage-1": {
- "repository-1": 2,
- },
- },
- storageState{
- "virtual-storage-1": {
- "repository-1": {
- "storage-1": 2,
- },
- },
- },
- )
-
- require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, "storage-2", nil))
- requireState(t, ctx,
- virtualStorageState{
- "virtual-storage-1": {
- "repository-1": 3,
- },
- },
- storageState{
- "virtual-storage-1": {
- "repository-1": {
- "storage-1": 2,
- "storage-2": 3,
- },
- },
- },
- )
- })
})
t.Run("GetGeneration", func(t *testing.T) {
@@ -300,14 +246,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"deleted": {
- "deleted": 0,
+ "deleted": struct{}{},
},
"virtual-storage-1": {
- "other-storages-remain": 0,
+ "other-storages-remain": struct{}{},
},
"virtual-storage-2": {
- "deleted-repo": 0,
- "other-repo-remains": 0,
+ "deleted-repo": struct{}{},
+ "other-repo-remains": struct{}{},
},
},
storageState{
@@ -340,7 +286,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-2": {
- "other-repo-remains": 0,
+ "other-repo-remains": struct{}{},
},
},
storageState{
@@ -379,8 +325,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "renamed-all": 0,
- "renamed-some": 0,
+ "renamed-all": struct{}{},
+ "renamed-some": struct{}{},
},
},
storageState{
@@ -402,8 +348,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "renamed-all-new": 0,
- "renamed-some-new": 0,
+ "renamed-all-new": struct{}{},
+ "renamed-some-new": struct{}{},
},
},
storageState{
@@ -440,7 +386,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": 1,
+ "repository-1": struct{}{},
},
},
storageState{
@@ -460,10 +406,12 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.Equal(t, map[string]struct{}{"consistent-secondary": struct{}{}}, secondaries)
})
- t.Run("primary on unknown generation", func(t *testing.T) {
- secondaries, err := rs.GetConsistentSecondaries(ctx, vs, repo, "no-record")
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 0))
+
+ t.Run("outdated primary", func(t *testing.T) {
+ secondaries, err := rs.GetConsistentSecondaries(ctx, vs, repo, "primary")
require.NoError(t, err)
- require.Empty(t, secondaries)
+ require.Equal(t, map[string]struct{}{"consistent-secondary": struct{}{}}, secondaries)
})
})
@@ -604,12 +552,12 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"some-read-only": {
- "read-only-outdated": 1,
- "read-only-no-record": 0,
- "writable": 0,
+ "read-only-outdated": struct{}{},
+ "read-only-no-record": struct{}{},
+ "writable": struct{}{},
},
"all-writable": {
- "writable": 0,
+ "writable": struct{}{},
},
},
storageState{
diff --git a/internal/praefect/datastore/repository_postgres.go b/internal/praefect/datastore/repository_postgres.go
index cfceda375..e00974f3b 100644
--- a/internal/praefect/datastore/repository_postgres.go
+++ b/internal/praefect/datastore/repository_postgres.go
@@ -145,7 +145,7 @@ WITH next_generation AS (
generation
) VALUES ($1, $2, 0)
ON CONFLICT (virtual_storage, relative_path) DO
- UPDATE SET generation = repositories.generation + 1
+ UPDATE SET generation = COALESCE(repositories.generation, -1) + 1
RETURNING virtual_storage, relative_path, generation
), base_generation AS (
SELECT virtual_storage, relative_path, generation
@@ -193,7 +193,7 @@ WITH repository AS (
) VALUES ($1, $2, $4)
ON CONFLICT (virtual_storage, relative_path) DO
UPDATE SET generation = EXCLUDED.generation
- WHERE repositories.generation < EXCLUDED.generation
+ WHERE COALESCE(repositories.generation, -1) < EXCLUDED.generation
)
INSERT INTO storage_repositories (
@@ -330,25 +330,34 @@ AND storage = $3
func (rs *PostgresRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) {
const q = `
- WITH storage_gen AS (
- SELECT storage, generation
- FROM storage_repositories
- WHERE virtual_storage = $1
- AND relative_path = $2
- AND storage = ANY($4::text[])
- )
-
- SELECT DISTINCT sec.storage
- FROM (SELECT generation FROM storage_gen WHERE storage = $3) AS prim
- JOIN storage_gen AS sec ON sec.storage != $3 AND prim.generation = sec.generation`
+WITH expected_repositories AS (
+ SELECT virtual_storage, relative_path, unnest($3::text[]) AS storage, MAX(generation) AS generation
+ FROM storage_repositories
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+ GROUP BY virtual_storage, relative_path
+)
+
+SELECT storage
+FROM storage_repositories
+JOIN expected_repositories USING (virtual_storage, relative_path, storage, generation)
+`
storages, err := rs.storages.storages(virtualStorage)
if err != nil {
return nil, err
}
- storages = append(storages, primary)
- rows, err := rs.db.QueryContext(ctx, q, virtualStorage, relativePath, primary, pq.StringArray(storages))
+ secondaries := make([]string, 0, len(storages)-1)
+ for _, storage := range storages {
+ if storage == primary {
+ continue
+ }
+
+ secondaries = append(secondaries, storage)
+ }
+
+ rows, err := rs.db.QueryContext(ctx, q, virtualStorage, relativePath, pq.StringArray(secondaries))
if err != nil {
return nil, err
}
@@ -369,14 +378,15 @@ func (rs *PostgresRepositoryStore) GetConsistentSecondaries(ctx context.Context,
func (rs *PostgresRepositoryStore) IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) {
const q = `
-SELECT COALESCE(r.generation = sr.generation, false)
-FROM repositories AS r
-LEFT JOIN storage_repositories AS sr
- ON sr.virtual_storage = r.virtual_storage
- AND sr.relative_path = r.relative_path
- AND sr.storage = $3
-WHERE r.virtual_storage = $1
-AND r.relative_path = $2
+SELECT COALESCE(expected_repository.generation = storage_repositories.generation, false)
+FROM (
+ SELECT virtual_storage, relative_path, $3 AS storage, MAX(generation) AS generation
+ FROM storage_repositories
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+ GROUP BY virtual_storage, relative_path
+) AS expected_repository
+LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
`
var isLatest bool
@@ -417,15 +427,16 @@ func (rs *PostgresRepositoryStore) GetOutdatedRepositories(ctx context.Context,
// As some storages might be missing records from the table, we do a cross join between the repositories and the
// configured storages. If a storage is missing an entry, it is considered fully outdated.
const q = `
-SELECT relative_path, storage, expected.generation - COALESCE(actual.generation, -1) AS behind_by
+SELECT relative_path, storage, expected_repositories.generation - COALESCE(storage_repositories.generation, -1) AS behind_by
FROM (
- SELECT virtual_storage, relative_path, storage, generation
+ SELECT virtual_storage, relative_path, unnest($2::text[]) AS storage, MAX(storage_repositories.generation) AS generation
FROM repositories
- CROSS JOIN (SELECT unnest($2::text[]) AS storage) AS storages
+ JOIN storage_repositories USING (virtual_storage, relative_path)
WHERE virtual_storage = $1
-) AS expected
-LEFT JOIN storage_repositories AS actual USING (virtual_storage, relative_path, storage)
-WHERE COALESCE(actual.generation, -1) < expected.generation
+ GROUP BY virtual_storage, relative_path
+) AS expected_repositories
+LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
+WHERE COALESCE(storage_repositories.generation, -1) < expected_repositories.generation
`
storages, ok := rs.storages[virtualStorage]
if !ok {
@@ -458,21 +469,21 @@ WHERE COALESCE(actual.generation, -1) < expected.generation
func (rs *PostgresRepositoryStore) CountReadOnlyRepositories(ctx context.Context, virtualStoragePrimaries map[string]string) (map[string]int, error) {
const q = `
- WITH primaries AS (
- SELECT
- unnest($1::text[]) AS virtual_storage,
- unnest($2::text[]) AS storage
- ), expected_repositories AS (
- SELECT virtual_storage, relative_path, storage, generation
- FROM repositories
- NATURAL JOIN primaries
- )
-
- SELECT virtual_storage, COUNT(*)
- FROM expected_repositories
- LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
- WHERE COALESCE(storage_repositories.generation, -1) < expected_repositories.generation
- GROUP BY virtual_storage
+SELECT virtual_storage, COUNT(*)
+FROM (
+ SELECT virtual_storage, relative_path, MAX(storage_repositories.generation) AS generation
+ FROM repositories
+ JOIN storage_repositories USING (virtual_storage, relative_path)
+ GROUP BY virtual_storage, relative_path
+) AS expected_repositories
+JOIN (
+ SELECT
+ unnest($1::text[]) AS virtual_storage,
+ unnest($2::text[]) AS storage
+) AS primaries USING (virtual_storage)
+LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
+WHERE COALESCE(storage_repositories.generation, -1) < expected_repositories.generation
+GROUP BY virtual_storage
`
virtualStorages := make([]string, 0, len(virtualStoragePrimaries))
diff --git a/internal/praefect/datastore/repository_postgres_test.go b/internal/praefect/datastore/repository_postgres_test.go
index 4b5642084..76781cbcd 100644
--- a/internal/praefect/datastore/repository_postgres_test.go
+++ b/internal/praefect/datastore/repository_postgres_test.go
@@ -16,7 +16,7 @@ func TestRepositoryStore_Postgres(t *testing.T) {
requireVirtualStorageState := func(t *testing.T, ctx context.Context, exp virtualStorageState) {
rows, err := db.QueryContext(ctx, `
-SELECT virtual_storage, relative_path, generation
+SELECT virtual_storage, relative_path
FROM repositories
`)
require.NoError(t, err)
@@ -25,13 +25,12 @@ FROM repositories
act := make(virtualStorageState)
for rows.Next() {
var vs, rel string
- var gen int
- require.NoError(t, rows.Scan(&vs, &rel, &gen))
+ require.NoError(t, rows.Scan(&vs, &rel))
if act[vs] == nil {
- act[vs] = make(map[string]int)
+ act[vs] = make(map[string]struct{})
}
- act[vs][rel] = gen
+ act[vs][rel] = struct{}{}
}
require.NoError(t, rows.Err())