Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-10-25 13:46:09 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-10-25 13:46:09 +0300
commit6809578bbadfc2fe2d4aaa0bcfa3ac042efdd6b8 (patch)
tree56ee2a4081f3832da928816fa36b60b7ad98823f
parent06a084ee44991c5ced232902a8e4d8caa9422d14 (diff)
parentbd454b4eb6329d817fb76845650288a499b42197 (diff)
Merge branch 'smh-replica-path-cache' into 'master'
Rewrite relative paths in Coordinator See merge request gitlab-org/gitaly!3974
-rw-r--r--internal/praefect/coordinator.go15
-rw-r--r--internal/praefect/coordinator_test.go26
-rw-r--r--internal/praefect/datastore/repository_store.go56
-rw-r--r--internal/praefect/datastore/repository_store_bm_test.go2
-rw-r--r--internal/praefect/datastore/repository_store_mock.go12
-rw-r--r--internal/praefect/datastore/repository_store_test.go30
-rw-r--r--internal/praefect/datastore/storage_provider.go38
-rw-r--r--internal/praefect/datastore/storage_provider_test.go115
-rw-r--r--internal/praefect/nodes/manager.go2
-rw-r--r--internal/praefect/nodes/manager_test.go4
-rw-r--r--internal/praefect/replicator_test.go10
-rw-r--r--internal/praefect/router.go12
-rw-r--r--internal/praefect/router_node_manager.go14
-rw-r--r--internal/praefect/router_per_repository.go40
-rw-r--r--internal/praefect/router_per_repository_test.go45
-rw-r--r--internal/praefect/server_test.go41
16 files changed, 272 insertions, 190 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index d31e40fbe..98029fb41 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -398,23 +398,23 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal
repoPath := call.targetRepo.GetRelativePath()
virtualStorage := call.targetRepo.StorageName
- node, err := c.router.RouteRepositoryAccessor(
+ route, err := c.router.RouteRepositoryAccessor(
ctx, virtualStorage, repoPath, shouldRouteRepositoryAccessorToPrimary(ctx, call),
)
if err != nil {
return nil, fmt.Errorf("accessor call: route repository accessor: %w", err)
}
- b, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, node.Storage)
+ b, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Node.Storage, route.ReplicaPath)
if err != nil {
return nil, fmt.Errorf("accessor call: rewrite storage: %w", err)
}
- metrics.ReadDistribution.WithLabelValues(virtualStorage, node.Storage).Inc()
+ metrics.ReadDistribution.WithLabelValues(virtualStorage, route.Node.Storage).Inc()
return proxy.NewStreamParameters(proxy.Destination{
Ctx: streamParametersContext(ctx),
- Conn: node.Connection,
+ Conn: route.Node.Connection,
Msg: b,
}, nil, nil, nil), nil
}
@@ -486,7 +486,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
}
}
- primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage)
+ primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage, route.ReplicaPath)
if err != nil {
return nil, fmt.Errorf("mutator call: rewrite storage: %w", err)
}
@@ -528,7 +528,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
for _, secondary := range route.Secondaries {
secondary := secondary
- secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage)
+ secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage, route.ReplicaPath)
if err != nil {
return nil, err
}
@@ -783,7 +783,7 @@ func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi pro
return proxy.NewStreamParameters(primaryDest, secondaryDests, func() error { return nil }, nil), nil
}
-func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, storage string) ([]byte, error) {
+func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, storage, relativePath string) ([]byte, error) {
targetRepo, err := mi.TargetRepo(m)
if err != nil {
return nil, helper.ErrInvalidArgument(err)
@@ -791,6 +791,7 @@ func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, st
// rewrite storage name
targetRepo.StorageName = storage
+ targetRepo.RelativePath = relativePath
additionalRepo, ok, err := mi.AdditionalRepo(m)
if err != nil {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 0b988bfa2..b9e4230d2 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -99,11 +99,11 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
defer cancel()
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(context.Context, string, string) (map[string]struct{}, error) {
+ GetConsistentStoragesFunc: func(context.Context, string, string) (string, map[string]struct{}, error) {
if tc.readOnly {
- return map[string]struct{}{storage + "-other": {}}, nil
+ return "", map[string]struct{}{storage + "-other": {}}, nil
}
- return map[string]struct{}{storage: {}}, nil
+ return "", map[string]struct{}{storage: {}}, nil
},
}
@@ -326,8 +326,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
}
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
- return map[string]struct{}{"primary": {}, "secondary": {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return relativePath, map[string]struct{}{"primary": {}, "secondary": {}}, nil
},
}
@@ -400,10 +400,10 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
type mockRouter struct {
Router
- routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error)
+ routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)
}
-func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) {
+func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) {
return m.routeRepositoryAccessorFunc(ctx, virtualStorage, relativePath, forcePrimary)
}
@@ -459,8 +459,8 @@ func TestStreamDirectorAccessor(t *testing.T) {
{
desc: "repository not found",
router: mockRouter{
- routeRepositoryAccessorFunc: func(_ context.Context, virtualStorage, relativePath string, _ bool) (RouterNode, error) {
- return RouterNode{}, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
+ routeRepositoryAccessorFunc: func(_ context.Context, virtualStorage, relativePath string, _ bool) (RepositoryAccessorRoute, error) {
+ return RepositoryAccessorRoute{}, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
},
},
error: helper.ErrNotFound(fmt.Errorf("accessor call: route repository accessor: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath))),
@@ -547,8 +547,8 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
repoStore := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
- return map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return relativePath, map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil
},
}
@@ -1566,8 +1566,8 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
// Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent
// nodes will take part in transactions.
withRepoStore: datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
- return map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil
},
},
})
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index e4753008f..0d2a10602 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -115,8 +115,8 @@ type RepositoryStore interface {
// as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository
// which has no record in the virtual storage or the storage.
RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
- // GetConsistentStoragesByRepositoryID returns a set of up to date storages for the given repository keyed by repository ID.
- GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (map[string]struct{}, error)
+ // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
+ GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
ConsistentStoragesGetter
// RepositoryExists returns whether the repository exists on a virtual storage.
RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
@@ -504,60 +504,52 @@ AND storage = $3
return err
}
-// GetConsistentStoragesByRepositoryID returns a set of up to date storages for the given repository keyed by repository ID.
-func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (map[string]struct{}, error) {
+// GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
+func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) {
return rs.getConsistentStorages(ctx, `
-SELECT storage
+SELECT replica_path, ARRAY_AGG(storage)
FROM repositories
JOIN storage_repositories USING (repository_id, generation)
WHERE repository_id = $1
+GROUP BY replica_path
`, repositoryID)
}
-// GetConsistentStorages returns a set of up to date storages for the given repository keyed by virtual storage and relative path.
-func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
- storages, err := rs.getConsistentStorages(ctx, `
-SELECT storage
+// GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
+func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ replicaPath, storages, err := rs.getConsistentStorages(ctx, `
+SELECT replica_path, ARRAY_AGG(storage)
FROM repositories
JOIN storage_repositories USING (repository_id, generation)
WHERE repositories.virtual_storage = $1
AND repositories.relative_path = $2
+GROUP BY replica_path
`, virtualStorage, relativePath)
-
if errors.Is(err, commonerr.ErrRepositoryNotFound) {
- return nil, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
+ return "", nil, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
}
- return storages, err
+ return replicaPath, storages, err
}
// getConsistentStorages is a helper for querying the consistent storages by different keys.
-func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (map[string]struct{}, error) {
- rows, err := rs.db.QueryContext(ctx, query, params...)
- if err != nil {
- return nil, fmt.Errorf("query: %w", err)
- }
- defer rows.Close()
-
- consistentStorages := map[string]struct{}{}
- for rows.Next() {
- var storage string
- if err := rows.Scan(&storage); err != nil {
- return nil, fmt.Errorf("scan: %w", err)
+func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, map[string]struct{}, error) {
+ var replicaPath string
+ var storages pq.StringArray
+ if err := rs.db.QueryRowContext(ctx, query, params...).Scan(&replicaPath, &storages); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return "", nil, commonerr.ErrRepositoryNotFound
}
- consistentStorages[storage] = struct{}{}
- }
-
- if err := rows.Err(); err != nil {
- return nil, fmt.Errorf("rows: %w", err)
+ return "", nil, fmt.Errorf("query: %w", err)
}
- if len(consistentStorages) == 0 {
- return nil, commonerr.ErrRepositoryNotFound
+ consistentStorages := make(map[string]struct{}, len(storages))
+ for _, storage := range storages {
+ consistentStorages[storage] = struct{}{}
}
- return consistentStorages, nil
+ return replicaPath, consistentStorages, nil
}
func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) {
diff --git a/internal/praefect/datastore/repository_store_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go
index f39750114..8555c971b 100644
--- a/internal/praefect/datastore/repository_store_bm_test.go
+++ b/internal/praefect/datastore/repository_store_bm_test.go
@@ -66,7 +66,7 @@ func benchmarkGetConsistentStorages(b *testing.B, nstorages, nrepositories int)
require.NoError(b, err)
b.StartTimer()
- _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2))
+ _, _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2))
b.StopTimer()
require.NoError(b, err)
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 43717174a..cb10c3f69 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -14,8 +14,8 @@ type MockRepositoryStore struct {
DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
- GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (map[string]struct{}, error)
- GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
+ GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
+ GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)
GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]PartiallyAvailableRepository, error)
DeleteInvalidRepositoryFunc func(ctx context.Context, repositoryID int64, storage string) error
RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
@@ -99,18 +99,18 @@ func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorag
}
// GetConsistentStoragesByRepositoryID returns result of execution of the GetConsistentStoragesByRepositoryIDFunc field if it is set or an empty map.
-func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (map[string]struct{}, error) {
+func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) {
if m.GetConsistentStoragesFunc == nil {
- return map[string]struct{}{}, nil
+ return "", map[string]struct{}{}, nil
}
return m.GetConsistentStoragesByRepositoryIDFunc(ctx, repositoryID)
}
// GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map.
-func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
+func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
if m.GetConsistentStoragesFunc == nil {
- return map[string]struct{}{}, nil
+ return "", map[string]struct{}{}, nil
}
return m.GetConsistentStoragesFunc(ctx, virtualStorage, relativePath)
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index c691b8821..a004ba88d 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -921,12 +921,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
})
t.Run("no records", func(t *testing.T) {
- secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
+ replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err)
+ require.Empty(t, replicaPath)
require.Empty(t, secondaries)
- secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
+ replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.Equal(t, commonerr.ErrRepositoryNotFound, err)
+ require.Empty(t, replicaPath)
require.Empty(t, secondaries)
})
@@ -951,25 +953,29 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
)
t.Run("consistent secondary", func(t *testing.T) {
- secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
+ replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries)
+ require.Equal(t, repo, replicaPath)
- secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
+ replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries)
+ require.Equal(t, repo, replicaPath)
})
require.NoError(t, rs.SetGeneration(ctx, 1, "primary", 0))
t.Run("outdated primary", func(t *testing.T) {
- secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
+ replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries)
+ require.Equal(t, repo, replicaPath)
- secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
+ replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries)
+ require.Equal(t, repo, replicaPath)
})
t.Run("storage with highest generation is not configured", func(t *testing.T) {
@@ -993,13 +999,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
},
)
- secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
+ replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries)
+ require.Equal(t, repo, replicaPath)
- secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
+ replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries)
+ require.Equal(t, repo, replicaPath)
})
t.Run("returns not found for deleted repositories", func(t *testing.T) {
@@ -1007,13 +1015,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
requireState(t, ctx, virtualStorageState{}, storageState{})
- secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
+ replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err)
require.Empty(t, secondaries)
+ require.Empty(t, replicaPath)
- secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
+ replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.Equal(t, commonerr.ErrRepositoryNotFound, err)
require.Empty(t, secondaries)
+ require.Empty(t, replicaPath)
})
})
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index 376c480eb..654209ac1 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -16,9 +16,8 @@ import (
// ConsistentStoragesGetter returns storages which contain the latest generation of a repository.
type ConsistentStoragesGetter interface {
- // GetConsistentStorages checks which storages are on the latest generation and returns them. Returns a
- // commonerr.RepositoryNotFoundError if the repository does not exist.
- GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
+ // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
+ GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)
}
// errNotExistingVirtualStorage indicates that the requested virtual storage can't be found or not configured.
@@ -134,17 +133,17 @@ func (c *CachingConsistentStoragesGetter) getCache(virtualStorage string) (*lru.
return val, found
}
-func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
+func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc()
return c.csg.GetConsistentStorages(ctx, virtualStorage, relativePath)
}
-func (c *CachingConsistentStoragesGetter) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache, map[string]struct{}, bool) {
+func (c *CachingConsistentStoragesGetter) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache, cacheValue, bool) {
populateDone := func() {} // should be called AFTER any cache population is done
cache, found := c.getCache(virtualStorage)
if !found {
- return populateDone, nil, nil, false
+ return populateDone, nil, cacheValue{}, false
}
if storages, found := getKey(cache, relativePath); found {
@@ -158,41 +157,46 @@ func (c *CachingConsistentStoragesGetter) tryCache(virtualStorage, relativePath
return populateDone, cache, storages, true
}
- return populateDone, cache, nil, false
+ return populateDone, cache, cacheValue{}, false
}
func (c *CachingConsistentStoragesGetter) isCacheEnabled() bool {
return atomic.LoadInt32(&c.access) != 0
}
-// GetConsistentStorages returns list of gitaly storages that are in up to date state based on the generation tracking.
-func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
+// GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
+func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
var cache *lru.Cache
if c.isCacheEnabled() {
- var storages map[string]struct{}
+ var value cacheValue
var ok bool
var populationDone func()
- populationDone, cache, storages, ok = c.tryCache(virtualStorage, relativePath)
+ populationDone, cache, value, ok = c.tryCache(virtualStorage, relativePath)
defer populationDone()
if ok {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc()
- return storages, nil
+ return value.replicaPath, value.storages, nil
}
}
- storages, err := c.cacheMiss(ctx, virtualStorage, relativePath)
+ replicaPath, storages, err := c.cacheMiss(ctx, virtualStorage, relativePath)
if err == nil && cache != nil {
- cache.Add(relativePath, storages)
+ cache.Add(relativePath, cacheValue{replicaPath: replicaPath, storages: storages})
c.cacheAccessTotal.WithLabelValues(virtualStorage, "populate").Inc()
}
- return storages, err
+ return replicaPath, storages, err
}
-func getKey(cache *lru.Cache, key string) (map[string]struct{}, bool) {
+type cacheValue struct {
+ replicaPath string
+ storages map[string]struct{}
+}
+
+func getKey(cache *lru.Cache, key string) (cacheValue, bool) {
val, found := cache.Get(key)
- vals, _ := val.(map[string]struct{})
+ vals, _ := val.(cacheValue)
return vals, found
}
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index 4c9be5e2a..fc214df1e 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -1,7 +1,6 @@
package datastore
import (
- "context"
"encoding/json"
"runtime"
"strings"
@@ -13,44 +12,33 @@ import (
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
)
-type mockConsistentSecondariesProvider struct {
- mock.Mock
-}
+func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
+ t.Parallel()
-func (m *mockConsistentSecondariesProvider) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
- args := m.Called(ctx, virtualStorage, relativePath)
- val := args.Get(0)
- var res map[string]struct{}
- if val != nil {
- res = val.(map[string]struct{})
- }
- return res, args.Error(1)
-}
+ db := glsql.NewDB(t)
+ rs := NewPostgresRepositoryStore(db, nil)
-func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
t.Run("unknown virtual storage", func(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentStorages", mock.Anything, "unknown", "/repo/path").
- Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil).
- Once()
+ require.NoError(t, rs.CreateRepository(ctx, 1, "unknown", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
cache.Connected()
// empty cache should be populated
- storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path")
+ replicaPath, storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
+ require.Equal(t, "/repo/path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -61,22 +49,22 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
})
t.Run("miss -> populate -> hit", func(t *testing.T) {
+ db.TruncateAll(t)
+
ctx, cancel := testhelper.Context()
defer cancel()
- rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path").
- Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil).
- Once()
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
cache.Connected()
// empty cache should be populated
- storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
+ replicaPath, storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
+ require.Equal(t, "/repo/path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -87,9 +75,10 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
require.NoError(t, err)
// populated cache should return cached value
- storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path")
+ replicaPath, storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
+ require.Equal(t, "/repo/path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -102,20 +91,17 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
})
t.Run("repository store returns an error", func(t *testing.T) {
+ db.TruncateAll(t)
+
ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(testhelper.DiscardTestEntry(t)))
defer cancel()
- rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path").
- Return(nil, assert.AnError).
- Once()
-
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
cache.Connected()
- _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path")
- require.Equal(t, assert.AnError, err)
+ _, _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path")
+ require.Equal(t, commonerr.NewRepositoryNotFoundError("vs", "/repo/path"), err)
// "populate" metric is not set as there was an error and we don't want this result to be cached
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -127,25 +113,25 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
})
t.Run("cache is disabled after handling invalid payload", func(t *testing.T) {
+ db.TruncateAll(t)
+
logger := testhelper.DiscardTestEntry(t)
logHook := test.NewLocal(logger.Logger)
ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(logger))
defer cancel()
- rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1").
- Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil).
- Times(4)
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
cache.Connected()
// first access populates the cache
- storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
+ replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1)
+ require.Equal(t, "/repo/path/1", replicaPath)
// invalid payload disables caching
notification := glsql.Notification{Channel: "notification_channel_1", Payload: `_`}
@@ -153,19 +139,22 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
expErr := json.Unmarshal([]byte(notification.Payload), new(struct{}))
// second access omits cached data as caching should be disabled
- storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
+ replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2)
+ require.Equal(t, "/repo/path/1", replicaPath)
// third access retrieves data and caches it
- storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
+ replicaPath, storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages3)
+ require.Equal(t, "/repo/path/1", replicaPath)
// fourth access retrieves data from cache
- storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
+ replicaPath, storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages4)
+ require.Equal(t, "/repo/path/1", replicaPath)
require.Len(t, logHook.AllEntries(), 1)
assert.Equal(t, "received payload can't be processed, cache disabled", logHook.LastEntry().Message)
@@ -187,26 +176,27 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
})
t.Run("cache invalidation evicts cached entries", func(t *testing.T) {
+ db.TruncateAll(t)
+
ctx, cancel := testhelper.Context()
defer cancel()
- rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1").
- Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil)
- rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/2").
- Return(map[string]struct{}{"g1": {}, "g2": {}}, nil)
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", []string{"g2"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
cache.Connected()
// first access populates the cache
- path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
+ replicaPath, path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages1)
- path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
+ require.Equal(t, "/repo/path/1", replicaPath)
+ replicaPath, path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages1)
+ require.Equal(t, "/repo/path/2", replicaPath)
// notification evicts entries for '/repo/path/2' from the cache
cache.Notification(glsql.Notification{Payload: `
@@ -217,13 +207,15 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
)
// second access re-uses cached data for '/repo/path/1'
- path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
+ replicaPath1, path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages2)
+ require.Equal(t, "/repo/path/1", replicaPath1)
// second access populates the cache again for '/repo/path/2'
- path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
+ replicaPath, path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages2)
+ require.Equal(t, "/repo/path/2", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -237,29 +229,31 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
})
t.Run("disconnect event disables cache", func(t *testing.T) {
+ db.TruncateAll(t)
+
ctx, cancel := testhelper.Context()
defer cancel()
- rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path").
- Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil)
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
cache.Connected()
// first access populates the cache
- storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
+ replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1)
+ require.Equal(t, "/repo/path", replicaPath)
// disconnection disables cache
cache.Disconnect(assert.AnError)
// second access retrieve data and doesn't populate the cache
- storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
+ replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2)
+ require.Equal(t, "/repo/path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -272,12 +266,13 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
})
t.Run("concurrent access", func(t *testing.T) {
+ db.TruncateAll(t)
+
ctx, cancel := testhelper.Context()
defer cancel()
- rs := &mockConsistentSecondariesProvider{}
- rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1").Return(nil, nil)
- rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/2").Return(nil, nil)
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", nil, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
@@ -292,12 +287,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
switch i % 6 {
case 0, 1:
f = func() {
- _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
+ _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
assert.NoError(t, err)
}
case 2, 3:
f = func() {
- _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
+ _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
assert.NoError(t, err)
}
case 4:
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index fc7dc37ca..d160bfca7 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -257,7 +257,7 @@ func (n *Mgr) GetPrimary(ctx context.Context, virtualStorage string, _ int64) (s
}
func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error) {
- upToDateStorages, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath)
+ _, upToDateStorages, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath)
if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) {
return nil, err
}
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index d55d4493d..e53518a14 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -333,8 +333,8 @@ func TestMgr_GetSyncedNode(t *testing.T) {
verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) {
conf.Failover.Enabled = failover
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
- return consistentStorages, consistentSecondariesErr
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return relativePath, consistentStorages, consistentSecondariesErr
},
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index b876c109a..fdc2e4685 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -324,7 +324,13 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
txMgr := transactions.NewManager(conf)
- rs := datastore.MockRepositoryStore{}
+ repositoryRelativePath := "/path/to/repo"
+
+ rs := datastore.MockRepositoryStore{
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return repositoryRelativePath, nil, nil
+ },
+ }
coordinator := NewCoordinator(
queue,
@@ -352,8 +358,6 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
defer listener.Close()
defer cc.Close()
- repositoryRelativePath := "/path/to/repo"
-
repository := &gitalypb.Repository{
StorageName: conf.VirtualStorages[0].Name,
RelativePath: repositoryRelativePath,
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index fc99b4769..f4af466a4 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -6,6 +6,14 @@ import (
"google.golang.org/grpc"
)
+// RepositoryAccessorRoute describes how to route a repository scoped accessor call.
+type RepositoryAccessorRoute struct {
+ // ReplicaPath is the disk path where the replicas are stored.
+ ReplicaPath string
+ // Node contains the details of the node that should handle the request.
+ Node RouterNode
+}
+
// RouterNode is a subset of a node's configuration needed to perform
// request routing.
type RouterNode struct {
@@ -27,6 +35,8 @@ type StorageMutatorRoute struct {
type RepositoryMutatorRoute struct {
// RepositoryID is the repository's ID as Praefect identifies it.
RepositoryID int64
+ // ReplicaPath is the disk path where the replicas are stored.
+ ReplicaPath string
// Primary is the primary node of the transaction.
Primary RouterNode
// Secondaries are the secondary participating in a transaction.
@@ -45,7 +55,7 @@ type Router interface {
RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)
// RouteRepositoryAccessor returns the node that should serve the repository accessor
// request. If forcePrimary is set to `true`, it returns the primary node.
- RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error)
+ RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)
// RouteRepositoryMutatorTransaction returns the primary and secondaries that should handle the repository mutator request.
// Additionally, it returns nodes which should have the change replicated to. RouteRepositoryMutator should only be used
// with existing repositories.
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
index 4395d4dfc..18f3083b8 100644
--- a/internal/praefect/router_node_manager.go
+++ b/internal/praefect/router_node_manager.go
@@ -35,22 +35,22 @@ func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Route
return &nodeManagerRouter{mgr: mgr, rs: rs}
}
-func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) {
+func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) {
if forcePrimary {
shard, err := r.mgr.GetShard(ctx, virtualStorage)
if err != nil {
- return RouterNode{}, fmt.Errorf("get shard: %w", err)
+ return RepositoryAccessorRoute{}, fmt.Errorf("get shard: %w", err)
}
- return toRouterNode(shard.Primary), nil
+ return RepositoryAccessorRoute{ReplicaPath: relativePath, Node: toRouterNode(shard.Primary)}, nil
}
node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath)
if err != nil {
- return RouterNode{}, fmt.Errorf("get synced node: %w", err)
+ return RepositoryAccessorRoute{}, fmt.Errorf("get synced node: %w", err)
}
- return toRouterNode(node), nil
+ return RepositoryAccessorRoute{ReplicaPath: relativePath, Node: toRouterNode(node)}, nil
}
func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) {
@@ -80,7 +80,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS
return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err)
}
- consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath)
+ replicaPath, consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath)
if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) {
return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err)
}
@@ -111,6 +111,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS
}
return RepositoryMutatorRoute{
+ ReplicaPath: replicaPath,
Primary: toRouterNode(shard.Primary),
Secondaries: toRouterNodes(participatingSecondaries),
ReplicationTargets: replicationTargets,
@@ -139,6 +140,7 @@ func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtual
return RepositoryMutatorRoute{
Primary: toRouterNode(shard.Primary),
+ ReplicaPath: relativePath,
Secondaries: secondaries,
ReplicationTargets: replicationTargets,
}, nil
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index 941b41426..84c0097a2 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -130,35 +130,43 @@ func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualSt
return StorageMutatorRoute{}, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter")
}
-func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) {
+func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) {
healthyNodes, err := r.healthyNodes(virtualStorage)
if err != nil {
- return RouterNode{}, err
+ return RepositoryAccessorRoute{}, err
}
if forcePrimary {
repositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, relativePath)
if err != nil {
- return RouterNode{}, fmt.Errorf("get repository id: %w", err)
+ return RepositoryAccessorRoute{}, fmt.Errorf("get repository id: %w", err)
}
primary, err := r.pg.GetPrimary(ctx, virtualStorage, repositoryID)
if err != nil {
- return RouterNode{}, fmt.Errorf("get primary: %w", err)
+ return RepositoryAccessorRoute{}, fmt.Errorf("get primary: %w", err)
+ }
+
+ replicaPath, _, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID)
+ if err != nil {
+ return RepositoryAccessorRoute{}, fmt.Errorf("get replica path: %w", err)
}
for _, node := range healthyNodes {
if node.Storage == primary {
- return node, nil
+ return RepositoryAccessorRoute{
+ ReplicaPath: replicaPath,
+ Node: node,
+ }, nil
}
}
- return RouterNode{}, nodes.ErrPrimaryNotHealthy
+ return RepositoryAccessorRoute{}, nodes.ErrPrimaryNotHealthy
}
- consistentStorages, err := r.csg.GetConsistentStorages(ctx, virtualStorage, relativePath)
+ replicaPath, consistentStorages, err := r.csg.GetConsistentStorages(ctx, virtualStorage, relativePath)
if err != nil {
- return RouterNode{}, fmt.Errorf("consistent storages: %w", err)
+ return RepositoryAccessorRoute{}, fmt.Errorf("consistent storages: %w", err)
}
healthyConsistentNodes := make([]RouterNode, 0, len(healthyNodes))
@@ -170,7 +178,15 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu
healthyConsistentNodes = append(healthyConsistentNodes, node)
}
- return r.pickRandom(healthyConsistentNodes)
+ node, err := r.pickRandom(healthyConsistentNodes)
+ if err != nil {
+ return RepositoryAccessorRoute{}, err
+ }
+
+ return RepositoryAccessorRoute{
+ ReplicaPath: replicaPath,
+ Node: node,
+ }, nil
}
func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
@@ -198,7 +214,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, nodes.ErrPrimaryNotHealthy
}
- consistentStorages, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID)
+ replicaPath, consistentStorages, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID)
if err != nil {
return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err)
}
@@ -212,7 +228,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, fmt.Errorf("get host assignments: %w", err)
}
- route := RepositoryMutatorRoute{RepositoryID: repositoryID}
+ route := RepositoryMutatorRoute{RepositoryID: repositoryID, ReplicaPath: replicaPath}
for _, assigned := range assignedStorages {
node, healthy := healthySet[assigned]
if assigned == primary {
@@ -265,6 +281,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
if replicationFactor == 1 {
return RepositoryMutatorRoute{
RepositoryID: id,
+ ReplicaPath: relativePath,
Primary: primary,
}, nil
}
@@ -313,6 +330,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
return RepositoryMutatorRoute{
RepositoryID: id,
+ ReplicaPath: relativePath,
Primary: primary,
Secondaries: secondaries,
ReplicationTargets: replicationTargets,
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index d1f4242fe..9eab31307 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -117,6 +117,8 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
db := glsql.NewDB(t)
+ const relativePath = "repository"
+
for _, tc := range []struct {
desc string
virtualStorage string
@@ -218,10 +220,10 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
}})
rs := datastore.NewPostgresRepositoryStore(tx, nil)
- repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "repository")
+ repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", relativePath)
require.NoError(t, err)
require.NoError(t,
- rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", "repository", "primary",
+ rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, "primary",
[]string{"consistent-secondary", "unhealthy-secondary", "inconsistent-secondary"}, nil, true, true),
)
require.NoError(t,
@@ -244,15 +246,20 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
nil,
)
- node, err := router.RouteRepositoryAccessor(ctx, tc.virtualStorage, "repository", tc.forcePrimary)
+ route, err := router.RouteRepositoryAccessor(ctx, tc.virtualStorage, relativePath, tc.forcePrimary)
require.Equal(t, tc.error, err)
if tc.node != "" {
- require.Equal(t, RouterNode{
- Storage: tc.node,
- Connection: conns[tc.virtualStorage][tc.node],
- }, node)
+ require.Equal(t,
+ RepositoryAccessorRoute{
+ ReplicaPath: relativePath,
+ Node: RouterNode{
+ Storage: tc.node,
+ Connection: conns[tc.virtualStorage][tc.node],
+ },
+ },
+ route)
} else {
- require.Empty(t, node)
+ require.Empty(t, route)
}
})
}
@@ -360,12 +367,14 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredNodes})
+ const relativePath = "repository"
+
rs := datastore.NewPostgresRepositoryStore(tx, nil)
- repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "repository")
+ repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", relativePath)
require.NoError(t, err)
require.NoError(t,
- rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", "repository", "primary", []string{"secondary-1", "secondary-2"}, nil, true, false),
+ rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false),
)
if len(tc.consistentStorages) > 0 {
@@ -395,7 +404,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
nil,
)
- route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, "repository")
+ route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, relativePath)
require.Equal(t, tc.error, err)
if err == nil {
var secondaries []RouterNode
@@ -408,6 +417,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
require.Equal(t, RepositoryMutatorRoute{
RepositoryID: repositoryID,
+ ReplicaPath: relativePath,
Primary: RouterNode{
Storage: "primary",
Connection: conns[tc.virtualStorage]["primary"],
@@ -445,6 +455,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
db := glsql.NewDB(t)
+ const relativePath = "relative-path"
+
for _, tc := range []struct {
desc string
virtualStorage string
@@ -477,6 +489,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
+ ReplicaPath: relativePath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
ReplicationTargets: []string{"secondary-1", "secondary-2"},
},
@@ -491,6 +504,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
+ ReplicaPath: relativePath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{
{Storage: "secondary-1", Connection: secondary1Conn},
@@ -508,6 +522,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
+ ReplicaPath: relativePath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{
{Storage: "secondary-1", Connection: secondary1Conn},
@@ -526,6 +541,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
+ ReplicaPath: relativePath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
},
),
@@ -541,11 +557,13 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
+ ReplicaPath: relativePath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
},
RepositoryMutatorRoute{
RepositoryID: 1,
+ ReplicaPath: relativePath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}},
},
@@ -562,6 +580,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
+ ReplicaPath: relativePath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
ReplicationTargets: []string{"secondary-2"},
@@ -587,7 +606,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(db, nil)
if tc.repositoryExists {
require.NoError(t,
- rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path", "primary", nil, nil, true, true),
+ rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, "primary", nil, nil, true, true),
)
}
@@ -614,7 +633,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
nil,
rs,
map[string]int{"virtual-storage-1": tc.replicationFactor},
- ).RouteRepositoryCreation(ctx, tc.virtualStorage, "relative-path")
+ ).RouteRepositoryCreation(ctx, tc.virtualStorage, relativePath)
if tc.error != nil {
require.Equal(t, tc.error, err)
return
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 20933e5fe..c635b811f 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -5,6 +5,7 @@ import (
"context"
"errors"
"io"
+ "math/rand"
"net"
"os"
"path/filepath"
@@ -552,10 +553,14 @@ func TestRemoveRepository(t *testing.T) {
defer cancel()
cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
- withQueue: queueInterceptor,
- withRepoStore: repoStore,
- withNodeMgr: nodeMgr,
- withTxMgr: txMgr,
+ withQueue: queueInterceptor,
+ withRepoStore: datastore.MockRepositoryStore{
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return relativePath, nil, nil
+ },
+ },
+ withNodeMgr: nodeMgr,
+ withTxMgr: txMgr,
})
defer cleanup()
@@ -630,9 +635,31 @@ func TestRenameRepository(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
+ tx := glsql.NewDB(t).Begin(t)
+ defer tx.Rollback(t)
+
+ rs := datastore.NewPostgresRepositoryStore(tx, nil)
+ require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
+
+ nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()})
+
cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
withQueue: evq,
- withRepoStore: datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil),
+ withRepoStore: rs,
+ withRouter: NewPerRepositoryRouter(
+ nodeSet.Connections(),
+ nodes.NewPerRepositoryElector(tx),
+ StaticHealthChecker(praefectCfg.StorageNames()),
+ NewLockedRandom(rand.New(rand.NewSource(0))),
+ rs,
+ datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()),
+ rs,
+ nil,
+ ),
})
defer cleanup()
@@ -821,8 +848,8 @@ func TestProxyWrites(t *testing.T) {
_, repo, _ := testcfg.BuildWithRepo(t)
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
- return map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ return relativePath, map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, nil
},
}