
gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2020-12-15 01:01:27 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2020-12-17 20:37:00 +0300
commit    7af9c950110a86775621b3db2af8e23056c0cb4c (patch)
tree      496fb2391d5439aa31549274c00354ecdd8cc816 /internal/praefect/datastore
parent    5176ccc8189ebced42b358347504977095ab3224 (diff)
Retrieve all consistent storages
On each read/write operation Praefect needs to know which Gitaly node is the primary. For mutator operations the primary determines which node's response is returned to the client. For read operations it is used to route the request, or as a fallback when read distribution is enabled. The default election strategy is 'sql', meaning the primary is tracked inside the Postgres database and Praefect issues a SELECT statement every time it needs to determine the current primary. This creates a high load on the database when there are many read operations (an outcome of the performance testing).

To resolve this problem, the logic that retrieves the set of up-to-date storages is changed to return all consistent storages, including the primary. With this in place we no longer need to know the current primary and can serve requests from any storage that has the latest generation of the repository. Because this information is cached by the in-memory cache, Praefect no longer creates a high load on the database. This change also makes the IsLatestGeneration check for the primary node redundant: the primary simply won't be present in the set of consistent storages if its generation is not the latest.

Fix linting issues

Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/3337
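To illustrate the caller-side effect of the new interface, here is a minimal sketch of how a read could be routed with GetConsistentStorages. It is not code from this commit: the pickReadStorage helper and the local consistentStorages interface are assumptions made for illustration; only the GetConsistentStorages signature is taken from the diff below.

package praefectexample

import (
	"context"
	"errors"
	"fmt"
)

// consistentStorages mirrors the relevant part of the RepositoryStore
// interface introduced in the diff below (an assumption for this sketch).
type consistentStorages interface {
	GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
}

// pickReadStorage (hypothetical helper) returns any storage that holds the
// latest generation of the repository. Because the returned set already
// includes the primary when it is up to date, the caller does not need to
// resolve the primary first, which is what removes the per-request SELECT
// against Postgres.
func pickReadStorage(ctx context.Context, rs consistentStorages, virtualStorage, relativePath string) (string, error) {
	consistent, err := rs.GetConsistentStorages(ctx, virtualStorage, relativePath)
	if err != nil {
		return "", fmt.Errorf("get consistent storages: %w", err)
	}
	if len(consistent) == 0 {
		return "", errors.New("no up-to-date storage for repository")
	}
	// Any member of the set can serve the read; iterating a Go map yields an
	// arbitrary element, which is enough here. A real router could pick
	// randomly to spread load across replicas.
	for storage := range consistent {
		return storage, nil
	}
	return "", nil // unreachable: the set is non-empty
}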
Diffstat (limited to 'internal/praefect/datastore')
-rw-r--r--  internal/praefect/datastore/repository_store.go          |  34
-rw-r--r--  internal/praefect/datastore/repository_store_bm_test.go  |  26
-rw-r--r--  internal/praefect/datastore/repository_store_mock.go     |   9
-rw-r--r--  internal/praefect/datastore/repository_store_test.go     |  36
-rw-r--r--  internal/praefect/datastore/storage_provider.go          |  58
-rw-r--r--  internal/praefect/datastore/storage_provider_test.go     | 122
6 files changed, 150 insertions, 135 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 488cbe5cd..bdcc7a198 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -84,9 +84,8 @@ type RepositoryStore interface {
// as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository
// which has no record in the virtual storage or the storage.
RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
- // GetConsistentSecondaries checks which secondaries are on the same generation as the primary and returns them.
- // If the primary's generation is unknown, all secondaries are considered inconsistent.
- GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
+ // GetConsistentStorages checks which storages are on the latest generation and returns them.
+ GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
// IsLatestGeneration checks whether the repository is on the latest generation or not. If the repository does not
// have an expected generation, every storage is considered to be on the latest version.
IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error)
@@ -341,10 +340,11 @@ AND storage = $3
return err
}
-func (rs *PostgresRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) {
+// GetConsistentStorages checks which storages are on the latest generation and returns them.
+func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
const q = `
WITH expected_repositories AS (
- SELECT virtual_storage, relative_path, unnest($3::text[]) AS storage, MAX(generation) AS generation
+ SELECT virtual_storage, relative_path, MAX(generation) AS generation
FROM storage_repositories
WHERE virtual_storage = $1
AND relative_path = $2
@@ -353,7 +353,7 @@ WITH expected_repositories AS (
SELECT storage
FROM storage_repositories
-JOIN expected_repositories USING (virtual_storage, relative_path, storage, generation)
+JOIN expected_repositories USING (virtual_storage, relative_path, generation)
`
storages, err := rs.storages.storages(virtualStorage)
@@ -361,32 +361,28 @@ JOIN expected_repositories USING (virtual_storage, relative_path, storage, gener
return nil, err
}
- secondaries := make([]string, 0, len(storages)-1)
- for _, storage := range storages {
- if storage == primary {
- continue
- }
-
- secondaries = append(secondaries, storage)
- }
-
- rows, err := rs.db.QueryContext(ctx, q, virtualStorage, relativePath, pq.StringArray(secondaries))
+ rows, err := rs.db.QueryContext(ctx, q, virtualStorage, relativePath)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("query: %w", err)
}
defer rows.Close()
consistentSecondaries := make(map[string]struct{}, len(storages)-1)
+
for rows.Next() {
var storage string
if err := rows.Scan(&storage); err != nil {
- return nil, err
+ return nil, fmt.Errorf("scan: %w", err)
}
consistentSecondaries[storage] = struct{}{}
}
- return consistentSecondaries, rows.Err()
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("rows: %w", err)
+ }
+
+ return consistentSecondaries, nil
}
func (rs *PostgresRepositoryStore) IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) {
diff --git a/internal/praefect/datastore/repository_store_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go
index 7c8647609..739cf2680 100644
--- a/internal/praefect/datastore/repository_store_bm_test.go
+++ b/internal/praefect/datastore/repository_store_bm_test.go
@@ -11,34 +11,34 @@ import (
)
// The test setup takes a lot of time, so it is better to run each sub-benchmark separately with limit on number of repeats.
-func BenchmarkPostgresRepositoryStore_GetConsistentSecondaries(b *testing.B) {
- // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentSecondaries/extra-small -benchtime=5000x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
+func BenchmarkPostgresRepositoryStore_GetConsistentStorages(b *testing.B) {
+ // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentStorages/extra-small -benchtime=5000x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
b.Run("extra-small", func(b *testing.B) {
- benchmarkGetConsistentSecondaries(b, 3, 1000)
+ benchmarkGetConsistentStorages(b, 3, 1000)
})
- // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentSecondaries/small -benchtime=1000x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
+ // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentStorages/small -benchtime=1000x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
b.Run("small", func(b *testing.B) {
- benchmarkGetConsistentSecondaries(b, 3, 10_000)
+ benchmarkGetConsistentStorages(b, 3, 10_000)
})
- // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentSecondaries/medium -benchtime=50x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
+ // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentStorages/medium -benchtime=50x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
b.Run("medium", func(b *testing.B) {
- benchmarkGetConsistentSecondaries(b, 3, 100_000)
+ benchmarkGetConsistentStorages(b, 3, 100_000)
})
- // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentSecondaries/large -benchtime=10x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
+ // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentStorages/large -benchtime=10x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
b.Run("large", func(b *testing.B) {
- benchmarkGetConsistentSecondaries(b, 3, 1_000_000)
+ benchmarkGetConsistentStorages(b, 3, 1_000_000)
})
- // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentSecondaries/huge -benchtime=1x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
+ // go test -tags=postgres -test.bench=BenchmarkPostgresRepositoryStore_GetConsistentStorages/huge -benchtime=1x gitlab.com/gitlab-org/gitaly/internal/praefect/datastore
b.Run("huge", func(b *testing.B) {
- benchmarkGetConsistentSecondaries(b, 6, 1_000_000)
+ benchmarkGetConsistentStorages(b, 6, 1_000_000)
})
}
-func benchmarkGetConsistentSecondaries(b *testing.B, nstorages, nrepositories int) {
+func benchmarkGetConsistentStorages(b *testing.B, nstorages, nrepositories int) {
db := getDB(b)
ctx, cancel := testhelper.Context()
@@ -67,7 +67,7 @@ func benchmarkGetConsistentSecondaries(b *testing.B, nstorages, nrepositories in
require.NoError(b, err)
b.StartTimer()
- _, err = repoStore.GetConsistentSecondaries(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2), "s1")
+ _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2))
b.StopTimer()
require.NoError(b, err)
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index d598a1692..2b943fdaf 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -12,7 +12,7 @@ type MockRepositoryStore struct {
SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
- GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
+ GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
GetPartiallyReplicatedRepositoriesFunc func(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error)
DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
@@ -74,12 +74,13 @@ func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorag
return m.RenameRepositoryFunc(ctx, virtualStorage, relativePath, storage, newRelativePath)
}
-func (m MockRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) {
- if m.GetConsistentSecondariesFunc == nil {
+// GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map.
+func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
+ if m.GetConsistentStoragesFunc == nil {
return map[string]struct{}{}, nil
}
- return m.GetConsistentSecondariesFunc(ctx, virtualStorage, relativePath, primary)
+ return m.GetConsistentStoragesFunc(ctx, virtualStorage, relativePath)
}
func (m MockRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error) {
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index bd72c4729..33ba4fbb0 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -432,13 +432,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
})
})
- t.Run("GetConsistentSecondaries", func(t *testing.T) {
+ t.Run("GetConsistentStorages", func(t *testing.T) {
rs, requireState := newStore(t, map[string][]string{
vs: []string{"primary", "consistent-secondary", "inconsistent-secondary", "no-record"},
})
t.Run("unknown generations", func(t *testing.T) {
- secondaries, err := rs.GetConsistentSecondaries(ctx, vs, repo, "primary")
+ secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
require.Empty(t, secondaries)
})
@@ -464,18 +464,44 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
)
t.Run("consistent secondary", func(t *testing.T) {
- secondaries, err := rs.GetConsistentSecondaries(ctx, vs, repo, "primary")
+ secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"consistent-secondary": struct{}{}}, secondaries)
+ require.Equal(t, map[string]struct{}{"primary": struct{}{}, "consistent-secondary": struct{}{}}, secondaries)
})
require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 0))
t.Run("outdated primary", func(t *testing.T) {
- secondaries, err := rs.GetConsistentSecondaries(ctx, vs, repo, "primary")
+ secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"consistent-secondary": struct{}{}}, secondaries)
})
+
+ t.Run("storage with highest generation is not configured", func(t *testing.T) {
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "unknown", 2))
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 1))
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": struct{}{},
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "unknown": 2,
+ "primary": 1,
+ "consistent-secondary": 1,
+ "inconsistent-secondary": 0,
+ },
+ },
+ },
+ )
+
+ secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
+ require.NoError(t, err)
+ require.Equal(t, map[string]struct{}{"unknown": struct{}{}}, secondaries)
+ })
})
t.Run("DeleteInvalidRepository", func(t *testing.T) {
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index fe8e46804..c5af19215 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -8,54 +8,41 @@ import (
"sync"
"sync/atomic"
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)
-// SecondariesProvider should provide information about secondary storages.
-type SecondariesProvider interface {
- // GetConsistentSecondaries returns all secondaries with the same generation as the primary.
- GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
+// StoragesProvider should provide information about repository storages.
+type StoragesProvider interface {
+ // GetConsistentStorages returns storages which have the latest generation of the repository.
+ GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
}
// DirectStorageProvider provides the latest state of the synced nodes.
type DirectStorageProvider struct {
- sp SecondariesProvider
+ sp StoragesProvider
}
// NewDirectStorageProvider returns a new storage provider.
-func NewDirectStorageProvider(sp SecondariesProvider) *DirectStorageProvider {
+func NewDirectStorageProvider(sp StoragesProvider) *DirectStorageProvider {
return &DirectStorageProvider{sp: sp}
}
-func (c *DirectStorageProvider) GetSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) []string {
- storages, _ := c.getSyncedNodes(ctx, virtualStorage, relativePath, primaryStorage)
- return storages
-}
-
-func (c *DirectStorageProvider) getSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) ([]string, bool) {
- upToDateStorages, err := c.sp.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primaryStorage)
+// GetSyncedNodes returns list of gitaly storages that are in up to date state based on the generation tracking.
+func (c *DirectStorageProvider) GetSyncedNodes(ctx context.Context, virtualStorage, relativePath string) ([]string, error) {
+ consistentStorages, err := c.sp.GetConsistentStorages(ctx, virtualStorage, relativePath)
if err != nil {
- // this is recoverable error - we can proceed with primary node
- ctxlogrus.Extract(ctx).WithError(err).Warn("get consistent secondaries")
- return []string{primaryStorage}, false
+ return nil, err
}
- return combineStorages(upToDateStorages, primaryStorage), true
-}
-
-func combineStorages(upToDateStorages map[string]struct{}, primaryStorage string) []string {
- storages := make([]string, 0, len(upToDateStorages)+1)
- for upToDateStorage := range upToDateStorages {
- if upToDateStorage != primaryStorage {
- storages = append(storages, upToDateStorage)
- }
+ storages := make([]string, 0, len(consistentStorages))
+ for storage := range consistentStorages {
+ storages = append(storages, storage)
}
- return append(storages, primaryStorage)
+ return storages, nil
}
// errNotExistingVirtualStorage indicates that the requested virtual storage can't be found or not configured.
@@ -77,7 +64,7 @@ type CachingStorageProvider struct {
}
// NewCachingStorageProvider returns a storage provider that uses caching.
-func NewCachingStorageProvider(logger logrus.FieldLogger, sp SecondariesProvider, virtualStorages []string) (*CachingStorageProvider, error) {
+func NewCachingStorageProvider(logger logrus.FieldLogger, sp StoragesProvider, virtualStorages []string) (*CachingStorageProvider, error) {
csp := &CachingStorageProvider{
dsp: NewDirectStorageProvider(sp),
caches: make(map[string]*lru.Cache, len(virtualStorages)),
@@ -166,9 +153,9 @@ func (c *CachingStorageProvider) getCache(virtualStorage string) (*lru.Cache, bo
return val, found
}
-func (c *CachingStorageProvider) cacheMiss(ctx context.Context, virtualStorage, relativePath, primaryStorage string) ([]string, bool) {
+func (c *CachingStorageProvider) cacheMiss(ctx context.Context, virtualStorage, relativePath string) ([]string, error) {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc()
- return c.dsp.getSyncedNodes(ctx, virtualStorage, relativePath, primaryStorage)
+ return c.dsp.GetSyncedNodes(ctx, virtualStorage, relativePath)
}
func (c *CachingStorageProvider) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache, []string, bool) {
@@ -195,7 +182,8 @@ func (c *CachingStorageProvider) tryCache(virtualStorage, relativePath string) (
func (c *CachingStorageProvider) isCacheEnabled() bool { return atomic.LoadInt32(&c.access) != 0 }
-func (c *CachingStorageProvider) GetSyncedNodes(ctx context.Context, virtualStorage, relativePath, primaryStorage string) []string {
+// GetSyncedNodes returns list of gitaly storages that are in up to date state based on the generation tracking.
+func (c *CachingStorageProvider) GetSyncedNodes(ctx context.Context, virtualStorage, relativePath string) ([]string, error) {
var cache *lru.Cache
if c.isCacheEnabled() {
@@ -207,16 +195,16 @@ func (c *CachingStorageProvider) GetSyncedNodes(ctx context.Context, virtualStor
defer populationDone()
if ok {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc()
- return storages
+ return storages, nil
}
}
- storages, ok := c.cacheMiss(ctx, virtualStorage, relativePath, primaryStorage)
- if ok && cache != nil {
+ storages, err := c.cacheMiss(ctx, virtualStorage, relativePath)
+ if err == nil && cache != nil {
cache.Add(relativePath, storages)
c.cacheAccessTotal.WithLabelValues(virtualStorage, "populate").Inc()
}
- return storages
+ return storages, err
}
func getStringSlice(cache *lru.Cache, key string) ([]string, bool) {
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index aa22ba81c..64675b032 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -32,7 +32,7 @@ func TestDirectStorageProvider_GetSyncedNodes(t *testing.T) {
{
desc: "primary included",
ret: map[string]struct{}{"g2": {}, "g3": {}},
- exp: []string{"g1", "g2", "g3"},
+ exp: []string{"g2", "g3"},
},
{
desc: "distinct values",
@@ -42,41 +42,34 @@ func TestDirectStorageProvider_GetSyncedNodes(t *testing.T) {
{
desc: "none",
ret: nil,
- exp: []string{"g1"},
+ exp: []string{},
},
} {
t.Run(tc.desc, func(t *testing.T) {
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", ctx, "vs", "/repo/path", "g1").Return(tc.ret, nil)
+ rs.On("GetConsistentStorages", ctx, "vs", "/repo/path").Return(tc.ret, nil)
sp := NewDirectStorageProvider(rs)
- storages := sp.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
+ storages, err := sp.GetSyncedNodes(ctx, "vs", "/repo/path")
+ require.NoError(t, err)
require.ElementsMatch(t, tc.exp, storages)
})
}
})
t.Run("repository store returns an error", func(t *testing.T) {
- logger := testhelper.DiscardTestEntry(t)
- logHook := test.NewLocal(logger.Logger)
-
- ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(logger))
+ ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(testhelper.DiscardTestEntry(t)))
defer cancel()
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", ctx, "vs", "/repo/path", "g1").
+ rs.On("GetConsistentStorages", ctx, "vs", "/repo/path").
Return(nil, assert.AnError).
Once()
sp := NewDirectStorageProvider(rs)
- storages := sp.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
- require.ElementsMatch(t, []string{"g1"}, storages)
-
- require.Len(t, logHook.AllEntries(), 1)
- require.Equal(t, "get consistent secondaries", logHook.LastEntry().Message)
- require.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data)
- require.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level)
+ _, err := sp.GetSyncedNodes(ctx, "vs", "/repo/path")
+ require.Equal(t, assert.AnError, err)
})
}
@@ -84,8 +77,8 @@ type mockConsistentSecondariesProvider struct {
mock.Mock
}
-func (m *mockConsistentSecondariesProvider) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) {
- args := m.Called(ctx, virtualStorage, relativePath, primary)
+func (m *mockConsistentSecondariesProvider) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
+ args := m.Called(ctx, virtualStorage, relativePath)
val := args.Get(0)
var res map[string]struct{}
if val != nil {
@@ -100,8 +93,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
defer cancel()
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", mock.Anything, "unknown", "/repo/path", "g1").
- Return(map[string]struct{}{"g2": {}, "g3": {}}, nil).
+ rs.On("GetConsistentStorages", mock.Anything, "unknown", "/repo/path").
+ Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil).
Once()
cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"})
@@ -109,7 +102,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
cache.Connected()
// empty cache should be populated
- storages := cache.GetSyncedNodes(ctx, "unknown", "/repo/path", "g1")
+ storages, err := cache.GetSyncedNodes(ctx, "unknown", "/repo/path")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -125,8 +119,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
defer cancel()
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path", "g1").
- Return(map[string]struct{}{"g2": {}, "g3": {}}, nil).
+ rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path").
+ Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil).
Once()
cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"})
@@ -134,7 +128,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
cache.Connected()
// empty cache should be populated
- storages := cache.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
+ storages, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -146,7 +141,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.NoError(t, err)
// populated cache should return cached value
- storages = cache.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
+ storages, err = cache.GetSyncedNodes(ctx, "vs", "/repo/path")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -160,14 +156,11 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
})
t.Run("repository store returns an error", func(t *testing.T) {
- logger := testhelper.DiscardTestEntry(t)
- logHook := test.NewLocal(logger.Logger)
-
- ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(logger))
+ ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(testhelper.DiscardTestEntry(t)))
defer cancel()
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path", "g1").
+ rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path").
Return(nil, assert.AnError).
Once()
@@ -175,13 +168,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.NoError(t, err)
cache.Connected()
- storages := cache.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
- require.ElementsMatch(t, []string{"g1"}, storages)
-
- require.Len(t, logHook.AllEntries(), 1)
- assert.Equal(t, "get consistent secondaries", logHook.LastEntry().Message)
- assert.Equal(t, logrus.Fields{"error": assert.AnError}, logHook.LastEntry().Data)
- assert.Equal(t, logrus.WarnLevel, logHook.LastEntry().Level)
+ _, err = cache.GetSyncedNodes(ctx, "vs", "/repo/path")
+ require.Equal(t, assert.AnError, err)
// "populate" metric is not set as there was an error and we don't want this result to be cached
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -200,8 +188,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
defer cancel()
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/1", "g1").
- Return(map[string]struct{}{"g2": {}, "g3": {}}, nil).
+ rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1").
+ Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil).
Times(4)
cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"})
@@ -209,7 +197,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
cache.Connected()
// first access populates the cache
- storages1 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
+ storages1, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1)
// invalid payload disables caching
@@ -218,15 +207,18 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
expErr := json.Unmarshal([]byte(notification.Payload), new(struct{}))
// second access omits cached data as caching should be disabled
- storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
+ storages2, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2)
// third access retrieves data and caches it
- storages3 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
+ storages3, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages3)
// fourth access retrieves data from cache
- storages4 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
+ storages4, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages4)
require.Len(t, logHook.AllEntries(), 1)
@@ -253,19 +245,21 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
defer cancel()
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/1", "g1").
- Return(map[string]struct{}{"g2": {}, "g3": {}}, nil)
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/2", "g1").
- Return(map[string]struct{}{"g2": {}}, nil)
+ rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1").
+ Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil)
+ rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/2").
+ Return(map[string]struct{}{"g1": {}, "g2": {}}, nil)
cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
cache.Connected()
// first access populates the cache
- path1Storages1 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
+ path1Storages1, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, path1Storages1)
- path2Storages1 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/2", "g1")
+ path2Storages1, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/2")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2"}, path2Storages1)
// notification evicts entries for '/repo/path/2' from the cache
@@ -277,10 +271,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
)
// second access re-uses cached data for '/repo/path/1'
- path1Storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1")
+ path1Storages2, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, path1Storages2)
// second access populates the cache again for '/repo/path/2'
- path2Storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path/2", "g1")
+ path2Storages2, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/2")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2"}, path2Storages2)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -299,22 +295,24 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
defer cancel()
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path", "g1").
- Return(map[string]struct{}{"g2": {}, "g3": {}}, nil)
+ rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path").
+ Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil)
cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
cache.Connected()
// first access populates the cache
- storages1 := cache.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
+ storages1, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1)
// disconnection disables cache
cache.Disconnect(assert.AnError)
// second access retrieve data and doesn't populate the cache
- storages2 := cache.GetSyncedNodes(ctx, "vs", "/repo/path", "g1")
+ storages2, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path")
+ require.NoError(t, err)
require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -332,8 +330,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
defer cancel()
rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/1", "g1").Return(nil, nil)
- rs.On("GetConsistentSecondaries", mock.Anything, "vs", "/repo/path/2", "g1").Return(nil, nil)
+ rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1").Return(nil, nil)
+ rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/2").Return(nil, nil)
cache, err := NewCachingStorageProvider(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
@@ -347,9 +345,15 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
var f func()
switch i % 6 {
case 0, 1:
- f = func() { cache.GetSyncedNodes(ctx, "vs", "/repo/path/1", "g1") }
+ f = func() {
+ _, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/1")
+ assert.NoError(t, err)
+ }
case 2, 3:
- f = func() { cache.GetSyncedNodes(ctx, "vs", "/repo/path/2", "g1") }
+ f = func() {
+ _, err := cache.GetSyncedNodes(ctx, "vs", "/repo/path/2")
+ assert.NoError(t, err)
+ }
case 4:
f = func() { cache.Notification(nf1) }
case 5: