Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-10-11 22:24:54 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-10-11 22:24:54 +0300
commitc80c5bec34b3f50ac37fd4d49ad12eeee23f0fd9 (patch)
treef7f699009a748f4ac73c156ed8cc20a79562aadc
parent69eac9a0db256b27a8872f6e871e5d7e61485471 (diff)
parentc0388d7869bcde00f6d6fefb0ebc7ea4f05e68c9 (diff)
Merge branch 'smh-cluster-repository-id-14-4' into 'master'
Join RepositoryStore queries using repository ID See merge request gitlab-org/gitaly!3907
-rw-r--r--cmd/praefect/subcmd_accept_dataloss_test.go4
-rw-r--r--cmd/praefect/subcmd_dataloss_test.go26
-rw-r--r--internal/praefect/commonerr/error.go6
-rw-r--r--internal/praefect/coordinator.go2
-rw-r--r--internal/praefect/coordinator_pg_test.go6
-rw-r--r--internal/praefect/coordinator_test.go2
-rw-r--r--internal/praefect/datastore/repository_store.go104
-rw-r--r--internal/praefect/datastore/repository_store_mock.go30
-rw-r--r--internal/praefect/datastore/repository_store_test.go86
-rw-r--r--internal/praefect/nodes/per_repository_test.go17
-rw-r--r--internal/praefect/reconciler/reconciler_test.go8
-rw-r--r--internal/praefect/replicator.go11
-rw-r--r--internal/praefect/replicator_pg_test.go10
-rw-r--r--internal/praefect/replicator_test.go4
-rw-r--r--internal/praefect/router_per_repository_test.go4
15 files changed, 163 insertions, 157 deletions
diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go
index 2945b43f1..55d235afc 100644
--- a/cmd/praefect/subcmd_accept_dataloss_test.go
+++ b/cmd/praefect/subcmd_accept_dataloss_test.go
@@ -50,7 +50,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) {
require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, storage, nil, nil, false, false))
}
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, generation))
+ require.NoError(t, rs.SetGeneration(ctx, 1, storage, generation))
}
ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(conf, rs, nil, nil, nil))})
@@ -145,7 +145,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) {
require.NoError(t, fs.Parse(tc.args))
tc.matchError(t, cmd.Exec(fs, conf))
for storage, expected := range tc.expectedGenerations {
- actual, err := rs.GetGeneration(ctx, vs, repo, storage)
+ actual, err := rs.GetGeneration(ctx, 1, storage)
require.NoError(t, err)
require.Equal(t, expected, actual, storage)
}
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index c9d0e38fa..07d7ad602 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -54,30 +54,30 @@ func TestDatalossSubcommand(t *testing.T) {
for _, q := range []string{
`
- INSERT INTO repositories (virtual_storage, relative_path, "primary")
+ INSERT INTO repositories (repository_id, virtual_storage, relative_path, "primary")
VALUES
- ('virtual-storage-1', 'repository-1', 'gitaly-1'),
- ('virtual-storage-1', 'repository-2', 'gitaly-3')
+ (1, 'virtual-storage-1', 'repository-1', 'gitaly-1'),
+ (2, 'virtual-storage-1', 'repository-2', 'gitaly-3')
`,
`
- INSERT INTO repository_assignments (virtual_storage, relative_path, storage)
+ INSERT INTO repository_assignments (repository_id, virtual_storage, relative_path, storage)
VALUES
- ('virtual-storage-1', 'repository-1', 'gitaly-1'),
- ('virtual-storage-1', 'repository-1', 'gitaly-2'),
- ('virtual-storage-1', 'repository-2', 'gitaly-1'),
- ('virtual-storage-1', 'repository-2', 'gitaly-3')
+ (1, 'virtual-storage-1', 'repository-1', 'gitaly-1'),
+ (1, 'virtual-storage-1', 'repository-1', 'gitaly-2'),
+ (2, 'virtual-storage-1', 'repository-2', 'gitaly-1'),
+ (2, 'virtual-storage-1', 'repository-2', 'gitaly-3')
`,
} {
_, err := tx.ExecContext(ctx, q)
require.NoError(t, err)
}
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-1", 1))
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-2", 0))
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "gitaly-3", 0))
+ require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-1", 1))
+ require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-2", 0))
+ require.NoError(t, gs.SetGeneration(ctx, 1, "gitaly-3", 0))
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-2", 1))
- require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0))
+ require.NoError(t, gs.SetGeneration(ctx, 2, "gitaly-2", 1))
+ require.NoError(t, gs.SetGeneration(ctx, 2, "gitaly-3", 0))
ln, clean := listenAndServe(t, []svcRegistrar{
registerPraefectInfoServer(info.NewServer(cfg, gs, nil, nil, nil)),
diff --git a/internal/praefect/commonerr/error.go b/internal/praefect/commonerr/error.go
index 4ad2f6c80..d113f4485 100644
--- a/internal/praefect/commonerr/error.go
+++ b/internal/praefect/commonerr/error.go
@@ -25,5 +25,11 @@ func (err RepositoryNotFoundError) Error() string {
return fmt.Sprintf("repository %q/%q not found", err.virtualStorage, err.relativePath)
}
+// ErrRepositoryNotFound is returned when operating on a repository that doesn't exist.
+//
+// This somewhat duplicates the above RepositoryNotFoundError but doesn't specify which repository was not found.
+// With repository IDs in use, the virtual storage and relative path won't be available everywhere anymore.
+var ErrRepositoryNotFound = errors.New("repository not found")
+
// ErrRepositoryAlreadyExists is returned when attempting to create a repository that already exists.
var ErrRepositoryAlreadyExists = errors.New("repository already exists")
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 181e3a19d..6f1e8bc42 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -1016,7 +1016,7 @@ func (c *Coordinator) newRequestFinalizer(
// If this fails, the primary might have changes on it that are not recorded in the database. The secondaries will appear
// consistent with the primary but might serve different stale data. Follow-up mutator calls will solve this state although
// the primary will be a later generation in the mean while.
- if err := c.rs.IncrementGeneration(ctx, virtualStorage, targetRepo.GetRelativePath(), primary, updatedSecondaries); err != nil {
+ if err := c.rs.IncrementGeneration(ctx, repositoryID, primary, updatedSecondaries); err != nil {
return fmt.Errorf("increment generation: %w", err)
}
case datastore.RenameRepo:
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 14c281229..20c447ee7 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -210,7 +210,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, true, false))
}
- require.NoError(t, rs.SetGeneration(ctx, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, n.generation))
+ require.NoError(t, rs.SetGeneration(ctx, 1, storageNodes[i].Storage, n.generation))
}
testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
@@ -298,7 +298,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
}
if tc.concurrentWrite {
- require.NoError(t, rs.SetGeneration(ctx, repo.StorageName, repo.RelativePath, "non-participating-storage", 2))
+ require.NoError(t, rs.SetGeneration(ctx, 1, "non-participating-storage", 2))
}
err = streamParams.RequestFinalizer()
@@ -312,7 +312,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
// Nodes that did not successfully commit or did not participate should remain on their
// existing generation.
for i, n := range tc.nodes {
- gen, err := rs.GetGeneration(ctx, repo.StorageName, repo.RelativePath, storageNodes[i].Storage)
+ gen, err := rs.GetGeneration(ctx, 1, storageNodes[i].Storage)
require.NoError(t, err)
require.Equal(t, n.expectedGeneration, gen, "node %d has wrong generation", i)
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 664428fb9..2fb155fde 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -2102,7 +2102,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
},
},
datastore.MockRepositoryStore{
- IncrementGenerationFunc: func(ctx context.Context, _, _, _ string, _ []string) error {
+ IncrementGenerationFunc: func(ctx context.Context, _ int64, _ string, _ []string) error {
requireSuppressedCancellation(t, ctx)
return err
},
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 9c8dcf30e..d24c031e9 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -25,16 +25,14 @@ var errWriteToOutdatedNodes = errors.New("write to outdated nodes")
// DowngradeAttemptedError is returned when attempting to get the replicated generation for a source repository
// that does not upgrade the target repository.
type DowngradeAttemptedError struct {
- VirtualStorage string
- RelativePath string
Storage string
CurrentGeneration int
AttemptedGeneration int
}
func (err DowngradeAttemptedError) Error() string {
- return fmt.Sprintf("attempted downgrading %q -> %q -> %q from generation %d to %d",
- err.VirtualStorage, err.RelativePath, err.Storage, err.CurrentGeneration, err.AttemptedGeneration,
+ return fmt.Sprintf("attempted downgrading storage %q from generation %d to %d",
+ err.Storage, err.CurrentGeneration, err.AttemptedGeneration,
)
}
@@ -85,15 +83,15 @@ var ErrNoRowsAffected = errors.New("no rows were affected by the query")
// RepositoryStore provides access to repository state.
type RepositoryStore interface {
// GetGeneration gets the repository's generation on a given storage.
- GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
+ GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error)
// IncrementGeneration increments the generations of up to date nodes.
- IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
+ IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
// SetGeneration sets the repository's generation on the given storage. If the generation is higher
// than the virtual storage's generation, it is set to match as well to guarantee monotonic increments.
- SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
+ SetGeneration(ctx context.Context, repositoryID int64, storage string, generation int) error
// GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would
// downgrade, a DowngradeAttemptedError is returned.
- GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
+ GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)
// CreateRepository creates a record for a repository in the specified virtual storage and relative path.
// Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated
// and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed
@@ -126,7 +124,7 @@ type RepositoryStore interface {
// DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's
// record of the invalid repository. If the storage was the only storage with the repository, the repository's
// record on the virtual storage is also deleted.
- DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
+ DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error
// ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already
// exists with the given virtual storage and relative path combination, an error is returned.
ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
@@ -147,17 +145,16 @@ func NewPostgresRepositoryStore(db glsql.Querier, configuredStorages map[string]
return &PostgresRepositoryStore{db: db, storages: storages(configuredStorages)}
}
-func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) {
+func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) {
const q = `
SELECT generation
FROM storage_repositories
-WHERE virtual_storage = $1
-AND relative_path = $2
-AND storage = $3
+WHERE repository_id = $1
+AND storage = $2
`
var gen int
- if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, storage).Scan(&gen); err != nil {
+ if err := rs.db.QueryRowContext(ctx, q, repositoryID, storage).Scan(&gen); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return GenerationUnknown, nil
}
@@ -168,54 +165,50 @@ AND storage = $3
return gen, nil
}
-func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error {
+func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error {
const q = `
WITH updated_replicas AS (
UPDATE storage_repositories
SET generation = generation + 1
FROM (
- SELECT virtual_storage, relative_path, storage
+ SELECT repository_id, storage
FROM repositories
- JOIN storage_repositories USING (virtual_storage, relative_path, generation)
- WHERE virtual_storage = $1
- AND relative_path = $2
- AND storage = ANY($3)
+ JOIN storage_repositories USING (repository_id, generation)
+ WHERE repository_id = $1
+ AND storage = ANY($2)
FOR UPDATE
) AS to_update
- WHERE storage_repositories.virtual_storage = to_update.virtual_storage
- AND storage_repositories.relative_path = to_update.relative_path
- AND storage_repositories.storage = to_update.storage
- RETURNING storage_repositories.virtual_storage, storage_repositories.relative_path
+ WHERE storage_repositories.repository_id = to_update.repository_id
+ AND storage_repositories.storage = to_update.storage
+ RETURNING storage_repositories.repository_id
),
updated_repository AS (
UPDATE repositories
SET generation = generation + 1
FROM (
- SELECT DISTINCT virtual_storage, relative_path
+ SELECT DISTINCT repository_id
FROM updated_replicas
) AS updated_repositories
- WHERE repositories.virtual_storage = updated_repositories.virtual_storage
- AND repositories.relative_path = updated_repositories.relative_path
+ WHERE repositories.repository_id = updated_repositories.repository_id
)
SELECT
EXISTS (
SELECT FROM repositories
- WHERE virtual_storage = $1
- AND relative_path = $2
+ WHERE repository_id = $1
) AS repository_exists,
EXISTS ( SELECT FROM updated_replicas ) AS repository_updated
`
var repositoryExists, repositoryUpdated bool
if err := rs.db.QueryRowContext(
- ctx, q, virtualStorage, relativePath, pq.StringArray(append(secondaries, primary)),
+ ctx, q, repositoryID, pq.StringArray(append(secondaries, primary)),
).Scan(&repositoryExists, &repositoryUpdated); err != nil {
return fmt.Errorf("scan: %w", err)
}
if !repositoryExists {
- return commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
+ return commonerr.ErrRepositoryNotFound
}
if !repositoryUpdated {
@@ -225,13 +218,12 @@ SELECT
return nil
}
-func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error {
+func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage string, generation int) error {
const q = `
WITH repository AS (
- UPDATE repositories SET generation = $4
- WHERE virtual_storage = $1
- AND relative_path = $2
- AND COALESCE(repositories.generation, -1) < $4
+ UPDATE repositories SET generation = $3
+ WHERE repository_id = $1
+ AND COALESCE(repositories.generation, -1) < $3
)
INSERT INTO storage_repositories (
@@ -242,14 +234,18 @@ INSERT INTO storage_repositories (
generation
)
SELECT
- (SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2),
- $1, $2, $3, $4
+ repository_id,
+ virtual_storage,
+ relative_path,
+ $2,
+ $3
+FROM repositories
+WHERE repository_id = $1
ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET
- repository_id = EXCLUDED.repository_id,
generation = EXCLUDED.generation
`
- _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, generation)
+ _, err := rs.db.ExecContext(ctx, q, repositoryID, storage, generation)
return err
}
@@ -284,16 +280,15 @@ ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE
return nil
}
-func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) {
+func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error) {
const q = `
SELECT storage, generation
FROM storage_repositories
-WHERE virtual_storage = $1
-AND relative_path = $2
-AND storage = ANY($3)
+WHERE repository_id = $1
+AND storage = ANY($2)
`
- rows, err := rs.db.QueryContext(ctx, q, virtualStorage, relativePath, pq.StringArray([]string{source, target}))
+ rows, err := rs.db.QueryContext(ctx, q, repositoryID, pq.StringArray([]string{source, target}))
if err != nil {
return 0, err
}
@@ -324,8 +319,6 @@ AND storage = ANY($3)
if targetGeneration != GenerationUnknown && targetGeneration >= sourceGeneration {
return 0, DowngradeAttemptedError{
- VirtualStorage: virtualStorage,
- RelativePath: relativePath,
Storage: target,
CurrentGeneration: targetGeneration,
AttemptedGeneration: sourceGeneration,
@@ -557,26 +550,23 @@ AND relative_path = $2
return exists, nil
}
-func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
+func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error {
_, err := rs.db.ExecContext(ctx, `
WITH invalid_repository AS (
DELETE FROM storage_repositories
- WHERE virtual_storage = $1
- AND relative_path = $2
- AND storage = $3
+ WHERE repository_id = $1
+ AND storage = $2
)
DELETE FROM repositories
-WHERE virtual_storage = $1
-AND relative_path = $2
+WHERE repository_id = $1
AND NOT EXISTS (
SELECT 1
FROM storage_repositories
- WHERE virtual_storage = $1
- AND relative_path = $2
- AND storage != $3
+ WHERE repository_id = $1
+ AND storage != $2
)
- `, virtualStorage, relativePath, storage)
+ `, repositoryID, storage)
return err
}
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 0e3c46c92..fc3b5e53f 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -5,10 +5,10 @@ import "context"
// MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods
// default to what could be considered success if not set.
type MockRepositoryStore struct {
- GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
- IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
- GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
- SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
+ GetGenerationFunc func(ctx context.Context, repositoryID int64, storage string) (int, error)
+ IncrementGenerationFunc func(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
+ GetReplicatedGenerationFunc func(ctx context.Context, repositoryID int64, source, target string) (int, error)
+ SetGenerationFunc func(ctx context.Context, repositoryID int64, storage string, generation int) error
CreateRepositoryFunc func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string, storages []string) error
@@ -16,42 +16,42 @@ type MockRepositoryStore struct {
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]PartiallyAvailableRepository, error)
- DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
+ DeleteInvalidRepositoryFunc func(ctx context.Context, repositoryID int64, storage string) error
RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
ReserveRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error)
GetRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error)
}
-func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) {
+func (m MockRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) {
if m.GetGenerationFunc == nil {
return GenerationUnknown, nil
}
- return m.GetGenerationFunc(ctx, virtualStorage, relativePath, storage)
+ return m.GetGenerationFunc(ctx, repositoryID, storage)
}
-func (m MockRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error {
+func (m MockRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error {
if m.IncrementGenerationFunc == nil {
return nil
}
- return m.IncrementGenerationFunc(ctx, virtualStorage, relativePath, primary, secondaries)
+ return m.IncrementGenerationFunc(ctx, repositoryID, primary, secondaries)
}
-func (m MockRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) {
+func (m MockRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error) {
if m.GetReplicatedGenerationFunc == nil {
return GenerationUnknown, nil
}
- return m.GetReplicatedGenerationFunc(ctx, virtualStorage, relativePath, source, target)
+ return m.GetReplicatedGenerationFunc(ctx, repositoryID, source, target)
}
-func (m MockRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error {
+func (m MockRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage string, generation int) error {
if m.SetGenerationFunc == nil {
return nil
}
- return m.SetGenerationFunc(ctx, virtualStorage, relativePath, storage, generation)
+ return m.SetGenerationFunc(ctx, repositoryID, storage, generation)
}
// CreateRepository calls the mocked function. If no mock has been provided, it returns a nil error.
@@ -115,12 +115,12 @@ func (m MockRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Conte
return m.GetPartiallyAvailableRepositoriesFunc(ctx, virtualStorage)
}
-func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
+func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error {
if m.DeleteInvalidRepositoryFunc == nil {
return nil
}
- return m.DeleteInvalidRepositoryFunc(ctx, virtualStorage, relativePath, storage)
+ return m.DeleteInvalidRepositoryFunc(ctx, repositoryID, storage)
}
func (m MockRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) {
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index f8bd2085d..0a2cd1bf4 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -210,7 +210,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) {
firstTx := db.Begin(t)
secondTx := db.Begin(t)
- err := NewPostgresRepositoryStore(firstTx, nil).IncrementGeneration(ctx, "virtual-storage", "relative-path", tc.first.primary, tc.first.secondaries)
+ err := NewPostgresRepositoryStore(firstTx, nil).IncrementGeneration(ctx, 1, tc.first.primary, tc.first.secondaries)
require.NoError(t, err)
go func() {
@@ -218,7 +218,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) {
firstTx.Commit(t)
}()
- err = NewPostgresRepositoryStore(secondTx, nil).IncrementGeneration(ctx, "virtual-storage", "relative-path", tc.second.primary, tc.second.secondaries)
+ err = NewPostgresRepositoryStore(secondTx, nil).IncrementGeneration(ctx, 1, tc.second.primary, tc.second.secondaries)
require.Equal(t, tc.error, err)
secondTx.Commit(t)
@@ -273,8 +273,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
rs, requireState := newStore(t, nil)
require.Equal(t,
- rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"secondary-1"}),
- commonerr.NewRepositoryNotFoundError(vs, repo),
+ rs.IncrementGeneration(ctx, 1, "primary", []string{"secondary-1"}),
+ commonerr.ErrRepositoryNotFound,
)
requireState(t, ctx,
virtualStorageState{},
@@ -286,10 +286,10 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
rs, requireState := newStore(t, nil)
require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false))
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "latest-node", 1))
+ require.NoError(t, rs.SetGeneration(ctx, 1, "latest-node", 1))
require.Equal(t,
- rs.IncrementGeneration(ctx, vs, repo, "outdated-primary", []string{"outdated-secondary"}),
+ rs.IncrementGeneration(ctx, 1, "outdated-primary", []string{"outdated-secondary"}),
errWriteToOutdatedNodes,
)
requireState(t, ctx,
@@ -323,7 +323,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, rs.CreateRepository(ctx, int64(id+1), pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false))
}
- require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"up-to-date-secondary"}))
+ require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{"up-to-date-secondary"}))
requireState(t, ctx,
virtualStorageState{
@@ -358,7 +358,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
},
)
- require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, "primary", []string{
+ require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{
"up-to-date-secondary", "outdated-secondary", "non-existing-secondary",
}))
requireState(t, ctx,
@@ -400,14 +400,17 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("creates a record for the replica", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- err := rs.SetGeneration(ctx, vs, repo, stor, 1)
- require.NoError(t, err)
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
+ require.NoError(t, rs.SetGeneration(ctx, 1, "storage-2", 0))
requireState(t, ctx,
- virtualStorageState{},
+ virtualStorageState{"virtual-storage-1": {
+ "repository-1": {repositoryID: 1, replicaPath: "repository-1"},
+ }},
storageState{
"virtual-storage-1": {
"repository-1": {
- "storage-1": {repositoryID: 0, generation: 1},
+ "storage-1": {repositoryID: 1, generation: 0},
+ "storage-2": {repositoryID: 1, generation: 0},
},
},
},
@@ -418,8 +421,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
rs, requireState := newStore(t, nil)
require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false))
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1))
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0))
+ require.NoError(t, rs.SetGeneration(ctx, 1, stor, 1))
+ require.NoError(t, rs.SetGeneration(ctx, 1, stor, 0))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
@@ -526,13 +529,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("GetGeneration", func(t *testing.T) {
rs, _ := newStore(t, nil)
- generation, err := rs.GetGeneration(ctx, vs, repo, stor)
+ generation, err := rs.GetGeneration(ctx, 1, stor)
require.NoError(t, err)
require.Equal(t, GenerationUnknown, generation)
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false))
- generation, err = rs.GetGeneration(ctx, vs, repo, stor)
+ generation, err = rs.GetGeneration(ctx, 1, stor)
require.NoError(t, err)
require.Equal(t, 0, generation)
})
@@ -541,12 +544,12 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("no previous record allowed", func(t *testing.T) {
rs, _ := newStore(t, nil)
- gen, err := rs.GetReplicatedGeneration(ctx, vs, repo, "source", "target")
+ gen, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target")
require.NoError(t, err)
require.Equal(t, GenerationUnknown, gen)
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "source", 0))
- gen, err = rs.GetReplicatedGeneration(ctx, vs, repo, "source", "target")
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "source", nil, nil, false, false))
+ gen, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target")
require.NoError(t, err)
require.Equal(t, 0, gen)
})
@@ -554,13 +557,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("upgrade allowed", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "source", 1))
- gen, err := rs.GetReplicatedGeneration(ctx, vs, repo, "source", "target")
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "source", nil, nil, false, false))
+ require.NoError(t, rs.IncrementGeneration(ctx, 1, "source", nil))
+
+ gen, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target")
require.NoError(t, err)
require.Equal(t, 1, gen)
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "target", 0))
- gen, err = rs.GetReplicatedGeneration(ctx, vs, repo, "source", "target")
+ require.NoError(t, rs.SetGeneration(ctx, 1, "target", 0))
+ gen, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target")
require.NoError(t, err)
require.Equal(t, 1, gen)
})
@@ -568,18 +573,19 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("downgrade prevented", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "target", 1))
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "target", nil, nil, false, false))
+ require.NoError(t, rs.IncrementGeneration(ctx, 1, "target", nil))
- _, err := rs.GetReplicatedGeneration(ctx, vs, repo, "source", "target")
- require.Equal(t, DowngradeAttemptedError{vs, repo, "target", 1, GenerationUnknown}, err)
+ _, err := rs.GetReplicatedGeneration(ctx, 1, "source", "target")
+ require.Equal(t, DowngradeAttemptedError{"target", 1, GenerationUnknown}, err)
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "source", 1))
- _, err = rs.GetReplicatedGeneration(ctx, vs, repo, "source", "target")
- require.Equal(t, DowngradeAttemptedError{vs, repo, "target", 1, 1}, err)
+ require.NoError(t, rs.SetGeneration(ctx, 1, "source", 1))
+ _, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target")
+ require.Equal(t, DowngradeAttemptedError{"target", 1, 1}, err)
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "source", 0))
- _, err = rs.GetReplicatedGeneration(ctx, vs, repo, "source", "target")
- require.Equal(t, DowngradeAttemptedError{vs, repo, "target", 1, 0}, err)
+ require.NoError(t, rs.SetGeneration(ctx, 1, "source", 0))
+ _, err = rs.GetReplicatedGeneration(ctx, 1, "source", "target")
+ require.Equal(t, DowngradeAttemptedError{"target", 1, 0}, err)
})
})
@@ -957,8 +963,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
})
require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false))
- require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"consistent-secondary"}))
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "inconsistent-secondary", 0))
+ require.NoError(t, rs.IncrementGeneration(ctx, 1, "primary", []string{"consistent-secondary"}))
+ require.NoError(t, rs.SetGeneration(ctx, 1, "inconsistent-secondary", 0))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
@@ -982,7 +988,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries)
})
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 0))
+ require.NoError(t, rs.SetGeneration(ctx, 1, "primary", 0))
t.Run("outdated primary", func(t *testing.T) {
secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
@@ -991,8 +997,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
})
t.Run("storage with highest generation is not configured", func(t *testing.T) {
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "unknown", 2))
- require.NoError(t, rs.SetGeneration(ctx, vs, repo, "primary", 1))
+ require.NoError(t, rs.SetGeneration(ctx, 1, "unknown", 2))
+ require.NoError(t, rs.SetGeneration(ctx, 1, "primary", 1))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
@@ -1041,14 +1047,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("only replica", func(t *testing.T) {
rs, requireState := newStore(t, nil)
require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", nil, nil, false, false))
- require.NoError(t, rs.DeleteInvalidRepository(ctx, vs, repo, "invalid-storage"))
+ require.NoError(t, rs.DeleteInvalidRepository(ctx, 1, "invalid-storage"))
requireState(t, ctx, virtualStorageState{}, storageState{})
})
t.Run("another replica", func(t *testing.T) {
rs, requireState := newStore(t, nil)
require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false))
- require.NoError(t, rs.DeleteInvalidRepository(ctx, vs, repo, "invalid-storage"))
+ require.NoError(t, rs.DeleteInvalidRepository(ctx, 1, "invalid-storage"))
requireState(t, ctx,
virtualStorageState{
"virtual-storage-1": {
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index b37071251..14949af78 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -490,19 +490,22 @@ func TestPerRepositoryElector(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(db, nil)
for virtualStorage, relativePaths := range tc.state {
for relativePath, storages := range relativePaths {
- _, err := db.ExecContext(ctx,
- `INSERT INTO repositories (virtual_storage, relative_path) VALUES ($1, $2)`,
- virtualStorage, relativePath,
- )
+ repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
require.NoError(t, err)
+ repoCreated := false
for storage, record := range storages {
- require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, record.generation))
+ if !repoCreated {
+ repoCreated = true
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false))
+ }
+
+ require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, record.generation))
if record.assigned {
_, err := db.ExecContext(ctx, `
- INSERT INTO repository_assignments VALUES ($1, $2, $3)
- `, virtualStorage, relativePath, storage)
+ INSERT INTO repository_assignments VALUES ($1, $2, $3, $4)
+ `, virtualStorage, relativePath, storage, repositoryID)
require.NoError(t, err)
}
}
diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go
index a74a668f1..d40b7faf4 100644
--- a/internal/praefect/reconciler/reconciler_test.go
+++ b/internal/praefect/reconciler/reconciler_test.go
@@ -1074,19 +1074,21 @@ func TestReconciler(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(db, configuredStorages)
for virtualStorage, relativePaths := range tc.repositories {
for relativePath, storages := range relativePaths {
+ var repositoryID int64
repoCreated := false
for storage, repo := range storages {
if repo.generation >= 0 {
if !repoCreated {
repoCreated = true
- id, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
+ var err error
+ repositoryID, err = rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
require.NoError(t, err)
- require.NoError(t, rs.CreateRepository(ctx, id, virtualStorage, relativePath, storage, nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, repositoryID, virtualStorage, relativePath, storage, nil, nil, false, false))
}
- require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, repo.generation))
+ require.NoError(t, rs.SetGeneration(ctx, repositoryID, storage, repo.generation))
}
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 0436cf431..ca07c2bce 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -71,7 +71,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
logWithCorrID: correlation.ExtractFromContext(ctx),
})
- generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage)
+ generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.RepositoryID, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage)
if err != nil {
// Later generation might have already been replicated by an earlier replication job. If that's the case,
// we'll simply acknowledge the job. This also prevents accidental downgrades from happening.
@@ -96,11 +96,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
Repository: targetRepository,
}); err != nil {
if errors.Is(err, repository.ErrInvalidSourceRepository) {
- if err := dr.rs.DeleteInvalidRepository(ctx,
- event.Job.VirtualStorage,
- event.Job.RelativePath,
- event.Job.SourceNodeStorage,
- ); err != nil {
+ if err := dr.rs.DeleteInvalidRepository(ctx, event.Job.RepositoryID, event.Job.SourceNodeStorage); err != nil {
return fmt.Errorf("delete invalid repository: %w", err)
}
@@ -137,8 +133,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
if generation != datastore.GenerationUnknown {
return dr.rs.SetGeneration(ctx,
- event.Job.VirtualStorage,
- event.Job.RelativePath,
+ event.Job.RepositoryID,
event.Job.TargetNodeStorage,
generation,
)
diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go
index a1eccbed3..557977661 100644
--- a/internal/praefect/replicator_pg_test.go
+++ b/internal/praefect/replicator_pg_test.go
@@ -51,11 +51,17 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
defer testhelper.MustClose(t, targetCC)
rs := datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil)
- require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "gitaly-1", 0))
+
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "gitaly-1", nil, nil, true, false))
+
+ exists, err := rs.RepositoryExists(ctx, "virtual-storage-1", "relative-path-1")
+ require.NoError(t, err)
+ require.True(t, exists)
r := &defaultReplicator{rs: rs, log: testhelper.DiscardTestLogger(t)}
require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
+ RepositoryID: 1,
VirtualStorage: "virtual-storage-1",
RelativePath: "relative-path-1",
SourceNodeStorage: "gitaly-1",
@@ -63,7 +69,7 @@ func TestReplicatorInvalidSourceRepository(t *testing.T) {
},
}, nil, targetCC))
- exists, err := rs.RepositoryExists(ctx, "virtual-storage-1", "relative-path-1")
+ exists, err = rs.RepositoryExists(ctx, "virtual-storage-1", "relative-path-1")
require.NoError(t, err)
require.False(t, exists)
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 35646c33a..b876c109a 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -237,15 +237,13 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
returnedErr := datastore.DowngradeAttemptedError{
- VirtualStorage: "virtual-storage-1",
- RelativePath: "relative-path-1",
Storage: "gitaly-2",
CurrentGeneration: 1,
AttemptedGeneration: tc.attemptedGeneration,
}
rs := datastore.MockRepositoryStore{
- GetReplicatedGenerationFunc: func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) {
+ GetReplicatedGenerationFunc: func(ctx context.Context, repositoryID int64, source, target string) (int, error) {
return 0, returnedErr
},
}
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index f26a127a0..d1f4242fe 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -225,7 +225,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
[]string{"consistent-secondary", "unhealthy-secondary", "inconsistent-secondary"}, nil, true, true),
)
require.NoError(t,
- rs.IncrementGeneration(ctx, "virtual-storage-1", "repository", "primary", []string{"consistent-secondary", "unhealthy-secondary"}),
+ rs.IncrementGeneration(ctx, repositoryID, "primary", []string{"consistent-secondary", "unhealthy-secondary"}),
)
router := NewPerRepositoryRouter(
@@ -369,7 +369,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
)
if len(tc.consistentStorages) > 0 {
- require.NoError(t, rs.IncrementGeneration(ctx, "virtual-storage-1", "repository", tc.consistentStorages[0], tc.consistentStorages[1:]))
+ require.NoError(t, rs.IncrementGeneration(ctx, repositoryID, tc.consistentStorages[0], tc.consistentStorages[1:]))
}
for virtualStorage, relativePaths := range tc.assignedNodes {