Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-07-14 17:36:59 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-07-17 16:30:46 +0300
commit7bbc7cc429d2dca040328570f676e2e88b45ccd7 (patch)
tree7c1d00519d26ece0eac008e99cb961613cf50595
parentfdd1fe70085c1a20b10553680d88a967a4cfbfae (diff)
repository state tracking stores
Adds an in-memory and a Postgres implemenation of repository state tracking stores. The two tables added are: 1. `repositories` which contains the expected state of repositories in the virtual storage. 2. `storage_repositories` which contains the state of the repositories on the physical storages. Cross-referencing these two makes it easier to identify outdated repositories. This commit implements only the stores without hooking them up to the rest of the code.
-rw-r--r--internal/praefect/config/config.go15
-rw-r--r--internal/praefect/config/config_test.go12
-rw-r--r--internal/praefect/datastore/glsql/testing.go2
-rw-r--r--internal/praefect/datastore/migrations/20200707101830_repositories_table.go30
-rw-r--r--internal/praefect/datastore/repository_memory.go221
-rw-r--r--internal/praefect/datastore/repository_memory_test.go446
-rw-r--r--internal/praefect/datastore/repository_postgres.go284
-rw-r--r--internal/praefect/datastore/repository_postgres_test.go75
8 files changed, 1085 insertions, 0 deletions
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 31b1b4d25..4cd10154f 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -172,6 +172,21 @@ func (c *Config) VirtualStorageNames() []string {
return names
}
+// StorageNames returns storage names by virtual storage.
+func (c *Config) StorageNames() map[string][]string {
+ storages := make(map[string][]string, len(c.VirtualStorages))
+ for _, vs := range c.VirtualStorages {
+ nodes := make([]string, len(vs.Nodes))
+ for i, n := range vs.Nodes {
+ nodes[i] = n.Storage
+ }
+
+ storages[vs.Name] = nodes
+ }
+
+ return storages
+}
+
// DB holds Postgres client configuration data.
type DB struct {
Host string `toml:"host"`
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 1ffde42ee..51c2d254c 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -336,6 +336,18 @@ func TestVirtualStorageNames(t *testing.T) {
require.Equal(t, []string{"praefect-1", "praefect-2"}, conf.VirtualStorageNames())
}
+func TestStorageNames(t *testing.T) {
+ conf := Config{
+ VirtualStorages: []*VirtualStorage{
+ {Name: "virtual-storage-1", Nodes: []*Node{{Storage: "gitaly-1"}, {Storage: "gitaly-2"}}},
+ {Name: "virtual-storage-2", Nodes: []*Node{{Storage: "gitaly-3"}, {Storage: "gitaly-4"}}},
+ }}
+ require.Equal(t, map[string][]string{
+ "virtual-storage-1": {"gitaly-1", "gitaly-2"},
+ "virtual-storage-2": {"gitaly-3", "gitaly-4"},
+ }, conf.StorageNames())
+}
+
func TestToPQString(t *testing.T) {
testCases := []struct {
desc string
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 9de73d7b0..0fb396fe4 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -58,6 +58,8 @@ func (db DB) TruncateAll(t testing.TB) {
"replication_queue_lock",
"node_status",
"shard_primaries",
+ "storage_repositories",
+ "repositories",
)
}
diff --git a/internal/praefect/datastore/migrations/20200707101830_repositories_table.go b/internal/praefect/datastore/migrations/20200707101830_repositories_table.go
new file mode 100644
index 000000000..9edd646a1
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200707101830_repositories_table.go
@@ -0,0 +1,30 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20200707101830_repositories_table",
+ Up: []string{`
+CREATE TABLE repositories (
+ virtual_storage TEXT,
+ relative_path TEXT,
+ generation BIGINT NOT NULL,
+ PRIMARY KEY (virtual_storage, relative_path)
+)`, `
+CREATE TABLE storage_repositories (
+ virtual_storage TEXT,
+ relative_path TEXT,
+ storage TEXT,
+ generation BIGINT NOT NULL,
+ PRIMARY KEY (virtual_storage, relative_path, storage)
+)
+`},
+ Down: []string{
+ "DROP TABLE storage_repositories",
+ "DROP TABLE repositories",
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/repository_memory.go b/internal/praefect/datastore/repository_memory.go
new file mode 100644
index 000000000..f078bc55b
--- /dev/null
+++ b/internal/praefect/datastore/repository_memory.go
@@ -0,0 +1,221 @@
+package datastore
+
+import (
+ "context"
+ "sync"
+)
+
+// MemoryRepositoryStore is an in-memory implementation of RepositoryStore.
+// Refer to the interface for method documentation.
+type MemoryRepositoryStore struct {
+ m sync.Mutex
+
+ storages map[string][]string
+ virtualStorageState
+ storageState
+}
+
+// virtualStorageStates represents the virtual storage's view of what state the repositories should be in.
+// It structured as virtual-storage->relative_path->generation.
+type virtualStorageState map[string]map[string]int
+
+// storageState contains individual storage's repository states.
+// It structured as virtual-storage->relative_path->storage->generation.
+type storageState map[string]map[string]map[string]int
+
+// NewMemoryRepositoryStore returns an in-memory implementation of RepositoryStore.
+func NewMemoryRepositoryStore(storages map[string][]string) *MemoryRepositoryStore {
+ return &MemoryRepositoryStore{
+ storages: storages,
+ storageState: make(storageState),
+ virtualStorageState: make(virtualStorageState),
+ }
+}
+
+func (m *MemoryRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) {
+ m.m.Lock()
+ defer m.m.Unlock()
+
+ return m.getStorageGeneration(virtualStorage, relativePath, storage), nil
+}
+
+func (m *MemoryRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) (int, error) {
+ m.m.Lock()
+ defer m.m.Unlock()
+
+ baseGen := m.getRepositoryGeneration(virtualStorage, relativePath)
+ nextGen := baseGen + 1
+
+ m.setGeneration(virtualStorage, relativePath, primary, nextGen)
+
+ // If a secondary does not have a generation, it's in an undefined state. We'll only
+ // pick secondaries on the same generation as the primary to ensure they begin from the
+ // same starting state.
+ if baseGen != GenerationUnknown {
+ for _, secondary := range secondaries {
+ currentGen := m.getStorageGeneration(virtualStorage, relativePath, secondary)
+ // If the secondary is not on the same generation as the primary, the secondary
+ // has failed a concurrent reference transaction. We won't increment its
+ // generation as it has not applied writes in previous genereations, leaving
+ // its state undefined.
+ if currentGen != baseGen {
+ continue
+ }
+
+ m.setGeneration(virtualStorage, relativePath, secondary, nextGen)
+ }
+ }
+
+ return nextGen, nil
+}
+
+func (m *MemoryRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error {
+ m.m.Lock()
+ defer m.m.Unlock()
+
+ m.setGeneration(virtualStorage, relativePath, storage, generation)
+
+ return nil
+}
+
+func (m *MemoryRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
+ m.m.Lock()
+ defer m.m.Unlock()
+
+ latestGen := m.getRepositoryGeneration(virtualStorage, relativePath)
+ storageGen := m.getStorageGeneration(virtualStorage, relativePath, storage)
+
+ m.deleteRepository(virtualStorage, relativePath)
+ m.deleteStorageRepository(virtualStorage, relativePath, storage)
+
+ if latestGen == GenerationUnknown && storageGen == GenerationUnknown {
+ return RepositoryNotExistsError{
+ virtualStorage: virtualStorage,
+ relativePath: relativePath,
+ storage: storage,
+ }
+ }
+
+ return nil
+}
+
+func (m *MemoryRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error {
+ m.m.Lock()
+ defer m.m.Unlock()
+
+ latestGen := m.getRepositoryGeneration(virtualStorage, relativePath)
+ storageGen := m.getStorageGeneration(virtualStorage, relativePath, storage)
+
+ if latestGen != GenerationUnknown {
+ m.deleteRepository(virtualStorage, relativePath)
+ m.setRepositoryGeneration(virtualStorage, newRelativePath, latestGen)
+ }
+
+ if storageGen != GenerationUnknown {
+ m.deleteStorageRepository(virtualStorage, relativePath, storage)
+ m.setStorageGeneration(virtualStorage, newRelativePath, storage, storageGen)
+ }
+
+ if latestGen == GenerationUnknown && storageGen == GenerationUnknown {
+ return RepositoryNotExistsError{
+ virtualStorage: virtualStorage,
+ relativePath: relativePath,
+ storage: storage,
+ }
+ }
+
+ return nil
+}
+
+func (m *MemoryRepositoryStore) EnsureUpgrade(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error {
+ m.m.Lock()
+ defer m.m.Unlock()
+
+ if current := m.getStorageGeneration(virtualStorage, relativePath, storage); current != GenerationUnknown && current >= generation {
+ return downgradeAttemptedError{
+ virtualStorage: virtualStorage,
+ relativePath: relativePath,
+ storage: storage,
+ currentGeneration: current,
+ attemptedGeneration: generation,
+ }
+ }
+
+ return nil
+}
+
+func (m *MemoryRepositoryStore) getRepositoryGeneration(virtualStorage, relativePath string) int {
+ gen, ok := m.virtualStorageState[virtualStorage][relativePath]
+ if !ok {
+ return GenerationUnknown
+ }
+
+ return gen
+}
+
+func (m *MemoryRepositoryStore) getStorageGeneration(virtualStorage, relativePath, storage string) int {
+ gen, ok := m.storageState[virtualStorage][relativePath][storage]
+ if !ok {
+ return GenerationUnknown
+ }
+
+ return gen
+}
+
+func (m *MemoryRepositoryStore) deleteRepository(virtualStorage, relativePath string) {
+ rels := m.virtualStorageState[virtualStorage]
+ if rels == nil {
+ return
+ }
+
+ delete(rels, relativePath)
+ if len(rels) == 0 {
+ delete(m.virtualStorageState, virtualStorage)
+ }
+}
+
+func (m *MemoryRepositoryStore) deleteStorageRepository(virtualStorage, relativePath, storage string) {
+ storages := m.storageState[virtualStorage][relativePath]
+ if storages == nil {
+ return
+ }
+
+ delete(storages, storage)
+ if len(m.storageState[virtualStorage][relativePath]) == 0 {
+ delete(m.storageState[virtualStorage], relativePath)
+ }
+
+ if len(m.storageState[virtualStorage]) == 0 {
+ delete(m.storageState, virtualStorage)
+ }
+}
+
+func (m *MemoryRepositoryStore) setGeneration(virtualStorage, relativePath, storage string, generation int) {
+ m.setRepositoryGeneration(virtualStorage, relativePath, generation)
+ m.setStorageGeneration(virtualStorage, relativePath, storage, generation)
+}
+
+func (m *MemoryRepositoryStore) setRepositoryGeneration(virtualStorage, relativePath string, generation int) {
+ current := m.getRepositoryGeneration(virtualStorage, relativePath)
+ if generation <= current {
+ return
+ }
+
+ if m.virtualStorageState[virtualStorage] == nil {
+ m.virtualStorageState[virtualStorage] = make(map[string]int)
+ }
+
+ m.virtualStorageState[virtualStorage][relativePath] = generation
+}
+
+func (m *MemoryRepositoryStore) setStorageGeneration(virtualStorage, relativePath, storage string, generation int) {
+ if m.storageState[virtualStorage] == nil {
+ m.storageState[virtualStorage] = make(map[string]map[string]int)
+ }
+
+ if m.storageState[virtualStorage][relativePath] == nil {
+ m.storageState[virtualStorage][relativePath] = make(map[string]int)
+ }
+
+ m.storageState[virtualStorage][relativePath][storage] = generation
+}
diff --git a/internal/praefect/datastore/repository_memory_test.go b/internal/praefect/datastore/repository_memory_test.go
new file mode 100644
index 000000000..f0c4b4de4
--- /dev/null
+++ b/internal/praefect/datastore/repository_memory_test.go
@@ -0,0 +1,446 @@
+package datastore
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+type requireState func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState)
+type repositoryStoreFactory func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState)
+
+func TestRepositoryStore_Memory(t *testing.T) {
+ testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) {
+ rs := NewMemoryRepositoryStore(storages)
+ return rs, func(t *testing.T, _ context.Context, vss virtualStorageState, ss storageState) {
+ t.Helper()
+ require.Equal(t, vss, rs.virtualStorageState)
+ require.Equal(t, ss, rs.storageState)
+ }
+ })
+}
+
+func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ const (
+ vs = "virtual-storage-1"
+ repo = "repository-1"
+ stor = "storage-1"
+ )
+
+ t.Run("IncrementGeneration", func(t *testing.T) {
+ t.Run("creates a new record for primary", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ generation, err := rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"secondary-1"})
+ require.NoError(t, err)
+ require.Equal(t, 0, generation)
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 0,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "primary": 0,
+ },
+ },
+ },
+ )
+ })
+
+ t.Run("increments existing record for primary", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 0))
+ generation, err := rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"secondary-1"})
+ require.NoError(t, err)
+ require.Equal(t, 1, generation)
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 1,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "primary": 1,
+ },
+ },
+ },
+ )
+ })
+
+ t.Run("increments existing for up to date secondary", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 1))
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "up-to-date-secondary", 1))
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "outdated-secondary", 0))
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 1,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "primary": 1,
+ "up-to-date-secondary": 1,
+ "outdated-secondary": 0,
+ },
+ },
+ },
+ )
+
+ generation, err := rs.IncrementGeneration(ctx, vs, repo, "primary", []string{
+ "up-to-date-secondary", "outdated-secondary", "non-existing-secondary"})
+ require.NoError(t, err)
+ require.Equal(t, 2, generation)
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 2,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "primary": 2,
+ "up-to-date-secondary": 2,
+ "outdated-secondary": 0,
+ },
+ },
+ },
+ )
+ })
+ })
+
+ t.Run("SetGeneration", func(t *testing.T) {
+ t.Run("creates a record", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ err := rs.SetGeneration(ctx, vs, repo, stor, 1)
+ require.NoError(t, err)
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 1,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "storage-1": 1,
+ },
+ },
+ },
+ )
+ })
+
+ t.Run("updates existing record", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1))
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0))
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 1,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "storage-1": 0,
+ },
+ },
+ },
+ )
+ })
+
+ t.Run("increments stays monotonic", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1))
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0))
+
+ generation, err := rs.IncrementGeneration(ctx, vs, repo, stor, nil)
+ require.NoError(t, err)
+ require.Equal(t, 2, generation)
+
+ generation, err = rs.IncrementGeneration(ctx, vs, repo, "storage-2", nil)
+ require.NoError(t, err)
+ require.Equal(t, 3, generation)
+
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 3,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "storage-1": 2,
+ "storage-2": 3,
+ },
+ },
+ },
+ )
+ })
+ })
+
+ t.Run("GetGeneration", func(t *testing.T) {
+ rs, _ := newStore(t, nil)
+
+ generation, err := rs.GetGeneration(ctx, vs, repo, stor)
+ require.NoError(t, err)
+ require.Equal(t, GenerationUnknown, generation)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0))
+
+ generation, err = rs.GetGeneration(ctx, vs, repo, stor)
+ require.NoError(t, err)
+ require.Equal(t, 0, generation)
+ })
+
+ t.Run("EnsureUpgrade", func(t *testing.T) {
+ t.Run("no previous record allowed", func(t *testing.T) {
+ rs, _ := newStore(t, nil)
+ require.NoError(t, rs.EnsureUpgrade(ctx, vs, repo, stor, GenerationUnknown))
+ require.NoError(t, rs.EnsureUpgrade(ctx, vs, repo, stor, 0))
+ })
+
+ t.Run("upgrade allowed", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0))
+ require.NoError(t, rs.EnsureUpgrade(ctx, vs, repo, stor, 1))
+ require.Error(t,
+ downgradeAttemptedError{vs, repo, stor, 1, GenerationUnknown},
+ rs.EnsureUpgrade(ctx, vs, repo, stor, GenerationUnknown))
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 0,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "storage-1": 0,
+ },
+ },
+ },
+ )
+ })
+
+ t.Run("downgrade prevented", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1))
+ require.Equal(t,
+ downgradeAttemptedError{vs, repo, stor, 1, 0},
+ rs.EnsureUpgrade(ctx, vs, repo, stor, 0))
+ require.Error(t,
+ downgradeAttemptedError{vs, repo, stor, 1, GenerationUnknown},
+ rs.EnsureUpgrade(ctx, vs, repo, stor, GenerationUnknown))
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 1,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "storage-1": 1,
+ },
+ },
+ },
+ )
+ })
+
+ t.Run("same version prevented", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1))
+ require.Equal(t,
+ downgradeAttemptedError{vs, repo, stor, 1, 1},
+ rs.EnsureUpgrade(ctx, vs, repo, stor, 1))
+ require.Error(t,
+ downgradeAttemptedError{vs, repo, stor, 1, GenerationUnknown},
+ rs.EnsureUpgrade(ctx, vs, repo, stor, GenerationUnknown))
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": 1,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "storage-1": 1,
+ },
+ },
+ },
+ )
+ })
+ })
+
+ t.Run("DeleteRepository", func(t *testing.T) {
+ t.Run("delete non-existing", func(t *testing.T) {
+ rs, _ := newStore(t, nil)
+
+ require.Equal(t,
+ RepositoryNotExistsError{vs, repo, stor},
+ rs.DeleteRepository(ctx, vs, repo, stor),
+ )
+ })
+
+ t.Run("delete existing", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, "deleted", "deleted", "deleted", 0))
+
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage", 0))
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "other-storages-remain", "remaining-storage", 0))
+
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage", 0))
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-2", "other-repo-remains", "remaining-storage", 0))
+
+ requireState(t, ctx,
+ virtualStorageState{
+ "deleted": {
+ "deleted": 0,
+ },
+ "virtual-storage-1": {
+ "other-storages-remain": 0,
+ },
+ "virtual-storage-2": {
+ "deleted-repo": 0,
+ "other-repo-remains": 0,
+ },
+ },
+ storageState{
+ "deleted": {
+ "deleted": {
+ "deleted": 0,
+ },
+ },
+ "virtual-storage-1": {
+ "other-storages-remain": {
+ "deleted-storage": 0,
+ "remaining-storage": 0,
+ },
+ },
+ "virtual-storage-2": {
+ "deleted-repo": {
+ "deleted-storage": 0,
+ },
+ "other-repo-remains": {
+ "remaining-storage": 0,
+ },
+ },
+ },
+ )
+
+ require.NoError(t, rs.DeleteRepository(ctx, "deleted", "deleted", "deleted"))
+ require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage"))
+ require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage"))
+
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-2": {
+ "other-repo-remains": 0,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "other-storages-remain": {
+ "remaining-storage": 0,
+ },
+ },
+ "virtual-storage-2": {
+ "other-repo-remains": {
+ "remaining-storage": 0,
+ },
+ },
+ },
+ )
+ })
+ })
+
+ t.Run("RenameRepository", func(t *testing.T) {
+ t.Run("rename non-existing", func(t *testing.T) {
+ rs, _ := newStore(t, nil)
+
+ require.Equal(t,
+ RepositoryNotExistsError{vs, repo, stor},
+ rs.RenameRepository(ctx, vs, repo, stor, "repository-2"),
+ )
+ })
+
+ t.Run("rename existing", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ require.NoError(t, rs.SetGeneration(ctx, vs, "renamed-all", "storage-1", 0))
+ require.NoError(t, rs.SetGeneration(ctx, vs, "renamed-some", "storage-1", 0))
+ require.NoError(t, rs.SetGeneration(ctx, vs, "renamed-some", "storage-2", 0))
+
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "renamed-all": 0,
+ "renamed-some": 0,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "renamed-all": {
+ "storage-1": 0,
+ },
+ "renamed-some": {
+ "storage-1": 0,
+ "storage-2": 0,
+ },
+ },
+ },
+ )
+
+ require.NoError(t, rs.RenameRepository(ctx, vs, "renamed-all", "storage-1", "renamed-all-new"))
+ require.NoError(t, rs.RenameRepository(ctx, vs, "renamed-some", "storage-1", "renamed-some-new"))
+
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "renamed-all-new": 0,
+ "renamed-some-new": 0,
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "renamed-all-new": {
+ "storage-1": 0,
+ },
+ "renamed-some-new": {
+ "storage-1": 0,
+ },
+ "renamed-some": {
+ "storage-2": 0,
+ },
+ },
+ },
+ )
+ })
+ })
+}
diff --git a/internal/praefect/datastore/repository_postgres.go b/internal/praefect/datastore/repository_postgres.go
new file mode 100644
index 000000000..433a46776
--- /dev/null
+++ b/internal/praefect/datastore/repository_postgres.go
@@ -0,0 +1,284 @@
+package datastore
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+
+ "github.com/lib/pq"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+// GenerationUnknown is used to indicate lack of generation number in
+// a replication job. Older instances can produce replication jobs
+// without a generation number.
+const GenerationUnknown = -1
+
+type downgradeAttemptedError struct {
+ virtualStorage string
+ relativePath string
+ storage string
+ currentGeneration int
+ attemptedGeneration int
+}
+
+func (err downgradeAttemptedError) Error() string {
+ return fmt.Sprintf("attempted downgrading %q -> %q -> %q from generation %d to %d",
+ err.virtualStorage, err.relativePath, err.storage, err.currentGeneration, err.attemptedGeneration,
+ )
+}
+
+// RepositoryNotExistsError is returned when trying to perform an operation on a non-existent repository.
+type RepositoryNotExistsError struct {
+ virtualStorage string
+ relativePath string
+ storage string
+}
+
+// Is checks whetehr the other errors is of the same type.
+func (err RepositoryNotExistsError) Is(other error) bool {
+ _, ok := other.(RepositoryNotExistsError)
+ return ok
+}
+
+// Error returns the errors message.
+func (err RepositoryNotExistsError) Error() string {
+ return fmt.Sprintf("repository %q -> %q -> %q does not exist",
+ err.virtualStorage, err.relativePath, err.storage,
+ )
+}
+
+// RepositoryStore provides access to repository state.
+type RepositoryStore interface {
+ // GetGeneration gets the repository's generation on a given storage.
+ GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
+ // IncrementGeneration increments the primary's and the up to date secondaries' generations.
+ IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) (int, error)
+ // SetGeneration sets the repository's generation on the given storage. If the generation is higher
+ // than the virtual storage's generation, it is set to match as well to guarantee monotonic increments.
+ SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
+ // EnsureUpgrade returns an error if the given generation would downgrade the storage's repository.
+ EnsureUpgrade(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
+ // DeleteRepository deletes the repository from the virtual storage and the storage. Returns
+ // RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage
+ // or the storage.
+ DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
+ // RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well
+ // as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository
+ // which has no record in the virtual storage or the storage.
+ RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
+}
+
+// PostgresRepositoryStore is a Postgres implementation of RepositoryStore.
+// Refer to the interface for method documentation.
+type PostgresRepositoryStore struct {
+ db glsql.Querier
+ storages map[string][]string
+}
+
+// NewLocalRepositoryStore returns a Postgres implementation of RepositoryStore.
+func NewPostgresRepositoryStore(db glsql.Querier, storages map[string][]string) *PostgresRepositoryStore {
+ return &PostgresRepositoryStore{db: db, storages: storages}
+}
+
+func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) {
+ const q = `
+SELECT generation
+FROM storage_repositories
+WHERE virtual_storage = $1
+AND relative_path = $2
+AND storage = $3
+`
+
+ var gen int
+ if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, storage).Scan(&gen); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return GenerationUnknown, nil
+ }
+
+ return 0, err
+ }
+
+ return gen, nil
+}
+
+func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) (int, error) {
+ // The query works as follows:
+ // 1. `next_generation` CTE increments the latest generation by 1. If no previous records exists,
+ // the generation starts from 0.
+ // 2. `base_generation` CTE gets the primary's current generation. A secondary has to be on the primary's
+ // generation, otherwise its generation won't be incremented. This avoids any issues where a concurrent
+ // reference transaction has failed and the secondary is no longer up to date when we are incrementing
+ // the generations.
+ // 3. `eligible_secondaries` filters out secondaries which participated in a transaction but failed a
+ /// concurrent transaction.
+ // 4. `eligible_storages` CTE combines the primary and the up to date secondaries in a list of storages to
+ // to increment the generation for.
+ // 5. Finally, we upsert the records in 'storage_repositories' table to match the new generation for the
+ // eligble storages.
+
+ const q = `
+WITH next_generation AS (
+ INSERT INTO repositories (
+ virtual_storage,
+ relative_path,
+ generation
+ ) VALUES ($1, $2, 0)
+ ON CONFLICT (virtual_storage, relative_path) DO
+ UPDATE SET generation = repositories.generation + 1
+ RETURNING virtual_storage, relative_path, generation
+), base_generation AS (
+ SELECT virtual_storage, relative_path, generation
+ FROM storage_repositories
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+ AND storage = $3
+ FOR UPDATE
+), eligible_secondaries AS (
+ SELECT storage
+ FROM storage_repositories
+ NATURAL JOIN base_generation
+ WHERE storage = ANY($4::text[])
+ FOR UPDATE
+), eligible_storages AS (
+ SELECT storage
+ FROM eligible_secondaries
+ UNION
+ SELECT $3
+)
+
+INSERT INTO storage_repositories AS sr (
+ virtual_storage,
+ relative_path,
+ storage,
+ generation
+)
+SELECT virtual_storage, relative_path, storage, generation
+FROM eligible_storages
+CROSS JOIN next_generation
+ON CONFLICT (virtual_storage, relative_path, storage) DO
+ UPDATE SET generation = EXCLUDED.generation
+RETURNING generation
+`
+
+ var generation int
+ if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, primary, pq.StringArray(secondaries)).Scan(&generation); err != nil {
+ return 0, err
+ }
+
+ return generation, nil
+}
+
+func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error {
+ const q = `
+WITH repository AS (
+ INSERT INTO repositories (
+ virtual_storage,
+ relative_path,
+ generation
+ ) VALUES ($1, $2, $4)
+ ON CONFLICT (virtual_storage, relative_path) DO
+ UPDATE SET generation = EXCLUDED.generation
+ WHERE repositories.generation < EXCLUDED.generation
+)
+
+INSERT INTO storage_repositories (
+ virtual_storage,
+ relative_path,
+ storage,
+ generation
+)
+VALUES ($1, $2, $3, $4)
+ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET
+ generation = EXCLUDED.generation
+`
+
+ _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, generation)
+ return err
+}
+
+func (rs *PostgresRepositoryStore) EnsureUpgrade(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error {
+ current, err := rs.GetGeneration(ctx, virtualStorage, relativePath, storage)
+ if err != nil {
+ return err
+ }
+
+ if current != GenerationUnknown && current >= generation {
+ return downgradeAttemptedError{
+ virtualStorage: virtualStorage,
+ relativePath: relativePath,
+ storage: storage,
+ currentGeneration: current,
+ attemptedGeneration: generation,
+ }
+ }
+
+ return nil
+}
+
+func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
+ const q = `
+WITH repo AS (
+ DELETE FROM repositories
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+)
+
+DELETE FROM storage_repositories
+WHERE virtual_storage = $1
+AND relative_path = $2
+AND storage = $3
+`
+
+ result, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage)
+ if err != nil {
+ return err
+ }
+
+ if n, err := result.RowsAffected(); err != nil {
+ return err
+ } else if n == 0 {
+ return RepositoryNotExistsError{
+ virtualStorage: virtualStorage,
+ relativePath: relativePath,
+ storage: storage,
+ }
+ }
+
+ return nil
+}
+
+func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error {
+ const q = `
+WITH repo AS (
+ UPDATE repositories
+ SET relative_path = $4
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+)
+
+UPDATE storage_repositories
+SET relative_path = $4
+WHERE virtual_storage = $1
+AND relative_path = $2
+AND storage = $3
+`
+
+ result, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, newRelativePath)
+ if err != nil {
+ return err
+ }
+
+ if n, err := result.RowsAffected(); err != nil {
+ return err
+ } else if n == 0 {
+ return RepositoryNotExistsError{
+ virtualStorage: virtualStorage,
+ relativePath: relativePath,
+ storage: storage,
+ }
+ }
+
+ return err
+}
diff --git a/internal/praefect/datastore/repository_postgres_test.go b/internal/praefect/datastore/repository_postgres_test.go
new file mode 100644
index 000000000..4b5642084
--- /dev/null
+++ b/internal/praefect/datastore/repository_postgres_test.go
@@ -0,0 +1,75 @@
+// +build postgres
+
+package datastore
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestRepositoryStore_Postgres(t *testing.T) {
+ testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) {
+ db := getDB(t)
+ gs := NewPostgresRepositoryStore(db, storages)
+
+ requireVirtualStorageState := func(t *testing.T, ctx context.Context, exp virtualStorageState) {
+ rows, err := db.QueryContext(ctx, `
+SELECT virtual_storage, relative_path, generation
+FROM repositories
+ `)
+ require.NoError(t, err)
+ defer rows.Close()
+
+ act := make(virtualStorageState)
+ for rows.Next() {
+ var vs, rel string
+ var gen int
+ require.NoError(t, rows.Scan(&vs, &rel, &gen))
+ if act[vs] == nil {
+ act[vs] = make(map[string]int)
+ }
+
+ act[vs][rel] = gen
+ }
+
+ require.NoError(t, rows.Err())
+ require.Equal(t, exp, act)
+ }
+
+ requireStorageState := func(t *testing.T, ctx context.Context, exp storageState) {
+ rows, err := db.QueryContext(ctx, `
+SELECT virtual_storage, relative_path, storage, generation
+FROM storage_repositories
+ `)
+ require.NoError(t, err)
+ defer rows.Close()
+
+ act := make(storageState)
+ for rows.Next() {
+ var vs, rel, storage string
+ var gen int
+ require.NoError(t, rows.Scan(&vs, &rel, &storage, &gen))
+
+ if act[vs] == nil {
+ act[vs] = make(map[string]map[string]int)
+ }
+ if act[vs][rel] == nil {
+ act[vs][rel] = make(map[string]int)
+ }
+
+ act[vs][rel][storage] = gen
+ }
+
+ require.NoError(t, rows.Err())
+ require.Equal(t, exp, act)
+ }
+
+ return gs, func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) {
+ t.Helper()
+ requireVirtualStorageState(t, ctx, vss)
+ requireStorageState(t, ctx, ss)
+ }
+ })
+}