Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2021-11-11 19:37:19 +0300
committerJohn Cai <jcai@gitlab.com>2021-11-11 19:37:19 +0300
commit1c25541fb9d55a330b83c5b01ed996eb6021459c (patch)
tree6141fef6ab332c1323f5186792a5a6391af15ec8
parent189048edd92d870f8d0f2e8ecd7ace4b6620539d (diff)
parenta39bf19d601f117cea0261b7e420e8ef598bef49 (diff)
Merge branch 'smh-additional-repository-replica-path' into 'master'
Rewrite additional repository relative path See merge request gitlab-org/gitaly!4053
-rw-r--r--cmd/praefect/subcmd_accept_dataloss_test.go2
-rw-r--r--cmd/praefect/subcmd_set_replication_factor_test.go2
-rw-r--r--cmd/praefect/subcmd_track_repository.go1
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go2
-rw-r--r--internal/praefect/coordinator.go65
-rw-r--r--internal/praefect/coordinator_pg_test.go2
-rw-r--r--internal/praefect/coordinator_test.go38
-rw-r--r--internal/praefect/datastore/assignment_test.go2
-rw-r--r--internal/praefect/datastore/collector_test.go2
-rw-r--r--internal/praefect/datastore/repository_store.go7
-rw-r--r--internal/praefect/datastore/repository_store_mock.go6
-rw-r--r--internal/praefect/datastore/repository_store_test.go150
-rw-r--r--internal/praefect/datastore/storage_cleanup_test.go10
-rw-r--r--internal/praefect/datastore/storage_provider_test.go42
-rw-r--r--internal/praefect/info_service_test.go2
-rw-r--r--internal/praefect/nodes/per_repository_test.go2
-rw-r--r--internal/praefect/reconciler/reconciler_test.go4
-rw-r--r--internal/praefect/remove_repository_test.go2
-rw-r--r--internal/praefect/replicator_pg_test.go4
-rw-r--r--internal/praefect/replicator_test.go8
-rw-r--r--internal/praefect/repocleaner/repository_test.go4
-rw-r--r--internal/praefect/repository_exists_test.go2
-rw-r--r--internal/praefect/router.go5
-rw-r--r--internal/praefect/router_node_manager.go11
-rw-r--r--internal/praefect/router_per_repository.go21
-rw-r--r--internal/praefect/router_per_repository_test.go60
-rw-r--r--internal/praefect/server_test.go2
27 files changed, 279 insertions, 179 deletions
diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go
index 7683aae12..ef80069b7 100644
--- a/cmd/praefect/subcmd_accept_dataloss_test.go
+++ b/cmd/praefect/subcmd_accept_dataloss_test.go
@@ -47,7 +47,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) {
if !repoCreated {
repoCreated = true
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, storage, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, repo, storage, nil, nil, false, false))
}
require.NoError(t, rs.SetGeneration(ctx, 1, storage, repo, generation))
diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go
index 197045a55..67b917d9d 100644
--- a/cmd/praefect/subcmd_set_replication_factor_test.go
+++ b/cmd/praefect/subcmd_set_replication_factor_test.go
@@ -92,7 +92,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) {
// create a repository record
require.NoError(t,
- datastore.NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", nil, nil, false, false),
+ datastore.NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "relative-path", "primary", nil, nil, false, false),
)
ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index 17b0d4781..335b353a1 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -174,6 +174,7 @@ func (cmd *trackRepository) trackRepository(
repositoryID,
cmd.virtualStorage,
cmd.relativePath,
+ cmd.relativePath,
primary,
nil,
secondaries,
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index 41af66343..609f798dd 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -219,7 +219,7 @@ func TestAddRepository_Exec(t *testing.T) {
ds := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
id, err := ds.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
require.NoError(t, err)
- require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
+ require.NoError(t, ds.CreateRepository(ctx, id, virtualStorageName, relativePath, relativePath, g1Cfg.Storages[0].Name, nil, nil, true, true))
cmd := &trackRepository{
logger: testhelper.NewTestLogger(t),
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 5fb87808f..1f531d449 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -335,20 +335,30 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr
}
if replicationType != datastore.CreateRepo {
- // This is a hack for the tests: during execution of the gitaly tests under praefect proxy
- // the repositories are created directly on the filesystem. There is no call for the
- // CreateRepository that creates records in the database that is why we do it artificially
- // before redirecting the calls.
- id, err := c.rs.ReserveRepositoryID(ctx, call.targetRepo.StorageName, call.targetRepo.RelativePath)
- if err != nil {
- if !errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
- return nil, err
- }
- } else {
- if err := c.rs.CreateRepository(ctx, id, call.targetRepo.StorageName, call.targetRepo.RelativePath, call.targetRepo.StorageName, nil, nil, true, true); err != nil {
- if !errors.As(err, &datastore.RepositoryExistsError{}) {
+ relativePaths := []string{call.targetRepo.RelativePath}
+
+ if additionalRepo, ok, err := call.methodInfo.AdditionalRepo(call.msg); err != nil {
+ return nil, err
+ } else if ok {
+ relativePaths = append(relativePaths, additionalRepo.RelativePath)
+ }
+
+ for _, relativePath := range relativePaths {
+ // This is a hack for the tests: during execution of the gitaly tests under praefect proxy
+ // the repositories are created directly on the filesystem. There is no call for the
+ // CreateRepository that creates records in the database that is why we do it artificially
+ // before redirecting the calls.
+ id, err := c.rs.ReserveRepositoryID(ctx, call.targetRepo.StorageName, relativePath)
+ if err != nil {
+ if !errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
return nil, err
}
+ } else {
+ if err := c.rs.CreateRepository(ctx, id, call.targetRepo.StorageName, relativePath, relativePath, call.targetRepo.StorageName, nil, nil, true, true); err != nil {
+ if !errors.As(err, &datastore.RepositoryExistsError{}) {
+ return nil, err
+ }
+ }
}
}
}
@@ -404,7 +414,7 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal
return nil, fmt.Errorf("accessor call: route repository accessor: %w", err)
}
- b, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Node.Storage, route.ReplicaPath)
+ b, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Node.Storage, route.ReplicaPath, "")
if err != nil {
return nil, fmt.Errorf("accessor call: rewrite storage: %w", err)
}
@@ -475,7 +485,14 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return nil, fmt.Errorf("route repository creation: %w", err)
}
default:
- route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath)
+ var additionalRepoRelativePath string
+ if additionalRepo, ok, err := call.methodInfo.AdditionalRepo(call.msg); err != nil {
+ return nil, helper.ErrInvalidArgument(err)
+ } else if ok {
+ additionalRepoRelativePath = additionalRepo.GetRelativePath()
+ }
+
+ route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath, additionalRepoRelativePath)
if err != nil {
if errors.Is(err, ErrRepositoryReadOnly) {
return nil, err
@@ -485,7 +502,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
}
}
- primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage, route.ReplicaPath)
+ primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage, route.ReplicaPath, route.AdditionalReplicaPath)
if err != nil {
return nil, fmt.Errorf("mutator call: rewrite storage: %w", err)
}
@@ -527,7 +544,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
for _, secondary := range route.Secondaries {
secondary := secondary
- secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage, route.ReplicaPath)
+ secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage, route.ReplicaPath, route.AdditionalReplicaPath)
if err != nil {
return nil, err
}
@@ -782,13 +799,16 @@ func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi pro
return proxy.NewStreamParameters(primaryDest, secondaryDests, func() error { return nil }, nil), nil
}
-func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, storage, relativePath string) ([]byte, error) {
+// rewrittenRepositoryMessage rewrites the repository storages and relative paths.
+func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, storage, relativePath, additionalRelativePath string) ([]byte, error) {
+ // clone the message so the original is not changed
+ m = proto.Clone(m)
targetRepo, err := mi.TargetRepo(m)
if err != nil {
return nil, helper.ErrInvalidArgument(err)
}
- // rewrite storage name
+ // rewrite the target repository
targetRepo.StorageName = storage
targetRepo.RelativePath = relativePath
@@ -799,14 +819,10 @@ func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, st
if ok {
additionalRepo.StorageName = storage
+ additionalRepo.RelativePath = additionalRelativePath
}
- b, err := proxy.NewCodec().Marshal(m)
- if err != nil {
- return nil, err
- }
-
- return b, nil
+ return proxy.NewCodec().Marshal(m)
}
func rewrittenStorageMessage(mi protoregistry.MethodInfo, m proto.Message, storage string) ([]byte, error) {
@@ -1039,6 +1055,7 @@ func (c *Coordinator) newRequestFinalizer(
repositoryID,
virtualStorage,
targetRepo.GetRelativePath(),
+ targetRepo.GetRelativePath(),
primary,
updatedSecondaries,
outdatedSecondaries,
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 1a2678785..3eb70b8d0 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -208,7 +208,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
if !repoCreated {
repoCreated = true
- require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false))
}
require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, repo.RelativePath, n.generation))
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 6dde3bbdd..f6fb53788 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -195,7 +195,7 @@ func TestStreamDirectorMutator(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames())
if tc.repositoryExists {
- require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true))
+ require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true))
}
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
@@ -788,6 +788,37 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
})
}
+func TestRewrittenRepositoryMessage(t *testing.T) {
+ buildRequest := func(storageName, relativePath, additionalRelativePath string) *gitalypb.CreateObjectPoolRequest {
+ return &gitalypb.CreateObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: storageName,
+ RelativePath: relativePath,
+ },
+ },
+ Origin: &gitalypb.Repository{
+ StorageName: storageName,
+ RelativePath: additionalRelativePath,
+ },
+ }
+ }
+
+ originalRequest := buildRequest("original-storage", "original-relative-path", "original-additional-relative-path")
+
+ methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.ObjectPoolService/CreateObjectPool")
+ require.NoError(t, err)
+
+ rewrittenMessageBytes, err := rewrittenRepositoryMessage(methodInfo, originalRequest, "rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path")
+ require.NoError(t, err)
+
+ var rewrittenMessage gitalypb.CreateObjectPoolRequest
+ require.NoError(t, proto.Unmarshal(rewrittenMessageBytes, &rewrittenMessage))
+
+ testassert.ProtoEqual(t, buildRequest("original-storage", "original-relative-path", "original-additional-relative-path"), originalRequest)
+ testassert.ProtoEqual(t, buildRequest("rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path"), &rewrittenMessage)
+}
+
func TestStreamDirector_repo_creation(t *testing.T) {
t.Parallel()
@@ -845,11 +876,12 @@ func TestStreamDirector_repo_creation(t *testing.T) {
var createRepositoryCalled int64
rs := datastore.MockRepositoryStore{
- CreateRepositoryFunc: func(ctx context.Context, repoID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
+ CreateRepositoryFunc: func(ctx context.Context, repoID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
atomic.AddInt64(&createRepositoryCalled, 1)
assert.Equal(t, int64(0), repoID)
assert.Equal(t, targetRepo.StorageName, virtualStorage)
assert.Equal(t, targetRepo.RelativePath, relativePath)
+ assert.Equal(t, targetRepo.RelativePath, replicaPath)
assert.Equal(t, rewrittenStorage, primary)
assert.Equal(t, []string{healthySecondaryNode.Storage}, updatedSecondaries)
assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries)
@@ -2110,7 +2142,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
requireSuppressedCancellation(t, ctx)
return err
},
- CreateRepositoryFunc: func(ctx context.Context, _ int64, _, _, _ string, _, _ []string, _, _ bool) error {
+ CreateRepositoryFunc: func(ctx context.Context, _ int64, _, _, _, _ string, _, _ []string, _, _ bool) error {
requireSuppressedCancellation(t, ctx)
return err
},
diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go
index 3062ab125..faa5691da 100644
--- a/internal/praefect/datastore/assignment_test.go
+++ b/internal/praefect/datastore/assignment_test.go
@@ -88,7 +88,7 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) {
repositoryID, err = rs.ReserveRepositoryID(ctx, assignment.virtualStorage, assignment.relativePath)
require.NoError(t, err)
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, assignment.virtualStorage, assignment.relativePath, assignment.storage, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, assignment.virtualStorage, assignment.relativePath, assignment.relativePath, assignment.storage, nil, nil, false, false))
}
_, err = db.ExecContext(ctx, `
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index 557973e50..3fd1cfe41 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -169,7 +169,7 @@ func TestRepositoryStoreCollector(t *testing.T) {
repositoryID, err = rs.ReserveRepositoryID(ctx, virtualStorage, repository.relativePath)
require.NoError(t, err)
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, repository.relativePath, storage, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, repository.relativePath, repository.relativePath, storage, nil, nil, false, false))
}
if replica.assigned {
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index ddfb6db8e..c7993b8f6 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -105,7 +105,7 @@ type RepositoryStore interface {
//
// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
// secondaries are stored as the assigned hosts of the repository.
- CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
+ CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
// SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one.
SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
// DeleteRepository deletes the database records associated with the repository. It returns the replica path and the storages
@@ -344,7 +344,7 @@ AND storage = ANY($2)
//
// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
// secondaries are stored as the assigned hosts of the repository.
-func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
+func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
const q = `
WITH repo AS (
INSERT INTO repositories (
@@ -354,7 +354,7 @@ WITH repo AS (
replica_path,
generation,
"primary"
- ) VALUES ($8, $1, $2, $2, 0, CASE WHEN $4 THEN $3 END)
+ ) VALUES ($8, $1, $2, $9, 0, CASE WHEN $4 THEN $3 END)
),
assignments AS (
@@ -399,6 +399,7 @@ FROM (
pq.StringArray(outdatedSecondaries),
storeAssignments,
repositoryID,
+ replicaPath,
)
var pqerr *pq.Error
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 426babd66..23993b60a 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -9,7 +9,7 @@ type MockRepositoryStore struct {
IncrementGenerationFunc func(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
GetReplicatedGenerationFunc func(ctx context.Context, repositoryID int64, source, target string) (int, error)
SetGenerationFunc func(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error
- CreateRepositoryFunc func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
+ CreateRepositoryFunc func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error
@@ -57,12 +57,12 @@ func (m MockRepositoryStore) SetGeneration(ctx context.Context, repositoryID int
}
// CreateRepository calls the mocked function. If no mock has been provided, it returns a nil error.
-func (m MockRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
+func (m MockRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
if m.CreateRepositoryFunc == nil {
return nil
}
- return m.CreateRepositoryFunc(ctx, repositoryID, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments)
+ return m.CreateRepositoryFunc(ctx, repositoryID, virtualStorage, relativePath, replicaPath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments)
}
// SetAuthoritativeReplica calls the mocked function. If no mock has been provided, it returns a nil error.
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index b5eeda410..2a35ef376 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -206,7 +206,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) {
db.TruncateAll(t)
- require.NoError(t, NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false))
+ require.NoError(t, NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "replica-path", "primary", []string{"secondary"}, nil, false, false))
firstTx := db.Begin(t)
secondTx := db.Begin(t)
@@ -224,7 +224,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) {
secondTx.Commit(t)
requireState(t, ctx, db,
- virtualStorageState{"virtual-storage": {"relative-path": {repositoryID: 1, replicaPath: "relative-path"}}},
+ virtualStorageState{"virtual-storage": {"relative-path": {repositoryID: 1, replicaPath: "replica-path"}}},
tc.state,
)
})
@@ -286,7 +286,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("write to outdated nodes", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false))
require.NoError(t, rs.SetGeneration(ctx, 1, "latest-node", repo, 1))
require.Equal(t,
@@ -296,7 +296,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -321,7 +321,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
{vs, "other-relative-path"},
{"other-virtual-storage", repo},
} {
- require.NoError(t, rs.CreateRepository(ctx, int64(id+1), pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, int64(id+1), pair.virtualStorage, pair.relativePath, fmt.Sprintf("replica-path-%d", id+1), "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false))
}
require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{"up-to-date-secondary"}))
@@ -329,11 +329,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
- "other-relative-path": {repositoryID: 2, replicaPath: "other-relative-path"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path-1"},
+ "other-relative-path": {repositoryID: 2, replicaPath: "replica-path-2"},
},
"other-virtual-storage": {
- "repository-1": {repositoryID: 3, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 3, replicaPath: "replica-path-3"},
},
},
storageState{
@@ -365,11 +365,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
- "other-relative-path": {repositoryID: 2, replicaPath: "other-relative-path"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path-1"},
+ "other-relative-path": {repositoryID: 2, replicaPath: "replica-path-2"},
},
"other-virtual-storage": {
- "repository-1": {repositoryID: 3, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 3, replicaPath: "replica-path-3"},
},
},
storageState{
@@ -401,11 +401,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("creates a record for the replica", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", stor, nil, nil, false, false))
require.NoError(t, rs.SetGeneration(ctx, 1, "storage-2", repo, 0))
requireState(t, ctx,
virtualStorageState{"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
}},
storageState{
"virtual-storage-1": {
@@ -421,11 +421,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("updates existing record with the replicated to relative path", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-path", "storage-1", []string{"storage-2"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-path", "replica-path", "storage-1", []string{"storage-2"}, nil, true, false))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "original-path": {repositoryID: 1, primary: "storage-1", replicaPath: "original-path"},
+ "original-path": {repositoryID: 1, primary: "storage-1", replicaPath: "replica-path"},
},
},
storageState{
@@ -478,13 +478,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("updates existing record", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "storage-1", nil, nil, false, false))
require.NoError(t, rs.SetGeneration(ctx, 1, stor, repo, 1))
require.NoError(t, rs.SetGeneration(ctx, 1, stor, repo, 0))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -511,11 +511,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("sets an existing replica as the latest", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "storage-1", []string{"storage-2"}, nil, false, false))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -532,7 +532,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -549,11 +549,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("sets a new replica as the latest", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "storage-1", nil, nil, false, false))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -569,7 +569,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -591,7 +591,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
require.Equal(t, GenerationUnknown, generation)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", stor, nil, nil, false, false))
generation, err = rs.GetGeneration(ctx, 1, stor)
require.NoError(t, err)
@@ -606,7 +606,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
require.Equal(t, GenerationUnknown, gen)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "source", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "source", nil, nil, false, false))
gen, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target")
require.NoError(t, err)
require.Equal(t, 0, gen)
@@ -615,7 +615,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("upgrade allowed", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "source", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "source", nil, nil, false, false))
require.NoError(t, rs.IncrementGeneration(ctx, 1, "source", nil))
gen, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target")
@@ -631,7 +631,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("downgrade prevented", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "target", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "target", nil, nil, false, false))
require.NoError(t, rs.IncrementGeneration(ctx, 1, "target", nil))
_, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target")
@@ -712,7 +712,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run(tc.desc, func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments))
expectedStorageState := storageState{
vs: {
@@ -731,7 +731,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
vs: {
repo: {
repositoryID: 1,
- replicaPath: repo,
+ replicaPath: "replica-path",
primary: tc.expectedPrimary,
assignments: tc.expectedAssignments,
},
@@ -746,20 +746,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("conflict due to virtual storage and relative path", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", stor, nil, nil, false, false))
require.Equal(t,
RepositoryExistsError{vs, repo, stor},
- rs.CreateRepository(ctx, 2, vs, repo, stor, nil, nil, false, false),
+ rs.CreateRepository(ctx, 2, vs, repo, "replica-path", stor, nil, nil, false, false),
)
})
t.Run("conflict due to repository id", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "replica-path", "storage-1", nil, nil, false, false))
require.Equal(t,
fmt.Errorf("repository id 1 already in use"),
- rs.CreateRepository(ctx, 1, "virtual-storage-2", "relative-path-2", "storage-2", nil, nil, false, false),
+ rs.CreateRepository(ctx, 1, "virtual-storage-2", "relative-path-2", "replica-path", "storage-2", nil, nil, false, false),
)
})
})
@@ -777,18 +777,18 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("delete existing", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "repository-1", "storage-1", nil, nil, false, false))
- require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-2", "repository-1", "storage-1", []string{"storage-2"}, nil, false, false))
- require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "repository-2", "storage-1", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "repository-1", "replica-path-1", "storage-1", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-2", "repository-1", "replica-path-2", "storage-1", []string{"storage-2"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "repository-2", "replica-path-3", "storage-1", nil, nil, false, false))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path-1"},
},
"virtual-storage-2": {
- "repository-1": {repositoryID: 2, replicaPath: "repository-1"},
- "repository-2": {repositoryID: 3, replicaPath: "repository-2"},
+ "repository-1": {repositoryID: 2, replicaPath: "replica-path-2"},
+ "repository-2": {repositoryID: 3, replicaPath: "replica-path-3"},
},
},
storageState{
@@ -811,16 +811,16 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
replicaPath, storages, err := rs.DeleteRepository(ctx, "virtual-storage-2", "repository-1")
require.NoError(t, err)
- require.Equal(t, "repository-1", replicaPath)
+ require.Equal(t, "replica-path-2", replicaPath)
require.Equal(t, []string{"storage-1", "storage-2"}, storages)
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path-1"},
},
"virtual-storage-2": {
- "repository-2": {repositoryID: 3, replicaPath: "repository-2"},
+ "repository-2": {repositoryID: 3, replicaPath: "replica-path-3"},
},
},
storageState{
@@ -847,18 +847,18 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
})
t.Run("delete existing", func(t *testing.T) {
- require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false))
- require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false))
- require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "replica-path-1", "storage-1", []string{"storage-2"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", "relative-path-2", "replica-path-2", "storage-1", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "relative-path-1", "replica-path-3", "storage-1", nil, nil, false, false))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "relative-path-1": {repositoryID: 1, replicaPath: "relative-path-1"},
- "relative-path-2": {repositoryID: 2, replicaPath: "relative-path-2"},
+ "relative-path-1": {repositoryID: 1, replicaPath: "replica-path-1"},
+ "relative-path-2": {repositoryID: 2, replicaPath: "replica-path-2"},
},
"virtual-storage-2": {
- "relative-path-1": {repositoryID: 3, replicaPath: "relative-path-1"},
+ "relative-path-1": {repositoryID: 3, replicaPath: "replica-path-3"},
},
},
storageState{
@@ -884,11 +884,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "relative-path-1": {repositoryID: 1, replicaPath: "relative-path-1"},
- "relative-path-2": {repositoryID: 2, replicaPath: "relative-path-2"},
+ "relative-path-1": {repositoryID: 1, replicaPath: "replica-path-1"},
+ "relative-path-2": {repositoryID: 2, replicaPath: "replica-path-2"},
},
"virtual-storage-2": {
- "relative-path-1": {repositoryID: 3, replicaPath: "relative-path-1"},
+ "relative-path-1": {repositoryID: 3, replicaPath: "replica-path-3"},
},
},
storageState{
@@ -923,14 +923,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("rename existing", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, "renamed-all", "storage-1", nil, nil, false, false))
- require.NoError(t, rs.CreateRepository(ctx, 2, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, "renamed-all", "replica-path-1", "storage-1", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, vs, "renamed-some", "replica-path-2", "storage-1", []string{"storage-2"}, nil, false, false))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "renamed-all": {repositoryID: 1, replicaPath: "renamed-all"},
- "renamed-some": {repositoryID: 2, replicaPath: "renamed-some"},
+ "renamed-all": {repositoryID: 1, replicaPath: "replica-path-1"},
+ "renamed-some": {repositoryID: 2, replicaPath: "replica-path-2"},
},
},
storageState{
@@ -990,13 +990,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.Empty(t, secondaries)
})
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "primary", []string{"consistent-secondary"}, nil, false, false))
require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{"consistent-secondary"}))
require.NoError(t, rs.SetGeneration(ctx, 1, "inconsistent-secondary", repo, 0))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -1014,12 +1014,12 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries)
- require.Equal(t, repo, replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries)
- require.Equal(t, repo, replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
})
require.NoError(t, rs.SetGeneration(ctx, 1, "primary", repo, 0))
@@ -1028,12 +1028,12 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries)
- require.Equal(t, repo, replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries)
- require.Equal(t, repo, replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
})
t.Run("storage with highest generation is not configured", func(t *testing.T) {
@@ -1042,7 +1042,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -1060,12 +1060,12 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries)
- require.Equal(t, repo, replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries)
- require.Equal(t, repo, replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
})
t.Run("returns not found for deleted repositories", func(t *testing.T) {
@@ -1087,14 +1087,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("replicas pending rename are considered outdated", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-path", "storage-1", []string{"storage-2"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-path", "replica-path", "storage-1", []string{"storage-2"}, nil, true, false))
replicaPath, storages, err := rs.GetConsistentStorages(ctx, vs, "original-path")
require.NoError(t, err)
- require.Equal(t, "original-path", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages)
replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
- require.Equal(t, "original-path", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages)
require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-1", "new-path"))
@@ -1122,19 +1122,19 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("DeleteInvalidRepository", func(t *testing.T) {
t.Run("only replica", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "invalid-storage", nil, nil, false, false))
require.NoError(t, rs.DeleteInvalidRepository(ctx, 1, "invalid-storage"))
requireState(t, ctx, virtualStorageState{}, storageState{})
})
t.Run("another replica", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", "invalid-storage", []string{"other-storage"}, nil, false, false))
require.NoError(t, rs.DeleteInvalidRepository(ctx, 1, "invalid-storage"))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
- "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ "repository-1": {repositoryID: 1, replicaPath: "replica-path"},
},
},
storageState{
@@ -1155,7 +1155,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
require.False(t, exists)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", stor, nil, nil, false, false))
exists, err = rs.RepositoryExists(ctx, vs, repo)
require.NoError(t, err)
require.True(t, exists)
@@ -1178,7 +1178,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
require.Equal(t, int64(2), id)
- require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, "replica-path", stor, nil, nil, false, false))
id, err = rs.ReserveRepositoryID(ctx, vs, repo)
require.Equal(t, commonerr.ErrRepositoryAlreadyExists, err)
@@ -1196,7 +1196,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err)
require.Equal(t, int64(0), id)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", stor, nil, nil, false, false))
id, err = rs.GetRepositoryID(ctx, vs, repo)
require.Nil(t, err)
@@ -1210,11 +1210,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.Equal(t, err, commonerr.ErrRepositoryNotFound)
require.Empty(t, replicaPath)
- require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "replica-path", stor, nil, nil, false, false))
replicaPath, err = rs.GetReplicaPath(ctx, 1)
require.NoError(t, err)
- require.Equal(t, replicaPath, repo)
+ require.Equal(t, "replica-path", replicaPath)
})
}
diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go
index 6212d15f2..419115119 100644
--- a/internal/praefect/datastore/storage_cleanup_test.go
+++ b/internal/praefect/datastore/storage_cleanup_test.go
@@ -182,8 +182,8 @@ func TestStorageCleanup_Exists(t *testing.T) {
db := glsql.NewDB(t)
repoStore := NewPostgresRepositoryStore(db.DB, nil)
- require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "g1", []string{"g2", "g3"}, nil, false, false))
- require.NoError(t, repoStore.CreateRepository(ctx, 1, "vs", "p/2", "g1", []string{"g2", "g3"}, nil, false, false))
+ require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "replica-path-1", "g1", []string{"g2", "g3"}, nil, false, false))
+ require.NoError(t, repoStore.CreateRepository(ctx, 1, "vs", "p/2", "replica-path-2", "g1", []string{"g2", "g3"}, nil, false, false))
storageCleanup := NewStorageCleanup(db.DB)
for _, tc := range []struct {
@@ -197,21 +197,21 @@ func TestStorageCleanup_Exists(t *testing.T) {
desc: "multiple doesn't exist",
virtualStorage: "vs",
storage: "g1",
- relativeReplicaPaths: []string{"p/1", "p/2", "path/x", "path/y"},
+ relativeReplicaPaths: []string{"replica-path-1", "replica-path-2", "path/x", "path/y"},
out: []string{"path/x", "path/y"},
},
{
desc: "duplicates",
virtualStorage: "vs",
storage: "g1",
- relativeReplicaPaths: []string{"p/1", "path/x", "path/x"},
+ relativeReplicaPaths: []string{"replica-path-1", "path/x", "path/x"},
out: []string{"path/x"},
},
{
desc: "all exist",
virtualStorage: "vs",
storage: "g1",
- relativeReplicaPaths: []string{"p/1", "p/2"},
+ relativeReplicaPaths: []string{"replica-path-1", "replica-path-2"},
out: nil,
},
{
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index fc214df1e..bc78c6a6f 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -28,7 +28,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 1, "unknown", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "unknown", "/repo/path", "replica-path", "g1", []string{"g2", "g3"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
@@ -38,7 +38,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath, storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
- require.Equal(t, "/repo/path", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -54,7 +54,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "replica-path", "g1", []string{"g2", "g3"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
@@ -64,7 +64,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath, storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
- require.Equal(t, "/repo/path", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -78,7 +78,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath, storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
- require.Equal(t, "/repo/path", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -121,7 +121,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
ctx, cancel := testhelper.Context(testhelper.ContextWithLogger(logger))
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "replica-path", "g1", []string{"g2", "g3"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
@@ -131,7 +131,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1)
- require.Equal(t, "/repo/path/1", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
// invalid payload disables caching
notification := glsql.Notification{Channel: "notification_channel_1", Payload: `_`}
@@ -142,19 +142,19 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2)
- require.Equal(t, "/repo/path/1", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
// third access retrieves data and caches it
replicaPath, storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages3)
- require.Equal(t, "/repo/path/1", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
// fourth access retrieves data from cache
replicaPath, storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages4)
- require.Equal(t, "/repo/path/1", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
require.Len(t, logHook.AllEntries(), 1)
assert.Equal(t, "received payload can't be processed, cache disabled", logHook.LastEntry().Message)
@@ -181,8 +181,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", []string{"g2", "g3"}, nil, true, false))
- require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", []string{"g2"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "replica-path-1", "g1", []string{"g2", "g3"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "replica-path-2", "g1", []string{"g2"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
@@ -192,11 +192,11 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath, path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages1)
- require.Equal(t, "/repo/path/1", replicaPath)
+ require.Equal(t, "replica-path-1", replicaPath)
replicaPath, path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages1)
- require.Equal(t, "/repo/path/2", replicaPath)
+ require.Equal(t, "replica-path-2", replicaPath)
// notification evicts entries for '/repo/path/2' from the cache
cache.Notification(glsql.Notification{Payload: `
@@ -210,12 +210,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath1, path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages2)
- require.Equal(t, "/repo/path/1", replicaPath1)
+ require.Equal(t, "replica-path-1", replicaPath1)
// second access populates the cache again for '/repo/path/2'
replicaPath, path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages2)
- require.Equal(t, "/repo/path/2", replicaPath)
+ require.Equal(t, "replica-path-2", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -234,7 +234,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "g1", []string{"g2", "g3"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "replica-path", "g1", []string{"g2", "g3"}, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
@@ -244,7 +244,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1)
- require.Equal(t, "/repo/path", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
// disconnection disables cache
cache.Disconnect(assert.AnError)
@@ -253,7 +253,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2)
- require.Equal(t, "/repo/path", replicaPath)
+ require.Equal(t, "replica-path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
# HELP gitaly_praefect_uptodate_storages_cache_access_total Total number of cache access operations during defining of up to date storages for reads distribution (per virtual storage)
@@ -271,8 +271,8 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "g1", nil, nil, true, false))
- require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "g1", nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "replica-path-1", "g1", nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "replica-path-2", "g1", nil, nil, true, false))
cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), rs, []string{"vs"})
require.NoError(t, err)
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
index 96f86ab64..64129875b 100644
--- a/internal/praefect/info_service_test.go
+++ b/internal/praefect/info_service_test.go
@@ -80,7 +80,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) {
conns := nodeSet.Connections()
rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames())
require.NoError(t,
- rs.CreateRepository(ctx, 1, virtualStorage, testRepo.GetRelativePath(), "g-1", []string{"g-2", "g-3"}, nil, true, false),
+ rs.CreateRepository(ctx, 1, virtualStorage, testRepo.GetRelativePath(), testRepo.GetRelativePath(), "g-1", []string{"g-2", "g-3"}, nil, true, false),
)
cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index edeebfa53..8d872a5dd 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -489,7 +489,7 @@ func TestPerRepositoryElector(t *testing.T) {
for storage, record := range storages {
if !repoCreated {
repoCreated = true
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, relativePath, storage, nil, nil, false, false))
}
require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, relativePath, record.generation))
diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go
index 6932f5aff..56a5413e0 100644
--- a/internal/praefect/reconciler/reconciler_test.go
+++ b/internal/praefect/reconciler/reconciler_test.go
@@ -1023,7 +1023,7 @@ func TestReconciler(t *testing.T) {
repositoryID, err = rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
require.NoError(t, err)
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, relativePath, storage, nil, nil, false, false))
}
require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, relativePath, repo.generation))
@@ -1214,7 +1214,7 @@ func TestReconciler_renames(t *testing.T) {
)
rs := datastore.NewPostgresRepositoryStore(db, configuredStorages)
- require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "original-path", "storage-1", []string{"storage-2"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "original-path", "replica-path", "storage-1", []string{"storage-2"}, nil, true, false))
require.NoError(t, rs.SetGeneration(ctx, 1, tc.latestStorage, "original-path", 1))
require.NoError(t, rs.RenameRepository(ctx, "virtual-storage", "original-path", "storage-1", "new-path"))
diff --git a/internal/praefect/remove_repository_test.go b/internal/praefect/remove_repository_test.go
index 7ca6abf00..59cb92bcb 100644
--- a/internal/praefect/remove_repository_test.go
+++ b/internal/praefect/remove_repository_test.go
@@ -89,7 +89,7 @@ func TestRemoveRepositoryHandler(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 0, virtualStorage, relativePath, gitaly1Storage, []string{gitaly2Storage, "non-existent-storage"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 0, virtualStorage, relativePath, relativePath, gitaly1Storage, []string{gitaly2Storage, "non-existent-storage"}, nil, false, false))
tmp := testhelper.TempDir(t)
diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go
index e99ceff9a..ec0873e09 100644
--- a/internal/praefect/replicator_pg_test.go
+++ b/internal/praefect/replicator_pg_test.go
@@ -51,7 +51,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "gitaly-1", nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "relative-path-1", "gitaly-1", nil, nil, true, false))
exists, err := rs.RepositoryExists(ctx, "virtual-storage-1", "relative-path-1")
require.NoError(t, err)
@@ -92,7 +92,7 @@ func TestReplicatorDestroy(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false))
ln, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 8d6857656..0afa96c44 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -162,7 +162,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
db := glsql.NewDB(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, conf.VirtualStorages[0].Name, testRepo.GetRelativePath(), testRepo.GetRelativePath(), shard.Primary.GetStorage(), nil, nil, true, false))
replMgr := NewReplMgr(
loggerEntry,
@@ -719,7 +719,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
db := glsql.NewDB(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, okJob.RepositoryID, okJob.VirtualStorage, okJob.RelativePath, okJob.RelativePath, okJob.SourceNodeStorage, nil, nil, true, false))
replMgr := NewReplMgr(
logEntry,
@@ -875,7 +875,7 @@ func TestProcessBacklog_Success(t *testing.T) {
db := glsql.NewDB(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, eventType1.Job.RepositoryID, eventType1.Job.VirtualStorage, eventType1.Job.VirtualStorage, eventType1.Job.RelativePath, eventType1.Job.SourceNodeStorage, nil, nil, true, false))
replMgr := NewReplMgr(
logEntry,
@@ -1018,7 +1018,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
db := glsql.NewDB(t)
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, "ignored", "ignored", primaryStorage, []string{secondaryStorage}, nil, true, false))
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
index f74f1e6b3..4d4a6db1b 100644
--- a/internal/praefect/repocleaner/repository_test.go
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -110,7 +110,7 @@ func TestRunner_Run(t *testing.T) {
secondaries: []string{storage2, storage3},
},
} {
- require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, set.secondaries, nil, false, false))
+ require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.relativePath, set.primary, set.secondaries, nil, false, false))
}
logger, loggerHook := test.NewNullLogger()
@@ -233,7 +233,7 @@ func TestRunner_Run_noAvailableStorages(t *testing.T) {
primary: storage1,
},
} {
- require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, nil, nil, false, false))
+ require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.relativePath, set.primary, nil, nil, false, false))
}
logger := testhelper.NewTestLogger(t)
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index 39a25a71d..35839fbd4 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -74,7 +74,7 @@ func TestRepositoryExistsHandler(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "storage", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "relative-path", "storage", nil, nil, false, false))
tmp := testhelper.TempDir(t)
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index f4af466a4..8c3ec9a5c 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -37,6 +37,9 @@ type RepositoryMutatorRoute struct {
RepositoryID int64
// ReplicaPath is the disk path where the replicas are stored.
ReplicaPath string
+ // AdditionalReplicaPath is the disk path where the possible additional repository in the request
+ // is stored. This is only used for object pools.
+ AdditionalReplicaPath string
// Primary is the primary node of the transaction.
Primary RouterNode
// Secondaries are the secondary participating in a transaction.
@@ -59,7 +62,7 @@ type Router interface {
// RouteRepositoryMutatorTransaction returns the primary and secondaries that should handle the repository mutator request.
// Additionally, it returns nodes which should have the change replicated to. RouteRepositoryMutator should only be used
// with existing repositories.
- RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error)
+ RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
// RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation
// request. It is up to the caller to store the assignments and primary information after finishing the RPC.
RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error)
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
index 10a14c43f..7793c4d6d 100644
--- a/internal/praefect/router_node_manager.go
+++ b/internal/praefect/router_node_manager.go
@@ -74,7 +74,7 @@ func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStor
}, nil
}
-func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
+func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) {
shard, err := r.mgr.GetShard(ctx, virtualStorage)
if err != nil {
return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err)
@@ -115,10 +115,11 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS
}
return RepositoryMutatorRoute{
- ReplicaPath: relativePath,
- Primary: toRouterNode(shard.Primary),
- Secondaries: toRouterNodes(participatingSecondaries),
- ReplicationTargets: replicationTargets,
+ ReplicaPath: relativePath,
+ AdditionalReplicaPath: additionalRelativePath,
+ Primary: toRouterNode(shard.Primary),
+ Secondaries: toRouterNodes(participatingSecondaries),
+ ReplicationTargets: replicationTargets,
}, nil
}
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index 84c0097a2..a41c0938e 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -189,7 +189,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu
}, nil
}
-func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
+func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error) {
healthyNodes, err := r.healthyNodes(virtualStorage)
if err != nil {
return RepositoryMutatorRoute{}, err
@@ -200,6 +200,19 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, fmt.Errorf("get repository id: %w", err)
}
+ var additionalReplicaPath string
+ if additionalRelativePath != "" {
+ additionalRepositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, additionalRelativePath)
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository id: %w", err)
+ }
+
+ additionalReplicaPath, err = r.rs.GetReplicaPath(ctx, additionalRepositoryID)
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("get additional repository replica path: %w", err)
+ }
+ }
+
primary, err := r.pg.GetPrimary(ctx, virtualStorage, repositoryID)
if err != nil {
return RepositoryMutatorRoute{}, fmt.Errorf("get primary: %w", err)
@@ -228,7 +241,11 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, fmt.Errorf("get host assignments: %w", err)
}
- route := RepositoryMutatorRoute{RepositoryID: repositoryID, ReplicaPath: replicaPath}
+ route := RepositoryMutatorRoute{
+ RepositoryID: repositoryID,
+ ReplicaPath: replicaPath,
+ AdditionalReplicaPath: additionalReplicaPath,
+ }
for _, assigned := range assignedStorages {
node, healthy := healthySet[assigned]
if assigned == primary {
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index 8863e579a..8372422d4 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -224,7 +224,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", relativePath)
require.NoError(t, err)
require.NoError(t,
- rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, "primary",
+ rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, relativePath, "primary",
[]string{"consistent-secondary", "unhealthy-secondary", "inconsistent-secondary"}, nil, true, true),
)
require.NoError(t,
@@ -276,14 +276,15 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
}
for _, tc := range []struct {
- desc string
- virtualStorage string
- healthyNodes StaticHealthChecker
- consistentStorages []string
- secondaries []string
- replicationTargets []string
- error error
- assignedNodes StaticRepositoryAssignments
+ desc string
+ virtualStorage string
+ healthyNodes StaticHealthChecker
+ consistentStorages []string
+ secondaries []string
+ replicationTargets []string
+ error error
+ assignedNodes StaticRepositoryAssignments
+ noAdditionalRepository bool
}{
{
desc: "unknown virtual storage",
@@ -312,6 +313,14 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
secondaries: []string{"secondary-1", "secondary-2"},
},
{
+ desc: "no additional repository",
+ virtualStorage: "virtual-storage-1",
+ healthyNodes: StaticHealthChecker(configuredNodes),
+ consistentStorages: []string{"primary", "secondary-1", "secondary-2"},
+ secondaries: []string{"secondary-1", "secondary-2"},
+ noAdditionalRepository: true,
+ },
+ {
desc: "inconsistent secondary",
virtualStorage: "virtual-storage-1",
healthyNodes: StaticHealthChecker(configuredNodes),
@@ -368,14 +377,25 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredNodes})
- const relativePath = "repository"
+ const (
+ virtualStorage = "virtual-storage-1"
+ relativePath = "repository"
+ additionalRelativePath = "additional-repository"
+ additionalReplicaPath = "additional-replica-path"
+ )
rs := datastore.NewPostgresRepositoryStore(tx, nil)
- repositoryID, err := rs.ReserveRepositoryID(ctx, "virtual-storage-1", relativePath)
+ repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
require.NoError(t, err)
require.NoError(t,
- rs.CreateRepository(ctx, repositoryID, "virtual-storage-1", relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false),
+ rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, relativePath, "primary", []string{"secondary-1", "secondary-2"}, nil, true, false),
+ )
+
+ additionalRepositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, additionalRelativePath)
+ require.NoError(t, err)
+ require.NoError(t,
+ rs.CreateRepository(ctx, additionalRepositoryID, virtualStorage, additionalRelativePath, additionalReplicaPath, "primary", nil, nil, true, false),
)
if len(tc.consistentStorages) > 0 {
@@ -405,7 +425,14 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
nil,
)
- route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, relativePath)
+ requestAdditionalRelativePath := additionalRelativePath
+ expectedAdditionalReplicaPath := additionalReplicaPath
+ if tc.noAdditionalRepository {
+ expectedAdditionalReplicaPath = ""
+ requestAdditionalRelativePath = ""
+ }
+
+ route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, relativePath, requestAdditionalRelativePath)
require.Equal(t, tc.error, err)
if err == nil {
var secondaries []RouterNode
@@ -417,8 +444,9 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
}
require.Equal(t, RepositoryMutatorRoute{
- RepositoryID: repositoryID,
- ReplicaPath: relativePath,
+ RepositoryID: repositoryID,
+ ReplicaPath: relativePath,
+ AdditionalReplicaPath: expectedAdditionalReplicaPath,
Primary: RouterNode{
Storage: "primary",
Connection: conns[tc.virtualStorage]["primary"],
@@ -607,7 +635,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(db, nil)
if tc.repositoryExists {
require.NoError(t,
- rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, "primary", nil, nil, true, true),
+ rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, relativePath, "primary", nil, nil, true, true),
)
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c9a9fb25d..8e2ffd9ee 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -640,7 +640,7 @@ func TestRenameRepository(t *testing.T) {
defer tx.Rollback(t)
rs := datastore.NewPostgresRepositoryStore(tx, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
require.NoError(t, err)