diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-14 17:36:59 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-17 16:30:46 +0300 |
commit | 7bbc7cc429d2dca040328570f676e2e88b45ccd7 (patch) | |
tree | 7c1d00519d26ece0eac008e99cb961613cf50595 | |
parent | fdd1fe70085c1a20b10553680d88a967a4cfbfae (diff) |
repository state tracking stores
Adds an in-memory and a Postgres implemenation of repository
state tracking stores. The two tables added are:
1. `repositories` which contains the expected state of repositories
in the virtual storage.
2. `storage_repositories` which contains the state of the repositories
on the physical storages.
Cross-referencing these two makes it easier to identify outdated
repositories.
This commit implements only the stores without hooking them up to the
rest of the code.
-rw-r--r-- | internal/praefect/config/config.go | 15 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 12 | ||||
-rw-r--r-- | internal/praefect/datastore/glsql/testing.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore/migrations/20200707101830_repositories_table.go | 30 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_memory.go | 221 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_memory_test.go | 446 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_postgres.go | 284 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_postgres_test.go | 75 |
8 files changed, 1085 insertions, 0 deletions
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 31b1b4d25..4cd10154f 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -172,6 +172,21 @@ func (c *Config) VirtualStorageNames() []string { return names } +// StorageNames returns storage names by virtual storage. +func (c *Config) StorageNames() map[string][]string { + storages := make(map[string][]string, len(c.VirtualStorages)) + for _, vs := range c.VirtualStorages { + nodes := make([]string, len(vs.Nodes)) + for i, n := range vs.Nodes { + nodes[i] = n.Storage + } + + storages[vs.Name] = nodes + } + + return storages +} + // DB holds Postgres client configuration data. type DB struct { Host string `toml:"host"` diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 1ffde42ee..51c2d254c 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -336,6 +336,18 @@ func TestVirtualStorageNames(t *testing.T) { require.Equal(t, []string{"praefect-1", "praefect-2"}, conf.VirtualStorageNames()) } +func TestStorageNames(t *testing.T) { + conf := Config{ + VirtualStorages: []*VirtualStorage{ + {Name: "virtual-storage-1", Nodes: []*Node{{Storage: "gitaly-1"}, {Storage: "gitaly-2"}}}, + {Name: "virtual-storage-2", Nodes: []*Node{{Storage: "gitaly-3"}, {Storage: "gitaly-4"}}}, + }} + require.Equal(t, map[string][]string{ + "virtual-storage-1": {"gitaly-1", "gitaly-2"}, + "virtual-storage-2": {"gitaly-3", "gitaly-4"}, + }, conf.StorageNames()) +} + func TestToPQString(t *testing.T) { testCases := []struct { desc string diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go index 9de73d7b0..0fb396fe4 100644 --- a/internal/praefect/datastore/glsql/testing.go +++ b/internal/praefect/datastore/glsql/testing.go @@ -58,6 +58,8 @@ func (db DB) TruncateAll(t testing.TB) { "replication_queue_lock", "node_status", "shard_primaries", + "storage_repositories", + "repositories", ) } diff --git a/internal/praefect/datastore/migrations/20200707101830_repositories_table.go b/internal/praefect/datastore/migrations/20200707101830_repositories_table.go new file mode 100644 index 000000000..9edd646a1 --- /dev/null +++ b/internal/praefect/datastore/migrations/20200707101830_repositories_table.go @@ -0,0 +1,30 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20200707101830_repositories_table", + Up: []string{` +CREATE TABLE repositories ( + virtual_storage TEXT, + relative_path TEXT, + generation BIGINT NOT NULL, + PRIMARY KEY (virtual_storage, relative_path) +)`, ` +CREATE TABLE storage_repositories ( + virtual_storage TEXT, + relative_path TEXT, + storage TEXT, + generation BIGINT NOT NULL, + PRIMARY KEY (virtual_storage, relative_path, storage) +) +`}, + Down: []string{ + "DROP TABLE storage_repositories", + "DROP TABLE repositories", + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/repository_memory.go b/internal/praefect/datastore/repository_memory.go new file mode 100644 index 000000000..f078bc55b --- /dev/null +++ b/internal/praefect/datastore/repository_memory.go @@ -0,0 +1,221 @@ +package datastore + +import ( + "context" + "sync" +) + +// MemoryRepositoryStore is an in-memory implementation of RepositoryStore. +// Refer to the interface for method documentation. +type MemoryRepositoryStore struct { + m sync.Mutex + + storages map[string][]string + virtualStorageState + storageState +} + +// virtualStorageStates represents the virtual storage's view of what state the repositories should be in. +// It structured as virtual-storage->relative_path->generation. +type virtualStorageState map[string]map[string]int + +// storageState contains individual storage's repository states. +// It structured as virtual-storage->relative_path->storage->generation. +type storageState map[string]map[string]map[string]int + +// NewMemoryRepositoryStore returns an in-memory implementation of RepositoryStore. +func NewMemoryRepositoryStore(storages map[string][]string) *MemoryRepositoryStore { + return &MemoryRepositoryStore{ + storages: storages, + storageState: make(storageState), + virtualStorageState: make(virtualStorageState), + } +} + +func (m *MemoryRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) { + m.m.Lock() + defer m.m.Unlock() + + return m.getStorageGeneration(virtualStorage, relativePath, storage), nil +} + +func (m *MemoryRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) (int, error) { + m.m.Lock() + defer m.m.Unlock() + + baseGen := m.getRepositoryGeneration(virtualStorage, relativePath) + nextGen := baseGen + 1 + + m.setGeneration(virtualStorage, relativePath, primary, nextGen) + + // If a secondary does not have a generation, it's in an undefined state. We'll only + // pick secondaries on the same generation as the primary to ensure they begin from the + // same starting state. + if baseGen != GenerationUnknown { + for _, secondary := range secondaries { + currentGen := m.getStorageGeneration(virtualStorage, relativePath, secondary) + // If the secondary is not on the same generation as the primary, the secondary + // has failed a concurrent reference transaction. We won't increment its + // generation as it has not applied writes in previous genereations, leaving + // its state undefined. + if currentGen != baseGen { + continue + } + + m.setGeneration(virtualStorage, relativePath, secondary, nextGen) + } + } + + return nextGen, nil +} + +func (m *MemoryRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error { + m.m.Lock() + defer m.m.Unlock() + + m.setGeneration(virtualStorage, relativePath, storage, generation) + + return nil +} + +func (m *MemoryRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { + m.m.Lock() + defer m.m.Unlock() + + latestGen := m.getRepositoryGeneration(virtualStorage, relativePath) + storageGen := m.getStorageGeneration(virtualStorage, relativePath, storage) + + m.deleteRepository(virtualStorage, relativePath) + m.deleteStorageRepository(virtualStorage, relativePath, storage) + + if latestGen == GenerationUnknown && storageGen == GenerationUnknown { + return RepositoryNotExistsError{ + virtualStorage: virtualStorage, + relativePath: relativePath, + storage: storage, + } + } + + return nil +} + +func (m *MemoryRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { + m.m.Lock() + defer m.m.Unlock() + + latestGen := m.getRepositoryGeneration(virtualStorage, relativePath) + storageGen := m.getStorageGeneration(virtualStorage, relativePath, storage) + + if latestGen != GenerationUnknown { + m.deleteRepository(virtualStorage, relativePath) + m.setRepositoryGeneration(virtualStorage, newRelativePath, latestGen) + } + + if storageGen != GenerationUnknown { + m.deleteStorageRepository(virtualStorage, relativePath, storage) + m.setStorageGeneration(virtualStorage, newRelativePath, storage, storageGen) + } + + if latestGen == GenerationUnknown && storageGen == GenerationUnknown { + return RepositoryNotExistsError{ + virtualStorage: virtualStorage, + relativePath: relativePath, + storage: storage, + } + } + + return nil +} + +func (m *MemoryRepositoryStore) EnsureUpgrade(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error { + m.m.Lock() + defer m.m.Unlock() + + if current := m.getStorageGeneration(virtualStorage, relativePath, storage); current != GenerationUnknown && current >= generation { + return downgradeAttemptedError{ + virtualStorage: virtualStorage, + relativePath: relativePath, + storage: storage, + currentGeneration: current, + attemptedGeneration: generation, + } + } + + return nil +} + +func (m *MemoryRepositoryStore) getRepositoryGeneration(virtualStorage, relativePath string) int { + gen, ok := m.virtualStorageState[virtualStorage][relativePath] + if !ok { + return GenerationUnknown + } + + return gen +} + +func (m *MemoryRepositoryStore) getStorageGeneration(virtualStorage, relativePath, storage string) int { + gen, ok := m.storageState[virtualStorage][relativePath][storage] + if !ok { + return GenerationUnknown + } + + return gen +} + +func (m *MemoryRepositoryStore) deleteRepository(virtualStorage, relativePath string) { + rels := m.virtualStorageState[virtualStorage] + if rels == nil { + return + } + + delete(rels, relativePath) + if len(rels) == 0 { + delete(m.virtualStorageState, virtualStorage) + } +} + +func (m *MemoryRepositoryStore) deleteStorageRepository(virtualStorage, relativePath, storage string) { + storages := m.storageState[virtualStorage][relativePath] + if storages == nil { + return + } + + delete(storages, storage) + if len(m.storageState[virtualStorage][relativePath]) == 0 { + delete(m.storageState[virtualStorage], relativePath) + } + + if len(m.storageState[virtualStorage]) == 0 { + delete(m.storageState, virtualStorage) + } +} + +func (m *MemoryRepositoryStore) setGeneration(virtualStorage, relativePath, storage string, generation int) { + m.setRepositoryGeneration(virtualStorage, relativePath, generation) + m.setStorageGeneration(virtualStorage, relativePath, storage, generation) +} + +func (m *MemoryRepositoryStore) setRepositoryGeneration(virtualStorage, relativePath string, generation int) { + current := m.getRepositoryGeneration(virtualStorage, relativePath) + if generation <= current { + return + } + + if m.virtualStorageState[virtualStorage] == nil { + m.virtualStorageState[virtualStorage] = make(map[string]int) + } + + m.virtualStorageState[virtualStorage][relativePath] = generation +} + +func (m *MemoryRepositoryStore) setStorageGeneration(virtualStorage, relativePath, storage string, generation int) { + if m.storageState[virtualStorage] == nil { + m.storageState[virtualStorage] = make(map[string]map[string]int) + } + + if m.storageState[virtualStorage][relativePath] == nil { + m.storageState[virtualStorage][relativePath] = make(map[string]int) + } + + m.storageState[virtualStorage][relativePath][storage] = generation +} diff --git a/internal/praefect/datastore/repository_memory_test.go b/internal/praefect/datastore/repository_memory_test.go new file mode 100644 index 000000000..f0c4b4de4 --- /dev/null +++ b/internal/praefect/datastore/repository_memory_test.go @@ -0,0 +1,446 @@ +package datastore + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +type requireState func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) +type repositoryStoreFactory func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) + +func TestRepositoryStore_Memory(t *testing.T) { + testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) { + rs := NewMemoryRepositoryStore(storages) + return rs, func(t *testing.T, _ context.Context, vss virtualStorageState, ss storageState) { + t.Helper() + require.Equal(t, vss, rs.virtualStorageState) + require.Equal(t, ss, rs.storageState) + } + }) +} + +func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { + ctx, cancel := testhelper.Context() + defer cancel() + + const ( + vs = "virtual-storage-1" + repo = "repository-1" + stor = "storage-1" + ) + + t.Run("IncrementGeneration", func(t *testing.T) { + t.Run("creates a new record for primary", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + generation, err := rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"secondary-1"}) + require.NoError(t, err) + require.Equal(t, 0, generation) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 0, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "primary": 0, + }, + }, + }, + ) + }) + + t.Run("increments existing record for primary", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 0)) + generation, err := rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"secondary-1"}) + require.NoError(t, err) + require.Equal(t, 1, generation) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 1, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "primary": 1, + }, + }, + }, + ) + }) + + t.Run("increments existing for up to date secondary", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 1)) + require.NoError(t, rs.SetGeneration(ctx, vs, repo, "up-to-date-secondary", 1)) + require.NoError(t, rs.SetGeneration(ctx, vs, repo, "outdated-secondary", 0)) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 1, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "primary": 1, + "up-to-date-secondary": 1, + "outdated-secondary": 0, + }, + }, + }, + ) + + generation, err := rs.IncrementGeneration(ctx, vs, repo, "primary", []string{ + "up-to-date-secondary", "outdated-secondary", "non-existing-secondary"}) + require.NoError(t, err) + require.Equal(t, 2, generation) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 2, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "primary": 2, + "up-to-date-secondary": 2, + "outdated-secondary": 0, + }, + }, + }, + ) + }) + }) + + t.Run("SetGeneration", func(t *testing.T) { + t.Run("creates a record", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + err := rs.SetGeneration(ctx, vs, repo, stor, 1) + require.NoError(t, err) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 1, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "storage-1": 1, + }, + }, + }, + ) + }) + + t.Run("updates existing record", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1)) + require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0)) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 1, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "storage-1": 0, + }, + }, + }, + ) + }) + + t.Run("increments stays monotonic", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1)) + require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0)) + + generation, err := rs.IncrementGeneration(ctx, vs, repo, stor, nil) + require.NoError(t, err) + require.Equal(t, 2, generation) + + generation, err = rs.IncrementGeneration(ctx, vs, repo, "storage-2", nil) + require.NoError(t, err) + require.Equal(t, 3, generation) + + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 3, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "storage-1": 2, + "storage-2": 3, + }, + }, + }, + ) + }) + }) + + t.Run("GetGeneration", func(t *testing.T) { + rs, _ := newStore(t, nil) + + generation, err := rs.GetGeneration(ctx, vs, repo, stor) + require.NoError(t, err) + require.Equal(t, GenerationUnknown, generation) + + require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0)) + + generation, err = rs.GetGeneration(ctx, vs, repo, stor) + require.NoError(t, err) + require.Equal(t, 0, generation) + }) + + t.Run("EnsureUpgrade", func(t *testing.T) { + t.Run("no previous record allowed", func(t *testing.T) { + rs, _ := newStore(t, nil) + require.NoError(t, rs.EnsureUpgrade(ctx, vs, repo, stor, GenerationUnknown)) + require.NoError(t, rs.EnsureUpgrade(ctx, vs, repo, stor, 0)) + }) + + t.Run("upgrade allowed", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0)) + require.NoError(t, rs.EnsureUpgrade(ctx, vs, repo, stor, 1)) + require.Error(t, + downgradeAttemptedError{vs, repo, stor, 1, GenerationUnknown}, + rs.EnsureUpgrade(ctx, vs, repo, stor, GenerationUnknown)) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 0, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "storage-1": 0, + }, + }, + }, + ) + }) + + t.Run("downgrade prevented", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1)) + require.Equal(t, + downgradeAttemptedError{vs, repo, stor, 1, 0}, + rs.EnsureUpgrade(ctx, vs, repo, stor, 0)) + require.Error(t, + downgradeAttemptedError{vs, repo, stor, 1, GenerationUnknown}, + rs.EnsureUpgrade(ctx, vs, repo, stor, GenerationUnknown)) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 1, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "storage-1": 1, + }, + }, + }, + ) + }) + + t.Run("same version prevented", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1)) + require.Equal(t, + downgradeAttemptedError{vs, repo, stor, 1, 1}, + rs.EnsureUpgrade(ctx, vs, repo, stor, 1)) + require.Error(t, + downgradeAttemptedError{vs, repo, stor, 1, GenerationUnknown}, + rs.EnsureUpgrade(ctx, vs, repo, stor, GenerationUnknown)) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": 1, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "storage-1": 1, + }, + }, + }, + ) + }) + }) + + t.Run("DeleteRepository", func(t *testing.T) { + t.Run("delete non-existing", func(t *testing.T) { + rs, _ := newStore(t, nil) + + require.Equal(t, + RepositoryNotExistsError{vs, repo, stor}, + rs.DeleteRepository(ctx, vs, repo, stor), + ) + }) + + t.Run("delete existing", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, "deleted", "deleted", "deleted", 0)) + + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage", 0)) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "other-storages-remain", "remaining-storage", 0)) + + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage", 0)) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-2", "other-repo-remains", "remaining-storage", 0)) + + requireState(t, ctx, + virtualStorageState{ + "deleted": { + "deleted": 0, + }, + "virtual-storage-1": { + "other-storages-remain": 0, + }, + "virtual-storage-2": { + "deleted-repo": 0, + "other-repo-remains": 0, + }, + }, + storageState{ + "deleted": { + "deleted": { + "deleted": 0, + }, + }, + "virtual-storage-1": { + "other-storages-remain": { + "deleted-storage": 0, + "remaining-storage": 0, + }, + }, + "virtual-storage-2": { + "deleted-repo": { + "deleted-storage": 0, + }, + "other-repo-remains": { + "remaining-storage": 0, + }, + }, + }, + ) + + require.NoError(t, rs.DeleteRepository(ctx, "deleted", "deleted", "deleted")) + require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage")) + require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage")) + + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-2": { + "other-repo-remains": 0, + }, + }, + storageState{ + "virtual-storage-1": { + "other-storages-remain": { + "remaining-storage": 0, + }, + }, + "virtual-storage-2": { + "other-repo-remains": { + "remaining-storage": 0, + }, + }, + }, + ) + }) + }) + + t.Run("RenameRepository", func(t *testing.T) { + t.Run("rename non-existing", func(t *testing.T) { + rs, _ := newStore(t, nil) + + require.Equal(t, + RepositoryNotExistsError{vs, repo, stor}, + rs.RenameRepository(ctx, vs, repo, stor, "repository-2"), + ) + }) + + t.Run("rename existing", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + require.NoError(t, rs.SetGeneration(ctx, vs, "renamed-all", "storage-1", 0)) + require.NoError(t, rs.SetGeneration(ctx, vs, "renamed-some", "storage-1", 0)) + require.NoError(t, rs.SetGeneration(ctx, vs, "renamed-some", "storage-2", 0)) + + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "renamed-all": 0, + "renamed-some": 0, + }, + }, + storageState{ + "virtual-storage-1": { + "renamed-all": { + "storage-1": 0, + }, + "renamed-some": { + "storage-1": 0, + "storage-2": 0, + }, + }, + }, + ) + + require.NoError(t, rs.RenameRepository(ctx, vs, "renamed-all", "storage-1", "renamed-all-new")) + require.NoError(t, rs.RenameRepository(ctx, vs, "renamed-some", "storage-1", "renamed-some-new")) + + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "renamed-all-new": 0, + "renamed-some-new": 0, + }, + }, + storageState{ + "virtual-storage-1": { + "renamed-all-new": { + "storage-1": 0, + }, + "renamed-some-new": { + "storage-1": 0, + }, + "renamed-some": { + "storage-2": 0, + }, + }, + }, + ) + }) + }) +} diff --git a/internal/praefect/datastore/repository_postgres.go b/internal/praefect/datastore/repository_postgres.go new file mode 100644 index 000000000..433a46776 --- /dev/null +++ b/internal/praefect/datastore/repository_postgres.go @@ -0,0 +1,284 @@ +package datastore + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/lib/pq" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" +) + +// GenerationUnknown is used to indicate lack of generation number in +// a replication job. Older instances can produce replication jobs +// without a generation number. +const GenerationUnknown = -1 + +type downgradeAttemptedError struct { + virtualStorage string + relativePath string + storage string + currentGeneration int + attemptedGeneration int +} + +func (err downgradeAttemptedError) Error() string { + return fmt.Sprintf("attempted downgrading %q -> %q -> %q from generation %d to %d", + err.virtualStorage, err.relativePath, err.storage, err.currentGeneration, err.attemptedGeneration, + ) +} + +// RepositoryNotExistsError is returned when trying to perform an operation on a non-existent repository. +type RepositoryNotExistsError struct { + virtualStorage string + relativePath string + storage string +} + +// Is checks whetehr the other errors is of the same type. +func (err RepositoryNotExistsError) Is(other error) bool { + _, ok := other.(RepositoryNotExistsError) + return ok +} + +// Error returns the errors message. +func (err RepositoryNotExistsError) Error() string { + return fmt.Sprintf("repository %q -> %q -> %q does not exist", + err.virtualStorage, err.relativePath, err.storage, + ) +} + +// RepositoryStore provides access to repository state. +type RepositoryStore interface { + // GetGeneration gets the repository's generation on a given storage. + GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) + // IncrementGeneration increments the primary's and the up to date secondaries' generations. + IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) (int, error) + // SetGeneration sets the repository's generation on the given storage. If the generation is higher + // than the virtual storage's generation, it is set to match as well to guarantee monotonic increments. + SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error + // EnsureUpgrade returns an error if the given generation would downgrade the storage's repository. + EnsureUpgrade(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error + // DeleteRepository deletes the repository from the virtual storage and the storage. Returns + // RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage + // or the storage. + DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error + // RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well + // as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository + // which has no record in the virtual storage or the storage. + RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error +} + +// PostgresRepositoryStore is a Postgres implementation of RepositoryStore. +// Refer to the interface for method documentation. +type PostgresRepositoryStore struct { + db glsql.Querier + storages map[string][]string +} + +// NewLocalRepositoryStore returns a Postgres implementation of RepositoryStore. +func NewPostgresRepositoryStore(db glsql.Querier, storages map[string][]string) *PostgresRepositoryStore { + return &PostgresRepositoryStore{db: db, storages: storages} +} + +func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) { + const q = ` +SELECT generation +FROM storage_repositories +WHERE virtual_storage = $1 +AND relative_path = $2 +AND storage = $3 +` + + var gen int + if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, storage).Scan(&gen); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return GenerationUnknown, nil + } + + return 0, err + } + + return gen, nil +} + +func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) (int, error) { + // The query works as follows: + // 1. `next_generation` CTE increments the latest generation by 1. If no previous records exists, + // the generation starts from 0. + // 2. `base_generation` CTE gets the primary's current generation. A secondary has to be on the primary's + // generation, otherwise its generation won't be incremented. This avoids any issues where a concurrent + // reference transaction has failed and the secondary is no longer up to date when we are incrementing + // the generations. + // 3. `eligible_secondaries` filters out secondaries which participated in a transaction but failed a + /// concurrent transaction. + // 4. `eligible_storages` CTE combines the primary and the up to date secondaries in a list of storages to + // to increment the generation for. + // 5. Finally, we upsert the records in 'storage_repositories' table to match the new generation for the + // eligble storages. + + const q = ` +WITH next_generation AS ( + INSERT INTO repositories ( + virtual_storage, + relative_path, + generation + ) VALUES ($1, $2, 0) + ON CONFLICT (virtual_storage, relative_path) DO + UPDATE SET generation = repositories.generation + 1 + RETURNING virtual_storage, relative_path, generation +), base_generation AS ( + SELECT virtual_storage, relative_path, generation + FROM storage_repositories + WHERE virtual_storage = $1 + AND relative_path = $2 + AND storage = $3 + FOR UPDATE +), eligible_secondaries AS ( + SELECT storage + FROM storage_repositories + NATURAL JOIN base_generation + WHERE storage = ANY($4::text[]) + FOR UPDATE +), eligible_storages AS ( + SELECT storage + FROM eligible_secondaries + UNION + SELECT $3 +) + +INSERT INTO storage_repositories AS sr ( + virtual_storage, + relative_path, + storage, + generation +) +SELECT virtual_storage, relative_path, storage, generation +FROM eligible_storages +CROSS JOIN next_generation +ON CONFLICT (virtual_storage, relative_path, storage) DO + UPDATE SET generation = EXCLUDED.generation +RETURNING generation +` + + var generation int + if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, primary, pq.StringArray(secondaries)).Scan(&generation); err != nil { + return 0, err + } + + return generation, nil +} + +func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error { + const q = ` +WITH repository AS ( + INSERT INTO repositories ( + virtual_storage, + relative_path, + generation + ) VALUES ($1, $2, $4) + ON CONFLICT (virtual_storage, relative_path) DO + UPDATE SET generation = EXCLUDED.generation + WHERE repositories.generation < EXCLUDED.generation +) + +INSERT INTO storage_repositories ( + virtual_storage, + relative_path, + storage, + generation +) +VALUES ($1, $2, $3, $4) +ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET + generation = EXCLUDED.generation +` + + _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, generation) + return err +} + +func (rs *PostgresRepositoryStore) EnsureUpgrade(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error { + current, err := rs.GetGeneration(ctx, virtualStorage, relativePath, storage) + if err != nil { + return err + } + + if current != GenerationUnknown && current >= generation { + return downgradeAttemptedError{ + virtualStorage: virtualStorage, + relativePath: relativePath, + storage: storage, + currentGeneration: current, + attemptedGeneration: generation, + } + } + + return nil +} + +func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { + const q = ` +WITH repo AS ( + DELETE FROM repositories + WHERE virtual_storage = $1 + AND relative_path = $2 +) + +DELETE FROM storage_repositories +WHERE virtual_storage = $1 +AND relative_path = $2 +AND storage = $3 +` + + result, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage) + if err != nil { + return err + } + + if n, err := result.RowsAffected(); err != nil { + return err + } else if n == 0 { + return RepositoryNotExistsError{ + virtualStorage: virtualStorage, + relativePath: relativePath, + storage: storage, + } + } + + return nil +} + +func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { + const q = ` +WITH repo AS ( + UPDATE repositories + SET relative_path = $4 + WHERE virtual_storage = $1 + AND relative_path = $2 +) + +UPDATE storage_repositories +SET relative_path = $4 +WHERE virtual_storage = $1 +AND relative_path = $2 +AND storage = $3 +` + + result, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, newRelativePath) + if err != nil { + return err + } + + if n, err := result.RowsAffected(); err != nil { + return err + } else if n == 0 { + return RepositoryNotExistsError{ + virtualStorage: virtualStorage, + relativePath: relativePath, + storage: storage, + } + } + + return err +} diff --git a/internal/praefect/datastore/repository_postgres_test.go b/internal/praefect/datastore/repository_postgres_test.go new file mode 100644 index 000000000..4b5642084 --- /dev/null +++ b/internal/praefect/datastore/repository_postgres_test.go @@ -0,0 +1,75 @@ +// +build postgres + +package datastore + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRepositoryStore_Postgres(t *testing.T) { + testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) { + db := getDB(t) + gs := NewPostgresRepositoryStore(db, storages) + + requireVirtualStorageState := func(t *testing.T, ctx context.Context, exp virtualStorageState) { + rows, err := db.QueryContext(ctx, ` +SELECT virtual_storage, relative_path, generation +FROM repositories + `) + require.NoError(t, err) + defer rows.Close() + + act := make(virtualStorageState) + for rows.Next() { + var vs, rel string + var gen int + require.NoError(t, rows.Scan(&vs, &rel, &gen)) + if act[vs] == nil { + act[vs] = make(map[string]int) + } + + act[vs][rel] = gen + } + + require.NoError(t, rows.Err()) + require.Equal(t, exp, act) + } + + requireStorageState := func(t *testing.T, ctx context.Context, exp storageState) { + rows, err := db.QueryContext(ctx, ` +SELECT virtual_storage, relative_path, storage, generation +FROM storage_repositories + `) + require.NoError(t, err) + defer rows.Close() + + act := make(storageState) + for rows.Next() { + var vs, rel, storage string + var gen int + require.NoError(t, rows.Scan(&vs, &rel, &storage, &gen)) + + if act[vs] == nil { + act[vs] = make(map[string]map[string]int) + } + if act[vs][rel] == nil { + act[vs][rel] = make(map[string]int) + } + + act[vs][rel][storage] = gen + } + + require.NoError(t, rows.Err()) + require.Equal(t, exp, act) + } + + return gs, func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) { + t.Helper() + requireVirtualStorageState(t, ctx, vss) + requireStorageState(t, ctx, ss) + } + }) +} |