Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-07-29 13:41:24 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-07-29 13:41:24 +0300
commitf3778a1784e50c9640086fb3ef9cbf7ba4d80513 (patch)
treecd8f7abbb34b69f36aef5120b929e5ccadea4c77
parentec402d9b7129abc8710e3a6edd51f9c6fbcfe347 (diff)
parentd2e0f4ef11ca392010d96da554e70aab9018a688 (diff)
Merge branch 'smh-outdated-repos-stores' into 'master'
Add GetOutdatedRepositories to repository store See merge request gitlab-org/gitaly!2416
-rw-r--r--internal/praefect/datastore/queue_test.go2
-rw-r--r--internal/praefect/datastore/repository_memory.go30
-rw-r--r--internal/praefect/datastore/repository_memory_test.go69
-rw-r--r--internal/praefect/datastore/repository_postgres.go47
4 files changed, 147 insertions, 1 deletions
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 3ea4de70d..4cfcf2200 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -4,8 +4,8 @@ package datastore
import (
"context"
- "testing"
"sync"
+ "testing"
"time"
"github.com/stretchr/testify/assert"
diff --git a/internal/praefect/datastore/repository_memory.go b/internal/praefect/datastore/repository_memory.go
index 357207302..c6adb34e4 100644
--- a/internal/praefect/datastore/repository_memory.go
+++ b/internal/praefect/datastore/repository_memory.go
@@ -213,6 +213,36 @@ func (m *MemoryRepositoryStore) RepositoryExists(ctx context.Context, virtualSto
return m.getRepositoryGeneration(virtualStorage, relativePath) != GenerationUnknown, nil
}
+func (m *MemoryRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) {
+ m.m.Lock()
+ defer m.m.Unlock()
+
+ storages, ok := m.storages[virtualStorage]
+ if !ok {
+ return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage)
+ }
+
+ outdatedRepos := make(map[string]map[string]int)
+ repositories, ok := m.virtualStorageState[virtualStorage]
+ if !ok {
+ return outdatedRepos, nil
+ }
+
+ for relativePath, latestGeneration := range repositories {
+ for _, storage := range storages {
+ if gen := m.getStorageGeneration(virtualStorage, relativePath, storage); gen < latestGeneration {
+ if outdatedRepos[relativePath] == nil {
+ outdatedRepos[relativePath] = make(map[string]int)
+ }
+
+ outdatedRepos[relativePath][storage] = latestGeneration - gen
+ }
+ }
+ }
+
+ return outdatedRepos, nil
+}
+
func (m *MemoryRepositoryStore) getRepositoryGeneration(virtualStorage, relativePath string) int {
gen, ok := m.virtualStorageState[virtualStorage][relativePath]
if !ok {
diff --git a/internal/praefect/datastore/repository_memory_test.go b/internal/praefect/datastore/repository_memory_test.go
index cdbf2cb34..364dd16c0 100644
--- a/internal/praefect/datastore/repository_memory_test.go
+++ b/internal/praefect/datastore/repository_memory_test.go
@@ -507,4 +507,73 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
require.False(t, exists)
})
+
+ t.Run("GetOutdatedRepositories", func(t *testing.T) {
+ t.Run("unknown virtual storage", func(t *testing.T) {
+ rs, _ := newStore(t, map[string][]string{})
+
+ _, err := rs.GetOutdatedRepositories(ctx, "does not exist")
+ require.EqualError(t, err, `unknown virtual storage: "does not exist"`)
+ })
+
+ type state map[string]map[string]map[string]struct {
+ generation int
+ }
+
+ type expected map[string]map[string]int
+
+ for _, tc := range []struct {
+ desc string
+ state state
+ expected map[string]map[string]int
+ }{
+ {
+ desc: "no records in virtual storage",
+ state: state{"virtual-storage-2": {stor: {"repo-1": {generation: 0}}}},
+ expected: expected{},
+ },
+ {
+ desc: "storages missing records",
+ state: state{vs: {stor: {"repo-1": {generation: 0}}}},
+ expected: expected{"repo-1": {"storage-2": 1, "storage-3": 1}},
+ },
+ {
+ desc: "outdated storages",
+ state: state{vs: {
+ stor: {"repo-1": {generation: 2}},
+ "storage-2": {"repo-1": {generation: 1}},
+ "storage-3": {"repo-1": {generation: 0}},
+ }},
+ expected: expected{"repo-1": {"storage-2": 1, "storage-3": 2}},
+ },
+ {
+ desc: "all up to date",
+ state: state{vs: {
+ stor: {"repo-1": {generation: 3}},
+ "storage-2": {"repo-1": {generation: 3}},
+ "storage-3": {"repo-1": {generation: 3}},
+ }},
+ expected: expected{},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ rs, _ := newStore(t, map[string][]string{vs: {stor, "storage-2", "storage-3"}})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for vs, storages := range tc.state {
+ for storage, repos := range storages {
+ for repo, state := range repos {
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, state.generation))
+ }
+ }
+ }
+
+ outdated, err := rs.GetOutdatedRepositories(ctx, vs)
+ require.NoError(t, err)
+ require.Equal(t, tc.expected, outdated)
+ })
+ }
+ })
}
diff --git a/internal/praefect/datastore/repository_postgres.go b/internal/praefect/datastore/repository_postgres.go
index a7a72728d..8f40b12d8 100644
--- a/internal/praefect/datastore/repository_postgres.go
+++ b/internal/praefect/datastore/repository_postgres.go
@@ -79,6 +79,10 @@ type RepositoryStore interface {
IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error)
// RepositoryExists returns whether the repository exists on a virtual storage.
RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
+ // GetOutdatedRepositories finds repositories which are not on the latest generation in the virtual storage. It returns a map
+ // with key structure `relative_path-> storage -> generation`, indicating how many changes a storage is missing for a given
+ // repository.
+ GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error)
}
// PostgresRepositoryStore is a Postgres implementation of RepositoryStore.
@@ -404,3 +408,46 @@ AND relative_path = $2
return exists, nil
}
+
+func (rs *PostgresRepositoryStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) {
+ // As some storages might be missing records from the table, we do a cross join between the repositories and the
+ // configured storages. If a storage is missing an entry, it is considered fully outdated.
+ const q = `
+SELECT relative_path, storage, expected.generation - COALESCE(actual.generation, -1) AS behind_by
+FROM (
+ SELECT virtual_storage, relative_path, storage, generation
+ FROM repositories
+ CROSS JOIN (SELECT unnest($2::text[]) AS storage) AS storages
+ WHERE virtual_storage = $1
+) AS expected
+LEFT JOIN storage_repositories AS actual USING (virtual_storage, relative_path, storage)
+WHERE COALESCE(actual.generation, -1) < expected.generation
+`
+ storages, ok := rs.storages[virtualStorage]
+ if !ok {
+ return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage)
+ }
+
+ rows, err := rs.db.QueryContext(ctx, q, virtualStorage, pq.StringArray(storages))
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+
+ outdated := make(map[string]map[string]int)
+ for rows.Next() {
+ var storage, relativePath string
+ var difference int
+ if err := rows.Scan(&relativePath, &storage, &difference); err != nil {
+ return nil, err
+ }
+
+ if outdated[relativePath] == nil {
+ outdated[relativePath] = make(map[string]int)
+ }
+
+ outdated[relativePath][storage] = difference
+ }
+
+ return outdated, rows.Err()
+}