Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-19 08:58:37 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-19 08:58:37 +0300
commit7788622901b8eaf3eecaad6685217869d8519c8e (patch)
tree65a5623591c9a8e5901f0e3639dec88abf1d68d9
parentec5dd7b3c441c744e4b74a083deec1f41435cb71 (diff)
parent2df119b5cbff61374f4f530e4707c2cdb3c5fb3a (diff)
Merge branch 'smh-fix-reconciliation-to-assigned' into 'master'
Reconcile missing repositories to assigned nodes See merge request gitlab-org/gitaly!3153
-rw-r--r--changelogs/unreleased/smh-fix-reconciliation-to-assigned.yml5
-rw-r--r--internal/praefect/reconciler/reconciler.go85
-rw-r--r--internal/praefect/reconciler/reconciler_test.go37
3 files changed, 88 insertions, 39 deletions
diff --git a/changelogs/unreleased/smh-fix-reconciliation-to-assigned.yml b/changelogs/unreleased/smh-fix-reconciliation-to-assigned.yml
new file mode 100644
index 000000000..1f1525505
--- /dev/null
+++ b/changelogs/unreleased/smh-fix-reconciliation-to-assigned.yml
@@ -0,0 +1,5 @@
+---
+title: Reconcile missing repositories to assigned nodes
+merge_request: 3153
+author:
+type: fixed
diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go
index f5048c74b..1ce8c1c11 100644
--- a/internal/praefect/reconciler/reconciler.go
+++ b/internal/praefect/reconciler/reconciler.go
@@ -144,55 +144,68 @@ func (r *Reconciler) reconcile(ctx context.Context) error {
WITH healthy_storages AS (
SELECT unnest($1::text[]) AS virtual_storage,
unnest($2::text[]) AS storage
-), reconciliation_jobs AS (
+),
+
+update_jobs AS (
+ SELECT DISTINCT ON (virtual_storage, relative_path, target_node_storage)
+ virtual_storage,
+ relative_path,
+ source_node_storage,
+ target_node_storage
+ FROM (
+ SELECT virtual_storage, relative_path, storage AS target_node_storage
+ FROM repositories
+ JOIN healthy_storages USING (virtual_storage)
+ LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
+ WHERE COALESCE(storage_repositories.generation != repositories.generation, true)
+ AND (
+ -- If assignments exist for the repository, only the assigned storages are targeted for replication.
+ -- If no assignments exist, every healthy node is targeted for replication.
+ SELECT COUNT(storage) = 0 OR COUNT(storage) FILTER (WHERE storage = healthy_storages.storage) = 1
+ FROM repository_assignments
+ WHERE virtual_storage = repositories.virtual_storage
+ AND relative_path = repositories.relative_path
+ )
+ ORDER BY virtual_storage, relative_path
+ ) AS unhealthy_repositories
+ JOIN (
+ SELECT virtual_storage, relative_path, storage AS source_node_storage
+ FROM storage_repositories
+ JOIN healthy_storages USING (virtual_storage, storage)
+ JOIN repositories USING (virtual_storage, relative_path, generation)
+ ORDER BY virtual_storage, relative_path
+ ) AS healthy_repositories USING (virtual_storage, relative_path)
+ WHERE NOT EXISTS (
+ SELECT true
+ FROM replication_queue
+ WHERE state IN ('ready', 'in_progress', 'failed')
+ AND job->>'change' = 'update'
+ AND job->>'virtual_storage' = virtual_storage
+ AND job->>'relative_path' = relative_path
+ AND job->>'target_node_storage' = target_node_storage
+ )
+ ORDER BY virtual_storage, relative_path, target_node_storage, random()
+),
+
+reconciliation_jobs AS (
INSERT INTO replication_queue (lock_id, job, meta)
SELECT
(virtual_storage || '|' || target_node_storage || '|' || relative_path),
to_jsonb(reconciliation_jobs),
jsonb_build_object('correlation_id', encode(random()::text::bytea, 'base64'))
FROM (
- SELECT DISTINCT ON (virtual_storage, relative_path, target_node_storage)
+ SELECT
virtual_storage,
relative_path,
source_node_storage,
target_node_storage,
'update' AS change
- FROM (
- SELECT virtual_storage, relative_path, storage AS target_node_storage
- FROM repositories
- JOIN healthy_storages USING (virtual_storage)
- LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
- WHERE COALESCE(storage_repositories.generation != repositories.generation, true)
- AND (
- -- If assignments exist for the repository, only the assigned storages are targeted for replication.
- -- If no assignments exist, every healthy node is targeted for replication.
- SELECT COUNT(storage) = 0 OR COUNT(storage) FILTER (WHERE storage = storage_repositories.storage) = 1
- FROM repository_assignments
- WHERE virtual_storage = repositories.virtual_storage
- AND relative_path = repositories.relative_path
- )
- ORDER BY virtual_storage, relative_path
- ) AS unhealthy_repositories
- JOIN (
- SELECT virtual_storage, relative_path, storage AS source_node_storage
- FROM storage_repositories
- JOIN healthy_storages USING (virtual_storage, storage)
- JOIN repositories USING (virtual_storage, relative_path, generation)
- ORDER BY virtual_storage, relative_path
- ) AS healthy_repositories USING (virtual_storage, relative_path)
- WHERE NOT EXISTS (
- SELECT true
- FROM replication_queue
- WHERE state IN ('ready', 'in_progress', 'failed')
- AND job->>'change' = 'update'
- AND job->>'virtual_storage' = virtual_storage
- AND job->>'relative_path' = relative_path
- AND job->>'target_node_storage' = target_node_storage
- )
- ORDER BY virtual_storage, relative_path, target_node_storage, random()
+ FROM update_jobs
) AS reconciliation_jobs
RETURNING lock_id, meta, job
-), locks AS (
+),
+
+create_locks AS (
INSERT INTO replication_queue_lock(id)
SELECT lock_id
FROM reconciliation_jobs
diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go
index ba79fe9c5..c1cfba7a9 100644
--- a/internal/praefect/reconciler/reconciler_test.go
+++ b/internal/praefect/reconciler/reconciler_test.go
@@ -356,7 +356,7 @@ func TestReconciler(t *testing.T) {
"virtual-storage-1": {
"relative-path-1": {
"storage-1": {generation: 1},
- "storage-2": {generation: 0},
+ "storage-2": {generation: -1, assigned: true},
"storage-3": {generation: 0, assigned: true},
},
// assert query correctly scopes for relative path
@@ -381,6 +381,13 @@ func TestReconciler(t *testing.T) {
VirtualStorage: "virtual-storage-1",
RelativePath: "relative-path-1",
SourceNodeStorage: "storage-1",
+ TargetNodeStorage: "storage-2",
+ },
+ {
+ Change: datastore.UpdateRepo,
+ VirtualStorage: "virtual-storage-1",
+ RelativePath: "relative-path-1",
+ SourceNodeStorage: "storage-1",
TargetNodeStorage: "storage-3",
},
},
@@ -392,7 +399,7 @@ func TestReconciler(t *testing.T) {
"virtual-storage-1": {
"relative-path-1": {
"storage-1": {generation: 1, assigned: true},
- "storage-2": {generation: 0},
+ "storage-2": {generation: -1, assigned: true},
"storage-3": {generation: 0, assigned: true},
},
},
@@ -403,11 +410,31 @@ func TestReconciler(t *testing.T) {
VirtualStorage: "virtual-storage-1",
RelativePath: "relative-path-1",
SourceNodeStorage: "storage-1",
+ TargetNodeStorage: "storage-2",
+ },
+ {
+ Change: datastore.UpdateRepo,
+ VirtualStorage: "virtual-storage-1",
+ RelativePath: "relative-path-1",
+ SourceNodeStorage: "storage-1",
TargetNodeStorage: "storage-3",
},
},
},
{
+ desc: "unassigned nodes are not targeted",
+ healthyStorages: configuredStorages,
+ repositories: repositories{
+ "virtual-storage-1": {
+ "relative-path-1": {
+ "storage-1": {generation: 2, assigned: true},
+ "storage-2": {generation: -1, assigned: false},
+ "storage-3": {generation: 0, assigned: false},
+ },
+ },
+ },
+ },
+ {
desc: "the only assigned node being up to date produces no jobs",
healthyStorages: configuredStorages,
repositories: repositories{
@@ -430,8 +457,12 @@ func TestReconciler(t *testing.T) {
for virtualStorage, relativePaths := range tc.repositories {
for relativePath, storages := range relativePaths {
for storage, repo := range storages {
- require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, repo.generation))
+ if repo.generation >= 0 {
+ require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, repo.generation))
+ }
+ }
+ for storage, repo := range storages {
if repo.assigned {
_, err := db.ExecContext(ctx, `
INSERT INTO repository_assignments VALUES ($1, $2, $3)