diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-12-03 13:00:18 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-12-03 13:00:18 +0300 |
commit | 2721def3fcccaee50fff5fb2ca48c5b562623738 (patch) | |
tree | 55231c84c9a985afdd2a64e6c3552aef0d9180cb /internal/praefect/datastore | |
parent | 8f493e61178b78035390cb4948deca4fe567350b (diff) | |
parent | 059fb5f1e1046dfab1d193c4c75e900c9071d5c4 (diff) |
Merge branch 'smh-remove-memory-store' into 'master'
Remove MemoryRepositoryStore
See merge request gitlab-org/gitaly!2845
Diffstat (limited to 'internal/praefect/datastore')
-rw-r--r-- | internal/praefect/datastore/repository_memory.go | 345 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_postgres_test.go | 74 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store.go (renamed from internal/praefect/datastore/repository_postgres.go) | 11 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_bm_test.go (renamed from internal/praefect/datastore/repository_postgres_bm_test.go) | 0 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 107 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go (renamed from internal/praefect/datastore/repository_memory_test.go) | 73 |
6 files changed, 186 insertions, 424 deletions
diff --git a/internal/praefect/datastore/repository_memory.go b/internal/praefect/datastore/repository_memory.go deleted file mode 100644 index e21545573..000000000 --- a/internal/praefect/datastore/repository_memory.go +++ /dev/null @@ -1,345 +0,0 @@ -package datastore - -import ( - "context" - "fmt" - "sync" -) - -// MemoryRepositoryStore is an in-memory implementation of RepositoryStore. -// Refer to the interface for method documentation. -type MemoryRepositoryStore struct { - m sync.Mutex - - storages - virtualStorageState - storageState -} - -type storages map[string][]string - -func (s storages) secondaries(virtualStorage, primary string) ([]string, error) { - storages, err := s.storages(virtualStorage) - if err != nil { - return nil, err - } - - primaryFound := false - secondaries := make([]string, 0, len(storages)-1) - for _, storage := range storages { - if storage == primary { - primaryFound = true - continue - } - - secondaries = append(secondaries, storage) - } - - if !primaryFound { - return nil, fmt.Errorf("primary not found: %q", primary) - } - - return secondaries, nil -} - -func (s storages) storages(virtualStorage string) ([]string, error) { - storages, ok := s[virtualStorage] - if !ok { - return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage) - } - - return storages, nil -} - -// virtualStorageStates represents the virtual storage's view of which repositories should exist. -// It's structured as virtual-storage->relative_path. -type virtualStorageState map[string]map[string]struct{} - -// storageState contains individual storage's repository states. -// It structured as virtual-storage->relative_path->storage->generation. -type storageState map[string]map[string]map[string]int - -// NewMemoryRepositoryStore returns an in-memory implementation of RepositoryStore. -func NewMemoryRepositoryStore(configuredStorages map[string][]string) *MemoryRepositoryStore { - return &MemoryRepositoryStore{ - storages: storages(configuredStorages), - storageState: make(storageState), - virtualStorageState: make(virtualStorageState), - } -} - -func (m *MemoryRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) { - m.m.Lock() - defer m.m.Unlock() - - return m.getStorageGeneration(virtualStorage, relativePath, storage), nil -} - -func (m *MemoryRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error { - m.m.Lock() - defer m.m.Unlock() - - baseGen := m.getRepositoryGeneration(virtualStorage, relativePath) - nextGen := baseGen + 1 - - m.setGeneration(virtualStorage, relativePath, primary, nextGen) - - // If a secondary does not have a generation, it's in an undefined state. We'll only - // pick secondaries on the same generation as the primary to ensure they begin from the - // same starting state. - if baseGen != GenerationUnknown { - for _, secondary := range secondaries { - currentGen := m.getStorageGeneration(virtualStorage, relativePath, secondary) - // If the secondary is not on the same generation as the primary, the secondary - // has failed a concurrent reference transaction. We won't increment its - // generation as it has not applied writes in previous genereations, leaving - // its state undefined. - if currentGen != baseGen { - continue - } - - m.setGeneration(virtualStorage, relativePath, secondary, nextGen) - } - } - - return nil -} - -func (m *MemoryRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error { - m.m.Lock() - defer m.m.Unlock() - - m.setGeneration(virtualStorage, relativePath, storage, generation) - - return nil -} - -func (m *MemoryRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { - m.m.Lock() - defer m.m.Unlock() - - latestGen := m.getRepositoryGeneration(virtualStorage, relativePath) - storageGen := m.getStorageGeneration(virtualStorage, relativePath, storage) - - m.deleteRepository(virtualStorage, relativePath) - m.deleteStorageRepository(virtualStorage, relativePath, storage) - - if latestGen == GenerationUnknown && storageGen == GenerationUnknown { - return RepositoryNotExistsError{ - virtualStorage: virtualStorage, - relativePath: relativePath, - storage: storage, - } - } - - return nil -} - -func (m *MemoryRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { - m.m.Lock() - defer m.m.Unlock() - - latestGen := m.getRepositoryGeneration(virtualStorage, relativePath) - storageGen := m.getStorageGeneration(virtualStorage, relativePath, storage) - - if latestGen != GenerationUnknown { - m.deleteRepository(virtualStorage, relativePath) - m.createRepository(virtualStorage, newRelativePath) - } - - if storageGen != GenerationUnknown { - m.deleteStorageRepository(virtualStorage, relativePath, storage) - m.setStorageGeneration(virtualStorage, newRelativePath, storage, storageGen) - } - - if latestGen == GenerationUnknown && storageGen == GenerationUnknown { - return RepositoryNotExistsError{ - virtualStorage: virtualStorage, - relativePath: relativePath, - storage: storage, - } - } - - return nil -} - -func (m *MemoryRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) { - m.m.Lock() - defer m.m.Unlock() - - sourceGeneration := m.getStorageGeneration(virtualStorage, relativePath, source) - targetGeneration := m.getStorageGeneration(virtualStorage, relativePath, target) - - if targetGeneration != GenerationUnknown && targetGeneration >= sourceGeneration { - return 0, DowngradeAttemptedError{ - VirtualStorage: virtualStorage, - RelativePath: relativePath, - Storage: target, - CurrentGeneration: targetGeneration, - AttemptedGeneration: sourceGeneration, - } - } - - return sourceGeneration, nil -} - -func (m *MemoryRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { - m.m.Lock() - defer m.m.Unlock() - - secondaries, err := m.storages.secondaries(virtualStorage, primary) - if err != nil { - return nil, err - } - - expectedGen := m.getRepositoryGeneration(virtualStorage, relativePath) - if expectedGen == GenerationUnknown { - return nil, nil - } - - consistentSecondaries := make(map[string]struct{}, len(secondaries)) - for _, secondary := range secondaries { - gen := m.getStorageGeneration(virtualStorage, relativePath, secondary) - if gen == expectedGen { - consistentSecondaries[secondary] = struct{}{} - } - } - - return consistentSecondaries, nil -} - -func (m *MemoryRepositoryStore) IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) { - expected := m.getRepositoryGeneration(virtualStorage, relativePath) - if expected == GenerationUnknown { - return true, nil - } - - actual := m.getStorageGeneration(virtualStorage, relativePath, storage) - return expected == actual, nil -} - -func (m *MemoryRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) { - return m.getRepositoryGeneration(virtualStorage, relativePath) != GenerationUnknown, nil -} - -func (m *MemoryRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { - m.m.Lock() - defer m.m.Unlock() - - m.deleteStorageRepository(virtualStorage, relativePath, storage) - if len(m.storageState[virtualStorage][relativePath]) == 0 { - m.deleteRepository(virtualStorage, relativePath) - } - - return nil -} - -func (m *MemoryRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) { - m.m.Lock() - defer m.m.Unlock() - - storages, ok := m.storages[virtualStorage] - if !ok { - return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage) - } - - outdatedRepos := make(map[string]map[string]int) - repositories, ok := m.virtualStorageState[virtualStorage] - if !ok { - return outdatedRepos, nil - } - - for relativePath := range repositories { - latestGeneration := m.getRepositoryGeneration(virtualStorage, relativePath) - - for _, storage := range storages { - if gen := m.getStorageGeneration(virtualStorage, relativePath, storage); gen < latestGeneration { - if outdatedRepos[relativePath] == nil { - outdatedRepos[relativePath] = make(map[string]int) - } - - outdatedRepos[relativePath][storage] = latestGeneration - gen - } - } - } - - return outdatedRepos, nil -} - -func (m *MemoryRepositoryStore) getRepositoryGeneration(virtualStorage, relativePath string) int { - generations, ok := m.storageState[virtualStorage][relativePath] - if !ok { - return GenerationUnknown - } - - max := GenerationUnknown - for _, generation := range generations { - if generation > max { - max = generation - } - } - - return max -} - -func (m *MemoryRepositoryStore) getStorageGeneration(virtualStorage, relativePath, storage string) int { - gen, ok := m.storageState[virtualStorage][relativePath][storage] - if !ok { - return GenerationUnknown - } - - return gen -} - -func (m *MemoryRepositoryStore) deleteRepository(virtualStorage, relativePath string) { - rels := m.virtualStorageState[virtualStorage] - if rels == nil { - return - } - - delete(rels, relativePath) - if len(rels) == 0 { - delete(m.virtualStorageState, virtualStorage) - } -} - -func (m *MemoryRepositoryStore) deleteStorageRepository(virtualStorage, relativePath, storage string) { - storages := m.storageState[virtualStorage][relativePath] - if storages == nil { - return - } - - delete(storages, storage) - if len(m.storageState[virtualStorage][relativePath]) == 0 { - delete(m.storageState[virtualStorage], relativePath) - } - - if len(m.storageState[virtualStorage]) == 0 { - delete(m.storageState, virtualStorage) - } -} - -func (m *MemoryRepositoryStore) setGeneration(virtualStorage, relativePath, storage string, generation int) { - m.createRepository(virtualStorage, relativePath) - m.setStorageGeneration(virtualStorage, relativePath, storage, generation) -} - -func (m *MemoryRepositoryStore) createRepository(virtualStorage, relativePath string) { - if m.virtualStorageState[virtualStorage] == nil { - m.virtualStorageState[virtualStorage] = make(map[string]struct{}) - } - - m.virtualStorageState[virtualStorage][relativePath] = struct{}{} -} - -func (m *MemoryRepositoryStore) setStorageGeneration(virtualStorage, relativePath, storage string, generation int) { - if m.storageState[virtualStorage] == nil { - m.storageState[virtualStorage] = make(map[string]map[string]int) - } - - if m.storageState[virtualStorage][relativePath] == nil { - m.storageState[virtualStorage][relativePath] = make(map[string]int) - } - - m.storageState[virtualStorage][relativePath][storage] = generation -} diff --git a/internal/praefect/datastore/repository_postgres_test.go b/internal/praefect/datastore/repository_postgres_test.go deleted file mode 100644 index 76781cbcd..000000000 --- a/internal/praefect/datastore/repository_postgres_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// +build postgres - -package datastore - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestRepositoryStore_Postgres(t *testing.T) { - testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) { - db := getDB(t) - gs := NewPostgresRepositoryStore(db, storages) - - requireVirtualStorageState := func(t *testing.T, ctx context.Context, exp virtualStorageState) { - rows, err := db.QueryContext(ctx, ` -SELECT virtual_storage, relative_path -FROM repositories - `) - require.NoError(t, err) - defer rows.Close() - - act := make(virtualStorageState) - for rows.Next() { - var vs, rel string - require.NoError(t, rows.Scan(&vs, &rel)) - if act[vs] == nil { - act[vs] = make(map[string]struct{}) - } - - act[vs][rel] = struct{}{} - } - - require.NoError(t, rows.Err()) - require.Equal(t, exp, act) - } - - requireStorageState := func(t *testing.T, ctx context.Context, exp storageState) { - rows, err := db.QueryContext(ctx, ` -SELECT virtual_storage, relative_path, storage, generation -FROM storage_repositories - `) - require.NoError(t, err) - defer rows.Close() - - act := make(storageState) - for rows.Next() { - var vs, rel, storage string - var gen int - require.NoError(t, rows.Scan(&vs, &rel, &storage, &gen)) - - if act[vs] == nil { - act[vs] = make(map[string]map[string]int) - } - if act[vs][rel] == nil { - act[vs][rel] = make(map[string]int) - } - - act[vs][rel][storage] = gen - } - - require.NoError(t, rows.Err()) - require.Equal(t, exp, act) - } - - return gs, func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) { - t.Helper() - requireVirtualStorageState(t, ctx, vss) - requireStorageState(t, ctx, ss) - } - }) -} diff --git a/internal/praefect/datastore/repository_postgres.go b/internal/praefect/datastore/repository_store.go index 9b5a1e03c..1a25ae1bf 100644 --- a/internal/praefect/datastore/repository_postgres.go +++ b/internal/praefect/datastore/repository_store.go @@ -10,6 +10,17 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" ) +type storages map[string][]string + +func (s storages) storages(virtualStorage string) ([]string, error) { + storages, ok := s[virtualStorage] + if !ok { + return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage) + } + + return storages, nil +} + // GenerationUnknown is used to indicate lack of generation number in // a replication job. Older instances can produce replication jobs // without a generation number. diff --git a/internal/praefect/datastore/repository_postgres_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go index 7c8647609..7c8647609 100644 --- a/internal/praefect/datastore/repository_postgres_bm_test.go +++ b/internal/praefect/datastore/repository_store_bm_test.go diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go new file mode 100644 index 000000000..8220d5898 --- /dev/null +++ b/internal/praefect/datastore/repository_store_mock.go @@ -0,0 +1,107 @@ +package datastore + +import "context" + +// MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods +// default to what could be considered success if not set. +type MockRepositoryStore struct { + GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) + IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error + IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) + GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) + SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error + DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error + GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) + GetOutdatedRepositoriesFunc func(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) + DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) +} + +func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) { + if m.GetGenerationFunc == nil { + return GenerationUnknown, nil + } + + return m.GetGenerationFunc(ctx, virtualStorage, relativePath, storage) +} + +func (m MockRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error { + if m.IncrementGenerationFunc == nil { + return nil + } + + return m.IncrementGenerationFunc(ctx, virtualStorage, relativePath, primary, secondaries) +} + +func (m MockRepositoryStore) IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) { + if m.IsLatestGenerationFunc == nil { + return true, nil + } + + return m.IsLatestGenerationFunc(ctx, virtualStorage, relativePath, storage) +} + +func (m MockRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) { + if m.GetReplicatedGenerationFunc == nil { + return GenerationUnknown, nil + } + + return m.GetReplicatedGenerationFunc(ctx, virtualStorage, relativePath, source, target) +} + +func (m MockRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error { + if m.SetGenerationFunc == nil { + return nil + } + + return m.SetGenerationFunc(ctx, virtualStorage, relativePath, storage, generation) +} + +func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { + if m.DeleteRepositoryFunc == nil { + return nil + } + + return m.DeleteRepositoryFunc(ctx, virtualStorage, relativePath, storage) +} + +func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { + if m.RenameRepositoryFunc == nil { + return nil + } + + return m.RenameRepositoryFunc(ctx, virtualStorage, relativePath, storage, newRelativePath) +} + +func (m MockRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { + if m.GetConsistentSecondariesFunc == nil { + return map[string]struct{}{}, nil + } + + return m.GetConsistentSecondariesFunc(ctx, virtualStorage, relativePath, primary) +} + +func (m MockRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) { + if m.GetOutdatedRepositoriesFunc == nil { + return nil, nil + } + + return m.GetOutdatedRepositoriesFunc(ctx, virtualStorage) +} + +func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { + if m.DeleteInvalidRepositoryFunc == nil { + return nil + } + + return m.DeleteInvalidRepositoryFunc(ctx, virtualStorage, relativePath, storage) +} + +func (m MockRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) { + if m.RepositoryExistsFunc == nil { + return true, nil + } + + return m.RepositoryExistsFunc(ctx, virtualStorage, relativePath) +} diff --git a/internal/praefect/datastore/repository_memory_test.go b/internal/praefect/datastore/repository_store_test.go index 562c15104..96f66b728 100644 --- a/internal/praefect/datastore/repository_memory_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -1,3 +1,5 @@ +// +build postgres + package datastore import ( @@ -8,16 +10,77 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) +// virtualStorageStates represents the virtual storage's view of which repositories should exist. +// It's structured as virtual-storage->relative_path. +type virtualStorageState map[string]map[string]struct{} + +// storageState contains individual storage's repository states. +// It structured as virtual-storage->relative_path->storage->generation. +type storageState map[string]map[string]map[string]int + type requireState func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) type repositoryStoreFactory func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) -func TestRepositoryStore_Memory(t *testing.T) { +func TestRepositoryStore_Postgres(t *testing.T) { testRepositoryStore(t, func(t *testing.T, storages map[string][]string) (RepositoryStore, requireState) { - rs := NewMemoryRepositoryStore(storages) - return rs, func(t *testing.T, _ context.Context, vss virtualStorageState, ss storageState) { + db := getDB(t) + gs := NewPostgresRepositoryStore(db, storages) + + requireVirtualStorageState := func(t *testing.T, ctx context.Context, exp virtualStorageState) { + rows, err := db.QueryContext(ctx, ` +SELECT virtual_storage, relative_path +FROM repositories + `) + require.NoError(t, err) + defer rows.Close() + + act := make(virtualStorageState) + for rows.Next() { + var vs, rel string + require.NoError(t, rows.Scan(&vs, &rel)) + if act[vs] == nil { + act[vs] = make(map[string]struct{}) + } + + act[vs][rel] = struct{}{} + } + + require.NoError(t, rows.Err()) + require.Equal(t, exp, act) + } + + requireStorageState := func(t *testing.T, ctx context.Context, exp storageState) { + rows, err := db.QueryContext(ctx, ` +SELECT virtual_storage, relative_path, storage, generation +FROM storage_repositories + `) + require.NoError(t, err) + defer rows.Close() + + act := make(storageState) + for rows.Next() { + var vs, rel, storage string + var gen int + require.NoError(t, rows.Scan(&vs, &rel, &storage, &gen)) + + if act[vs] == nil { + act[vs] = make(map[string]map[string]int) + } + if act[vs][rel] == nil { + act[vs][rel] = make(map[string]int) + } + + act[vs][rel][storage] = gen + } + + require.NoError(t, rows.Err()) + require.Equal(t, exp, act) + } + + return gs, func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) { t.Helper() - require.Equal(t, vss, rs.virtualStorageState) - require.Equal(t, ss, rs.storageState) + requireVirtualStorageState(t, ctx, vss) + requireStorageState(t, ctx, ss) } }) } |