diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-29 13:41:24 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-29 13:41:24 +0300 |
commit | f3778a1784e50c9640086fb3ef9cbf7ba4d80513 (patch) | |
tree | cd8f7abbb34b69f36aef5120b929e5ccadea4c77 | |
parent | ec402d9b7129abc8710e3a6edd51f9c6fbcfe347 (diff) | |
parent | d2e0f4ef11ca392010d96da554e70aab9018a688 (diff) |
Merge branch 'smh-outdated-repos-stores' into 'master'
Add GetOutdatedRepositories to repository store
See merge request gitlab-org/gitaly!2416
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_memory.go | 30 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_memory_test.go | 69 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_postgres.go | 47 |
4 files changed, 147 insertions, 1 deletions
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 3ea4de70d..4cfcf2200 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -4,8 +4,8 @@ package datastore import ( "context" - "testing" "sync" + "testing" "time" "github.com/stretchr/testify/assert" diff --git a/internal/praefect/datastore/repository_memory.go b/internal/praefect/datastore/repository_memory.go index 357207302..c6adb34e4 100644 --- a/internal/praefect/datastore/repository_memory.go +++ b/internal/praefect/datastore/repository_memory.go @@ -213,6 +213,36 @@ func (m *MemoryRepositoryStore) RepositoryExists(ctx context.Context, virtualSto return m.getRepositoryGeneration(virtualStorage, relativePath) != GenerationUnknown, nil } +func (m *MemoryRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) { + m.m.Lock() + defer m.m.Unlock() + + storages, ok := m.storages[virtualStorage] + if !ok { + return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage) + } + + outdatedRepos := make(map[string]map[string]int) + repositories, ok := m.virtualStorageState[virtualStorage] + if !ok { + return outdatedRepos, nil + } + + for relativePath, latestGeneration := range repositories { + for _, storage := range storages { + if gen := m.getStorageGeneration(virtualStorage, relativePath, storage); gen < latestGeneration { + if outdatedRepos[relativePath] == nil { + outdatedRepos[relativePath] = make(map[string]int) + } + + outdatedRepos[relativePath][storage] = latestGeneration - gen + } + } + } + + return outdatedRepos, nil +} + func (m *MemoryRepositoryStore) getRepositoryGeneration(virtualStorage, relativePath string) int { gen, ok := m.virtualStorageState[virtualStorage][relativePath] if !ok { diff --git a/internal/praefect/datastore/repository_memory_test.go b/internal/praefect/datastore/repository_memory_test.go index cdbf2cb34..364dd16c0 100644 --- a/internal/praefect/datastore/repository_memory_test.go +++ b/internal/praefect/datastore/repository_memory_test.go @@ -507,4 +507,73 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.False(t, exists) }) + + t.Run("GetOutdatedRepositories", func(t *testing.T) { + t.Run("unknown virtual storage", func(t *testing.T) { + rs, _ := newStore(t, map[string][]string{}) + + _, err := rs.GetOutdatedRepositories(ctx, "does not exist") + require.EqualError(t, err, `unknown virtual storage: "does not exist"`) + }) + + type state map[string]map[string]map[string]struct { + generation int + } + + type expected map[string]map[string]int + + for _, tc := range []struct { + desc string + state state + expected map[string]map[string]int + }{ + { + desc: "no records in virtual storage", + state: state{"virtual-storage-2": {stor: {"repo-1": {generation: 0}}}}, + expected: expected{}, + }, + { + desc: "storages missing records", + state: state{vs: {stor: {"repo-1": {generation: 0}}}}, + expected: expected{"repo-1": {"storage-2": 1, "storage-3": 1}}, + }, + { + desc: "outdated storages", + state: state{vs: { + stor: {"repo-1": {generation: 2}}, + "storage-2": {"repo-1": {generation: 1}}, + "storage-3": {"repo-1": {generation: 0}}, + }}, + expected: expected{"repo-1": {"storage-2": 1, "storage-3": 2}}, + }, + { + desc: "all up to date", + state: state{vs: { + stor: {"repo-1": {generation: 3}}, + "storage-2": {"repo-1": {generation: 3}}, + "storage-3": {"repo-1": {generation: 3}}, + }}, + expected: expected{}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + rs, _ := newStore(t, map[string][]string{vs: {stor, "storage-2", "storage-3"}}) + + ctx, cancel := testhelper.Context() + defer cancel() + + for vs, storages := range tc.state { + for storage, repos := range storages { + for repo, state := range repos { + require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, state.generation)) + } + } + } + + outdated, err := rs.GetOutdatedRepositories(ctx, vs) + require.NoError(t, err) + require.Equal(t, tc.expected, outdated) + }) + } + }) } diff --git a/internal/praefect/datastore/repository_postgres.go b/internal/praefect/datastore/repository_postgres.go index a7a72728d..8f40b12d8 100644 --- a/internal/praefect/datastore/repository_postgres.go +++ b/internal/praefect/datastore/repository_postgres.go @@ -79,6 +79,10 @@ type RepositoryStore interface { IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) // RepositoryExists returns whether the repository exists on a virtual storage. RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) + // GetOutdatedRepositories finds repositories which are not on the latest generation in the virtual storage. It returns a map + // with key structure `relative_path-> storage -> generation`, indicating how many changes a storage is missing for a given + // repository. + GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. @@ -404,3 +408,46 @@ AND relative_path = $2 return exists, nil } + +func (rs *PostgresRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) { + // As some storages might be missing records from the table, we do a cross join between the repositories and the + // configured storages. If a storage is missing an entry, it is considered fully outdated. + const q = ` +SELECT relative_path, storage, expected.generation - COALESCE(actual.generation, -1) AS behind_by +FROM ( + SELECT virtual_storage, relative_path, storage, generation + FROM repositories + CROSS JOIN (SELECT unnest($2::text[]) AS storage) AS storages + WHERE virtual_storage = $1 +) AS expected +LEFT JOIN storage_repositories AS actual USING (virtual_storage, relative_path, storage) +WHERE COALESCE(actual.generation, -1) < expected.generation +` + storages, ok := rs.storages[virtualStorage] + if !ok { + return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage) + } + + rows, err := rs.db.QueryContext(ctx, q, virtualStorage, pq.StringArray(storages)) + if err != nil { + return nil, err + } + defer rows.Close() + + outdated := make(map[string]map[string]int) + for rows.Next() { + var storage, relativePath string + var difference int + if err := rows.Scan(&relativePath, &storage, &difference); err != nil { + return nil, err + } + + if outdated[relativePath] == nil { + outdated[relativePath] = make(map[string]int) + } + + outdated[relativePath][storage] = difference + } + + return outdated, rows.Err() +} |