diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-10-25 13:46:09 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-10-25 13:46:09 +0300 |
commit | 6809578bbadfc2fe2d4aaa0bcfa3ac042efdd6b8 (patch) | |
tree | 56ee2a4081f3832da928816fa36b60b7ad98823f | |
parent | 06a084ee44991c5ced232902a8e4d8caa9422d14 (diff) | |
parent | bd454b4eb6329d817fb76845650288a499b42197 (diff) |
Merge branch 'smh-replica-path-cache' into 'master'
Rewrite relative paths in Coordinator
See merge request gitlab-org/gitaly!3974
-rw-r--r-- | internal/praefect/coordinator.go | 15 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 26 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 56 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_bm_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 12 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 30 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_provider.go | 38 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_provider_test.go | 115 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 2 | ||||
-rw-r--r-- | internal/praefect/nodes/manager_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/router.go | 12 | ||||
-rw-r--r-- | internal/praefect/router_node_manager.go | 14 | ||||
-rw-r--r-- | internal/praefect/router_per_repository.go | 40 | ||||
-rw-r--r-- | internal/praefect/router_per_repository_test.go | 45 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 41 |
16 files changed, 272 insertions, 190 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index d31e40fbe..98029fb41 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -398,23 +398,23 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal repoPath := call.targetRepo.GetRelativePath() virtualStorage := call.targetRepo.StorageName - node, err := c.router.RouteRepositoryAccessor( + route, err := c.router.RouteRepositoryAccessor( ctx, virtualStorage, repoPath, shouldRouteRepositoryAccessorToPrimary(ctx, call), ) if err != nil { return nil, fmt.Errorf("accessor call: route repository accessor: %w", err) } - b, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, node.Storage) + b, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Node.Storage, route.ReplicaPath) if err != nil { return nil, fmt.Errorf("accessor call: rewrite storage: %w", err) } - metrics.ReadDistribution.WithLabelValues(virtualStorage, node.Storage).Inc() + metrics.ReadDistribution.WithLabelValues(virtualStorage, route.Node.Storage).Inc() return proxy.NewStreamParameters(proxy.Destination{ Ctx: streamParametersContext(ctx), - Conn: node.Connection, + Conn: route.Node.Connection, Msg: b, }, nil, nil, nil), nil } @@ -486,7 +486,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall } } - primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage) + primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage, route.ReplicaPath) if err != nil { return nil, fmt.Errorf("mutator call: rewrite storage: %w", err) } @@ -528,7 +528,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall for _, secondary := range route.Secondaries { secondary := secondary - secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage) + secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage, route.ReplicaPath) if err != nil { return nil, err } @@ -783,7 +783,7 @@ func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi pro return proxy.NewStreamParameters(primaryDest, secondaryDests, func() error { return nil }, nil), nil } -func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, storage string) ([]byte, error) { +func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, storage, relativePath string) ([]byte, error) { targetRepo, err := mi.TargetRepo(m) if err != nil { return nil, helper.ErrInvalidArgument(err) @@ -791,6 +791,7 @@ func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, st // rewrite storage name targetRepo.StorageName = storage + targetRepo.RelativePath = relativePath additionalRepo, ok, err := mi.AdditionalRepo(m) if err != nil { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 0b988bfa2..b9e4230d2 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -99,11 +99,11 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { defer cancel() rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(context.Context, string, string) (map[string]struct{}, error) { + GetConsistentStoragesFunc: func(context.Context, string, string) (string, map[string]struct{}, error) { if tc.readOnly { - return map[string]struct{}{storage + "-other": {}}, nil + return "", map[string]struct{}{storage + "-other": {}}, nil } - return map[string]struct{}{storage: {}}, nil + return "", map[string]struct{}{storage: {}}, nil }, } @@ -326,8 +326,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { } rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { - return map[string]struct{}{"primary": {}, "secondary": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, map[string]struct{}{"primary": {}, "secondary": {}}, nil }, } @@ -400,10 +400,10 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { type mockRouter struct { Router - routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) + routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) } -func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) { +func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) { return m.routeRepositoryAccessorFunc(ctx, virtualStorage, relativePath, forcePrimary) } @@ -459,8 +459,8 @@ func TestStreamDirectorAccessor(t *testing.T) { { desc: "repository not found", router: mockRouter{ - routeRepositoryAccessorFunc: func(_ context.Context, virtualStorage, relativePath string, _ bool) (RouterNode, error) { - return RouterNode{}, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) + routeRepositoryAccessorFunc: func(_ context.Context, virtualStorage, relativePath string, _ bool) (RepositoryAccessorRoute, error) { + return RepositoryAccessorRoute{}, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) }, }, error: helper.ErrNotFound(fmt.Errorf("accessor call: route repository accessor: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath))), @@ -547,8 +547,8 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { entry := testhelper.DiscardTestEntry(t) repoStore := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { - return map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil }, } @@ -1566,8 +1566,8 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent // nodes will take part in transactions. withRepoStore: datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { - return map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil }, }, }) diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index e4753008f..0d2a10602 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -115,8 +115,8 @@ type RepositoryStore interface { // as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository // which has no record in the virtual storage or the storage. RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error - // GetConsistentStoragesByRepositoryID returns a set of up to date storages for the given repository keyed by repository ID. - GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (map[string]struct{}, error) + // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. + GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) ConsistentStoragesGetter // RepositoryExists returns whether the repository exists on a virtual storage. RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) @@ -504,60 +504,52 @@ AND storage = $3 return err } -// GetConsistentStoragesByRepositoryID returns a set of up to date storages for the given repository keyed by repository ID. -func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (map[string]struct{}, error) { +// GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. +func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) { return rs.getConsistentStorages(ctx, ` -SELECT storage +SELECT replica_path, ARRAY_AGG(storage) FROM repositories JOIN storage_repositories USING (repository_id, generation) WHERE repository_id = $1 +GROUP BY replica_path `, repositoryID) } -// GetConsistentStorages returns a set of up to date storages for the given repository keyed by virtual storage and relative path. -func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { - storages, err := rs.getConsistentStorages(ctx, ` -SELECT storage +// GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. +func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + replicaPath, storages, err := rs.getConsistentStorages(ctx, ` +SELECT replica_path, ARRAY_AGG(storage) FROM repositories JOIN storage_repositories USING (repository_id, generation) WHERE repositories.virtual_storage = $1 AND repositories.relative_path = $2 +GROUP BY replica_path `, virtualStorage, relativePath) - if errors.Is(err, commonerr.ErrRepositoryNotFound) { - return nil, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) + return "", nil, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) } - return storages, err + return replicaPath, storages, err } // getConsistentStorages is a helper for querying the consistent storages by different keys. -func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (map[string]struct{}, error) { - rows, err := rs.db.QueryContext(ctx, query, params...) - if err != nil { - return nil, fmt.Errorf("query: %w", err) - } - defer rows.Close() - - consistentStorages := map[string]struct{}{} - for rows.Next() { - var storage string - if err := rows.Scan(&storage); err != nil { - return nil, fmt.Errorf("scan: %w", err) +func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, map[string]struct{}, error) { + var replicaPath string + var storages pq.StringArray + if err := rs.db.QueryRowContext(ctx, query, params...).Scan(&replicaPath, &storages); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return "", nil, commonerr.ErrRepositoryNotFound } - consistentStorages[storage] = struct{}{} - } - - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("rows: %w", err) + return "", nil, fmt.Errorf("query: %w", err) } - if len(consistentStorages) == 0 { - return nil, commonerr.ErrRepositoryNotFound + consistentStorages := make(map[string]struct{}, len(storages)) + for _, storage := range storages { + consistentStorages[storage] = struct{}{} } - return consistentStorages, nil + return replicaPath, consistentStorages, nil } func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) { diff --git a/internal/praefect/datastore/repository_store_bm_test.go b/internal/praefect/datastore/repository_store_bm_test.go index f39750114..8555c971b 100644 --- a/internal/praefect/datastore/repository_store_bm_test.go +++ b/internal/praefect/datastore/repository_store_bm_test.go @@ -66,7 +66,7 @@ func benchmarkGetConsistentStorages(b *testing.B, nstorages, nrepositories int) require.NoError(b, err) b.StartTimer() - _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2)) + _, _, err = repoStore.GetConsistentStorages(ctx, "vs", "/path/repo/"+strconv.Itoa(nrepositories/2)) b.StopTimer() require.NoError(b, err) diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 43717174a..cb10c3f69 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -14,8 +14,8 @@ type MockRepositoryStore struct { DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error - GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (map[string]struct{}, error) - GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) + GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) + GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]PartiallyAvailableRepository, error) DeleteInvalidRepositoryFunc func(ctx context.Context, repositoryID int64, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) @@ -99,18 +99,18 @@ func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorag } // GetConsistentStoragesByRepositoryID returns result of execution of the GetConsistentStoragesByRepositoryIDFunc field if it is set or an empty map. -func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (map[string]struct{}, error) { +func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) { if m.GetConsistentStoragesFunc == nil { - return map[string]struct{}{}, nil + return "", map[string]struct{}{}, nil } return m.GetConsistentStoragesByRepositoryIDFunc(ctx, repositoryID) } // GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map. -func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { +func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { if m.GetConsistentStoragesFunc == nil { - return map[string]struct{}{}, nil + return "", map[string]struct{}{}, nil } return m.GetConsistentStoragesFunc(ctx, virtualStorage, relativePath) diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index c691b8821..a004ba88d 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -921,12 +921,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("no records", func(t *testing.T) { - secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) + require.Empty(t, replicaPath) require.Empty(t, secondaries) - secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.Equal(t, commonerr.ErrRepositoryNotFound, err) + require.Empty(t, replicaPath) require.Empty(t, secondaries) }) @@ -951,25 +953,29 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { ) t.Run("consistent secondary", func(t *testing.T) { - secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) + require.Equal(t, repo, replicaPath) - secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) + require.Equal(t, repo, replicaPath) }) require.NoError(t, rs.SetGeneration(ctx, 1, "primary", 0)) t.Run("outdated primary", func(t *testing.T) { - secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) + require.Equal(t, repo, replicaPath) - secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) + require.Equal(t, repo, replicaPath) }) t.Run("storage with highest generation is not configured", func(t *testing.T) { @@ -993,13 +999,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }, ) - secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) + require.Equal(t, repo, replicaPath) - secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) + require.Equal(t, repo, replicaPath) }) t.Run("returns not found for deleted repositories", func(t *testing.T) { @@ -1007,13 +1015,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) requireState(t, ctx, virtualStorageState{}, storageState{}) - secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) + replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) require.Empty(t, secondaries) + require.Empty(t, replicaPath) - secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) + replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.Equal(t, commonerr.ErrRepositoryNotFound, err) require.Empty(t, secondaries) + require.Empty(t, replicaPath) }) }) diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go index 376c480eb..654209ac1 100644 --- a/internal/praefect/datastore/storage_provider.go +++ b/internal/praefect/datastore/storage_provider.go @@ -16,9 +16,8 @@ import ( // ConsistentStoragesGetter returns storages which contain the latest generation of a repository. type ConsistentStoragesGetter interface { - // GetConsistentStorages checks which storages are on the latest generation and returns them. Returns a - // commonerr.RepositoryNotFoundError if the repository does not exist. - GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) + // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. + GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) } // errNotExistingVirtualStorage indicates that the requested virtual storage can't be found or not configured. @@ -134,17 +133,17 @@ func (c *CachingConsistentStoragesGetter) getCache(virtualStorage string) (*lru. return val, found } -func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { +func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc() return c.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) } -func (c *CachingConsistentStoragesGetter) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache, map[string]struct{}, bool) { +func (c *CachingConsistentStoragesGetter) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache, cacheValue, bool) { populateDone := func() {} // should be called AFTER any cache population is done cache, found := c.getCache(virtualStorage) if !found { - return populateDone, nil, nil, false + return populateDone, nil, cacheValue{}, false } if storages, found := getKey(cache, relativePath); found { @@ -158,41 +157,46 @@ func (c *CachingConsistentStoragesGetter) tryCache(virtualStorage, relativePath return populateDone, cache, storages, true } - return populateDone, cache, nil, false + return populateDone, cache, cacheValue{}, false } func (c *CachingConsistentStoragesGetter) isCacheEnabled() bool { return atomic.LoadInt32(&c.access) != 0 } -// GetConsistentStorages returns list of gitaly storages that are in up to date state based on the generation tracking. -func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { +// GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. +func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { var cache *lru.Cache if c.isCacheEnabled() { - var storages map[string]struct{} + var value cacheValue var ok bool var populationDone func() - populationDone, cache, storages, ok = c.tryCache(virtualStorage, relativePath) + populationDone, cache, value, ok = c.tryCache(virtualStorage, relativePath) defer populationDone() if ok { c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc() - return storages, nil + return value.replicaPath, value.storages, nil } } - storages, err := c.cacheMiss(ctx, virtualStorage, relativePath) + replicaPath, storages, err := c.cacheMiss(ctx, virtualStorage, relativePath) if err == nil && cache != nil { - cache.Add(relativePath, storages) + cache.Add(relativePath, cacheValue{replicaPath: replicaPath, storages: storages}) c.cacheAccessTotal.WithLabelValues(virtualStorage, "populate").Inc() } - return storages, err + return replicaPath, storages, err } -func getKey(cache *lru.Cache, key string) (map[string]struct{}, bool) { +type cacheValue struct { + replicaPath string + storages map[string]struct{} +} + +func getKey(cache *lru.Cache, key string) (cacheValue, bool) { val, found := cache.Get(key) - vals, _ := val.(map[string]struct{}) + vals, _ := val.(cacheValue) return vals, found } diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 4c9be5e2a..fc214df1e 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -1,7 +1,6 @@ package datastore import ( - "context" "encoding/json" "runtime" "strings" @@ -13,44 +12,33 @@ import ( "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" ) -type mockConsistentSecondariesProvider struct { - mock.Mock -} +func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { + t.Parallel() -func (m *mockConsistentSecondariesProvider) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { - args := m.Called(ctx, virtualStorage, relativePath) - val := args.Get(0) - var res map[string]struct{} - if val != nil { - res = val.(map[string]struct{}) - } - return res, args.Error(1) -} + db := glsql.NewDB(t) + rs := NewPostgresRepositoryStore(db, nil) -func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { t.Run("unknown virtual storage", func(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - rs := &mockConsistentSecondariesProvider{} - rs.On("GetConsistentStorages", mock.Anything, "unknown", "/repo/path"). - Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil). - Once() + require.NoError(t, rs.CreateRepository(ctx, 1, "unknown", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) cache.Connected() // empty cache should be populated - storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path") + replicaPath, storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) + require.Equal(t, "/repo/path", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage) @@ -61,22 +49,22 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("miss -> populate -> hit", func(t *testing.T) { + db.TruncateAll(t) + ctx, cancel := testhelper.Context() defer cancel() - rs := &mockConsistentSecondariesProvider{} - rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path"). - Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil). - Once() + require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) cache.Connected() // empty cache should be populated - storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) + require.Equal(t, "/repo/path", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage) @@ -87,9 +75,10 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, err) // populated cache should return cached value - storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) + require.Equal(t, "/repo/path", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage) @@ -102,20 +91,17 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("repository store returns an error", func(t *testing.T) { + db.TruncateAll(t) + ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(testhelper.DiscardTestEntry(t))) defer cancel() - rs := &mockConsistentSecondariesProvider{} - rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path"). - Return(nil, assert.AnError). - Once() - cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) cache.Connected() - _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") - require.Equal(t, assert.AnError, err) + _, _, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") + require.Equal(t, commonerr.NewRepositoryNotFoundError("vs", "/repo/path"), err) // "populate" metric is not set as there was an error and we don't want this result to be cached err = testutil.CollectAndCompare(cache, strings.NewReader(` @@ -127,25 +113,25 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("cache is disabled after handling invalid payload", func(t *testing.T) { + db.TruncateAll(t) + logger := testhelper.DiscardTestEntry(t) logHook := test.NewLocal(logger.Logger) ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(logger)) defer cancel() - rs := &mockConsistentSecondariesProvider{} - rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1"). - Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil). - Times(4) + require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) cache.Connected() // first access populates the cache - storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1) + require.Equal(t, "/repo/path/1", replicaPath) // invalid payload disables caching notification := glsql.Notification{Channel: "notification_channel_1", Payload: `_`} @@ -153,19 +139,22 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { expErr := json.Unmarshal([]byte(notification.Payload), new(struct{})) // second access omits cached data as caching should be disabled - storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2) + require.Equal(t, "/repo/path/1", replicaPath) // third access retrieves data and caches it - storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages3) + require.Equal(t, "/repo/path/1", replicaPath) // fourth access retrieves data from cache - storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages4) + require.Equal(t, "/repo/path/1", replicaPath) require.Len(t, logHook.AllEntries(), 1) assert.Equal(t, "received payload can't be processed, cache disabled", logHook.LastEntry().Message) @@ -187,26 +176,27 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("cache invalidation evicts cached entries", func(t *testing.T) { + db.TruncateAll(t) + ctx, cancel := testhelper.Context() defer cancel() - rs := &mockConsistentSecondariesProvider{} - rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1"). - Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil) - rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/2"). - Return(map[string]struct{}{"g1": {}, "g2": {}}, nil) + require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", []string{"g2"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) cache.Connected() // first access populates the cache - path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath, path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages1) - path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + require.Equal(t, "/repo/path/1", replicaPath) + replicaPath, path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages1) + require.Equal(t, "/repo/path/2", replicaPath) // notification evicts entries for '/repo/path/2' from the cache cache.Notification(glsql.Notification{Payload: ` @@ -217,13 +207,15 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { ) // second access re-uses cached data for '/repo/path/1' - path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + replicaPath1, path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages2) + require.Equal(t, "/repo/path/1", replicaPath1) // second access populates the cache again for '/repo/path/2' - path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + replicaPath, path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages2) + require.Equal(t, "/repo/path/2", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage) @@ -237,29 +229,31 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("disconnect event disables cache", func(t *testing.T) { + db.TruncateAll(t) + ctx, cancel := testhelper.Context() defer cancel() - rs := &mockConsistentSecondariesProvider{} - rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path"). - Return(map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, nil) + require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) cache.Connected() // first access populates the cache - storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1) + require.Equal(t, "/repo/path", replicaPath) // disconnection disables cache cache.Disconnect(assert.AnError) // second access retrieve data and doesn't populate the cache - storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") + replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2) + require.Equal(t, "/repo/path", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` # HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage) @@ -272,12 +266,13 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }) t.Run("concurrent access", func(t *testing.T) { + db.TruncateAll(t) + ctx, cancel := testhelper.Context() defer cancel() - rs := &mockConsistentSecondariesProvider{} - rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/1").Return(nil, nil) - rs.On("GetConsistentStorages", mock.Anything, "vs", "/repo/path/2").Return(nil, nil) + require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", nil, nil, true, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", nil, nil, true, false)) cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"}) require.NoError(t, err) @@ -292,12 +287,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { switch i % 6 { case 0, 1: f = func() { - _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") + _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") assert.NoError(t, err) } case 2, 3: f = func() { - _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") + _, _, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") assert.NoError(t, err) } case 4: diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index fc7dc37ca..d160bfca7 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -257,7 +257,7 @@ func (n *Mgr) GetPrimary(ctx context.Context, virtualStorage string, _ int64) (s } func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error) { - upToDateStorages, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath) + _, upToDateStorages, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath) if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) { return nil, err } diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index d55d4493d..e53518a14 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -333,8 +333,8 @@ func TestMgr_GetSyncedNode(t *testing.T) { verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) { conf.Failover.Enabled = failover rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { - return consistentStorages, consistentSecondariesErr + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, consistentStorages, consistentSecondariesErr }, } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b876c109a..fdc2e4685 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -324,7 +324,13 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { txMgr := transactions.NewManager(conf) - rs := datastore.MockRepositoryStore{} + repositoryRelativePath := "/path/to/repo" + + rs := datastore.MockRepositoryStore{ + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return repositoryRelativePath, nil, nil + }, + } coordinator := NewCoordinator( queue, @@ -352,8 +358,6 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { defer listener.Close() defer cc.Close() - repositoryRelativePath := "/path/to/repo" - repository := &gitalypb.Repository{ StorageName: conf.VirtualStorages[0].Name, RelativePath: repositoryRelativePath, diff --git a/internal/praefect/router.go b/internal/praefect/router.go index fc99b4769..f4af466a4 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -6,6 +6,14 @@ import ( "google.golang.org/grpc" ) +// RepositoryAccessorRoute describes how to route a repository scoped accessor call. +type RepositoryAccessorRoute struct { + // ReplicaPath is the disk path where the replicas are stored. + ReplicaPath string + // Node contains the details of the node that should handle the request. + Node RouterNode +} + // RouterNode is a subset of a node's configuration needed to perform // request routing. type RouterNode struct { @@ -27,6 +35,8 @@ type StorageMutatorRoute struct { type RepositoryMutatorRoute struct { // RepositoryID is the repository's ID as Praefect identifies it. RepositoryID int64 + // ReplicaPath is the disk path where the replicas are stored. + ReplicaPath string // Primary is the primary node of the transaction. Primary RouterNode // Secondaries are the secondary participating in a transaction. @@ -45,7 +55,7 @@ type Router interface { RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) // RouteRepositoryAccessor returns the node that should serve the repository accessor // request. If forcePrimary is set to `true`, it returns the primary node. - RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) + RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) // RouteRepositoryMutatorTransaction returns the primary and secondaries that should handle the repository mutator request. // Additionally, it returns nodes which should have the change replicated to. RouteRepositoryMutator should only be used // with existing repositories. diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 4395d4dfc..18f3083b8 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -35,22 +35,22 @@ func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Route return &nodeManagerRouter{mgr: mgr, rs: rs} } -func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) { +func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) { if forcePrimary { shard, err := r.mgr.GetShard(ctx, virtualStorage) if err != nil { - return RouterNode{}, fmt.Errorf("get shard: %w", err) + return RepositoryAccessorRoute{}, fmt.Errorf("get shard: %w", err) } - return toRouterNode(shard.Primary), nil + return RepositoryAccessorRoute{ReplicaPath: relativePath, Node: toRouterNode(shard.Primary)}, nil } node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath) if err != nil { - return RouterNode{}, fmt.Errorf("get synced node: %w", err) + return RepositoryAccessorRoute{}, fmt.Errorf("get synced node: %w", err) } - return toRouterNode(node), nil + return RepositoryAccessorRoute{ReplicaPath: relativePath, Node: toRouterNode(node)}, nil } func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) { @@ -80,7 +80,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) } - consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath) + replicaPath, consistentStorages, err := r.rs.GetConsistentStorages(ctx, virtualStorage, relativePath) if err != nil && !errors.As(err, new(commonerr.RepositoryNotFoundError)) { return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err) } @@ -111,6 +111,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS } return RepositoryMutatorRoute{ + ReplicaPath: replicaPath, Primary: toRouterNode(shard.Primary), Secondaries: toRouterNodes(participatingSecondaries), ReplicationTargets: replicationTargets, @@ -139,6 +140,7 @@ func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtual return RepositoryMutatorRoute{ Primary: toRouterNode(shard.Primary), + ReplicaPath: relativePath, Secondaries: secondaries, ReplicationTargets: replicationTargets, }, nil diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 941b41426..84c0097a2 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -130,35 +130,43 @@ func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualSt return StorageMutatorRoute{}, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter") } -func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) { +func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) { healthyNodes, err := r.healthyNodes(virtualStorage) if err != nil { - return RouterNode{}, err + return RepositoryAccessorRoute{}, err } if forcePrimary { repositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, relativePath) if err != nil { - return RouterNode{}, fmt.Errorf("get repository id: %w", err) + return RepositoryAccessorRoute{}, fmt.Errorf("get repository id: %w", err) } primary, err := r.pg.GetPrimary(ctx, virtualStorage, repositoryID) if err != nil { - return RouterNode{}, fmt.Errorf("get primary: %w", err) + return RepositoryAccessorRoute{}, fmt.Errorf("get primary: %w", err) + } + + replicaPath, _, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) + if err != nil { + return RepositoryAccessorRoute{}, fmt.Errorf("get replica path: %w", err) } for _, node := range healthyNodes { if node.Storage == primary { - return node, nil + return RepositoryAccessorRoute{ + ReplicaPath: replicaPath, + Node: node, + }, nil } } - return RouterNode{}, nodes.ErrPrimaryNotHealthy + return RepositoryAccessorRoute{}, nodes.ErrPrimaryNotHealthy } - consistentStorages, err := r.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) + replicaPath, consistentStorages, err := r.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) if err != nil { - return RouterNode{}, fmt.Errorf("consistent storages: %w", err) + return RepositoryAccessorRoute{}, fmt.Errorf("consistent storages: %w", err) } healthyConsistentNodes := make([]RouterNode, 0, len(healthyNodes)) @@ -170,7 +178,15 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu healthyConsistentNodes = append(healthyConsistentNodes, node) } - return r.pickRandom(healthyConsistentNodes) + node, err := r.pickRandom(healthyConsistentNodes) + if err != nil { + return RepositoryAccessorRoute{}, err + } + + return RepositoryAccessorRoute{ + ReplicaPath: replicaPath, + Node: node, + }, nil } func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { @@ -198,7 +214,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, nodes.ErrPrimaryNotHealthy } - consistentStorages, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) + replicaPath, consistentStorages, err := r.rs.GetConsistentStoragesByRepositoryID(ctx, repositoryID) if err != nil { return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err) } @@ -212,7 +228,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, fmt.Errorf("get host assignments: %w", err) } - route := RepositoryMutatorRoute{RepositoryID: repositoryID} + route := RepositoryMutatorRoute{RepositoryID: repositoryID, ReplicaPath: replicaPath} for _, assigned := range assignedStorages { node, healthy := healthySet[assigned] if assigned == primary { @@ -265,6 +281,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu if replicationFactor == 1 { return RepositoryMutatorRoute{ RepositoryID: id, + ReplicaPath: relativePath, Primary: primary, }, nil } @@ -313,6 +330,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{ RepositoryID: id, + ReplicaPath: relativePath, Primary: primary, Secondaries: secondaries, ReplicationTargets: replicationTargets, diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index d1f4242fe..9eab31307 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -117,6 +117,8 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { db := glsql.NewDB(t) + const relativePath = "repository" + for _, tc := range []struct { desc string virtualStorage string @@ -218,10 +220,10 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { }}) rs := datastore.NewPostgresRepositoryStore(tx, nil) - repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "repository") + repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", relativePath) require.NoError(t, err) require.NoError(t, - rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", "repository", "primary", + rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, "primary", []string{"consistent-secondary", "unhealthy-secondary", "inconsistent-secondary"}, nil, true, true), ) require.NoError(t, @@ -244,15 +246,20 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { nil, ) - node, err := router.RouteRepositoryAccessor(ctx, tc.virtualStorage, "repository", tc.forcePrimary) + route, err := router.RouteRepositoryAccessor(ctx, tc.virtualStorage, relativePath, tc.forcePrimary) require.Equal(t, tc.error, err) if tc.node != "" { - require.Equal(t, RouterNode{ - Storage: tc.node, - Connection: conns[tc.virtualStorage][tc.node], - }, node) + require.Equal(t, + RepositoryAccessorRoute{ + ReplicaPath: relativePath, + Node: RouterNode{ + Storage: tc.node, + Connection: conns[tc.virtualStorage][tc.node], + }, + }, + route) } else { - require.Empty(t, node) + require.Empty(t, route) } }) } @@ -360,12 +367,14 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredNodes}) + const relativePath = "repository" + rs := datastore.NewPostgresRepositoryStore(tx, nil) - repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", "repository") + repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", relativePath) require.NoError(t, err) require.NoError(t, - rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", "repository", "primary", []string{"secondary-1", "secondary-2"}, nil, true, false), + rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false), ) if len(tc.consistentStorages) > 0 { @@ -395,7 +404,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { nil, ) - route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, "repository") + route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, relativePath) require.Equal(t, tc.error, err) if err == nil { var secondaries []RouterNode @@ -408,6 +417,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { require.Equal(t, RepositoryMutatorRoute{ RepositoryID: repositoryID, + ReplicaPath: relativePath, Primary: RouterNode{ Storage: "primary", Connection: conns[tc.virtualStorage]["primary"], @@ -445,6 +455,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { db := glsql.NewDB(t) + const relativePath = "relative-path" + for _, tc := range []struct { desc string virtualStorage string @@ -477,6 +489,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, + ReplicaPath: relativePath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, ReplicationTargets: []string{"secondary-1", "secondary-2"}, }, @@ -491,6 +504,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, + ReplicaPath: relativePath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, @@ -508,6 +522,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, + ReplicaPath: relativePath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, @@ -526,6 +541,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, + ReplicaPath: relativePath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, }, ), @@ -541,11 +557,13 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, + ReplicaPath: relativePath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, }, RepositoryMutatorRoute{ RepositoryID: 1, + ReplicaPath: relativePath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, }, @@ -562,6 +580,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, + ReplicaPath: relativePath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, ReplicationTargets: []string{"secondary-2"}, @@ -587,7 +606,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(db, nil) if tc.repositoryExists { require.NoError(t, - rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path", "primary", nil, nil, true, true), + rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, "primary", nil, nil, true, true), ) } @@ -614,7 +633,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { nil, rs, map[string]int{"virtual-storage-1": tc.replicationFactor}, - ).RouteRepositoryCreation(ctx, tc.virtualStorage, "relative-path") + ).RouteRepositoryCreation(ctx, tc.virtualStorage, relativePath) if tc.error != nil { require.Equal(t, tc.error, err) return diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 20933e5fe..c635b811f 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -5,6 +5,7 @@ import ( "context" "errors" "io" + "math/rand" "net" "os" "path/filepath" @@ -552,10 +553,14 @@ func TestRemoveRepository(t *testing.T) { defer cancel() cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ - withQueue: queueInterceptor, - withRepoStore: repoStore, - withNodeMgr: nodeMgr, - withTxMgr: txMgr, + withQueue: queueInterceptor, + withRepoStore: datastore.MockRepositoryStore{ + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, nil, nil + }, + }, + withNodeMgr: nodeMgr, + withTxMgr: txMgr, }) defer cleanup() @@ -630,9 +635,31 @@ func TestRenameRepository(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() + tx := glsql.NewDB(t).Begin(t) + defer tx.Rollback(t) + + rs := datastore.NewPostgresRepositoryStore(tx, nil) + require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) + + nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) + require.NoError(t, err) + defer nodeSet.Close() + + testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) + cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: evq, - withRepoStore: datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil), + withRepoStore: rs, + withRouter: NewPerRepositoryRouter( + nodeSet.Connections(), + nodes.NewPerRepositoryElector(tx), + StaticHealthChecker(praefectCfg.StorageNames()), + NewLockedRandom(rand.New(rand.NewSource(0))), + rs, + datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()), + rs, + nil, + ), }) defer cleanup() @@ -821,8 +848,8 @@ func TestProxyWrites(t *testing.T) { _, repo, _ := testcfg.BuildWithRepo(t) rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { - return map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + return relativePath, map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, nil }, } |