diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-05-14 12:48:21 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-05-14 15:16:38 +0300 |
commit | 57f41077be5d57a8c09e9adfa4dbb706fe71a397 (patch) | |
tree | 2b5f9030a95b0a0497403c4d355620acc54afb58 | |
parent | 7b1cbb50419dbf364127767dddd1ac62ce613ce2 (diff) |
Praefect: primary election must support multi virtual storages
Election of the new primary node must take into account
relation of the node to the proper virtual storage.
To speed up SQL operations index were introduced based on
'virtual_storage' and 'target_node_storage' values.
As an addition down migration was fixed to remove new type.
5 files changed, 48 insertions, 22 deletions
diff --git a/_support/Makefile.template b/_support/Makefile.template index 81557bf93..c9f6e2339 100644 --- a/_support/Makefile.template +++ b/_support/Makefile.template @@ -13,7 +13,7 @@ ASSEMBLY_ROOT ?= {{ .BuildDir }}/assembly BUILD_TAGS := tracer_static tracer_static_jaeger continuous_profiler_stackdriver SHELL = /usr/bin/env bash -eo pipefail # These variables control test options and artifacts -TEST_OPTIONS ?= +TEST_OPTIONS ?= TEST_REPORT_DIR ?= {{ .BuildDir }}/reports TEST_OUTPUT ?= $(TEST_REPORT_DIR)/go-tests-output-$(CI_JOB_NAME).txt TEST_REPORT ?= $(TEST_REPORT_DIR)/go-tests-report-$(CI_JOB_NAME).xml diff --git a/internal/praefect/datastore/migrations/20200224220728_job_queue.go b/internal/praefect/datastore/migrations/20200224220728_job_queue.go index cab5a5494..2d0b6728a 100644 --- a/internal/praefect/datastore/migrations/20200224220728_job_queue.go +++ b/internal/praefect/datastore/migrations/20200224220728_job_queue.go @@ -32,6 +32,7 @@ func init() { `DROP TABLE IF EXISTS replication_queue_job_lock CASCADE`, `DROP TABLE IF EXISTS replication_queue CASCADE`, `DROP TABLE IF EXISTS replication_queue_lock CASCADE`, + `DROP TYPE IF EXISTS REPLICATION_JOB_STATE`, }, } diff --git a/internal/praefect/datastore/migrations/20200512131219_replication_job_indexing.go b/internal/praefect/datastore/migrations/20200512131219_replication_job_indexing.go new file mode 100644 index 000000000..57c9b8ce1 --- /dev/null +++ b/internal/praefect/datastore/migrations/20200512131219_replication_job_indexing.go @@ -0,0 +1,18 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20200512131219_replication_job_indexing", + Up: []string{ + `CREATE INDEX IF NOT EXISTS virtual_target_on_replication_queue_idx + ON replication_queue USING BTREE ((job->>'virtual_storage'), (job->>'target_node_storage'))`, + }, + Down: []string{ + `DROP INDEX IF EXISTS virtual_target_on_replication_queue_idx`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go index 0b58cceb6..e84decbc8 100644 --- a/internal/praefect/nodes/sql_elector.go +++ b/internal/praefect/nodes/sql_elector.go @@ -377,14 +377,15 @@ func (s *sqlElector) electNewPrimary(candidates []*sqlCandidate) error { job->>'target_node_storage' AS target_node_storage, MAX(updated_at) AS updated_at FROM replication_queue - WHERE state = 'completed' AND job->>'target_node_storage' = ANY ($1) + WHERE state = 'completed' AND job->>'target_node_storage' = ANY ($1) AND job->>'virtual_storage' = $2 GROUP BY job->>'target_node_storage' ) latest ON rq.job->>'target_node_storage' = latest.target_node_storage AND rq.updated_at >= latest.updated_at + WHERE rq.job->>'virtual_storage' = $2 ) AS t GROUP BY target_node_storage ORDER BY SUM(ready+in_progress+2*failed+2*dead)` - rows, err := s.db.Query(q, pq.Array(candidateStorages)) + rows, err := s.db.Query(q, pq.Array(candidateStorages), s.shardName) if err != nil { return fmt.Errorf("executing query for ordering candidate nodes: %w", err) } diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go index 77c6d3833..2b73c46dd 100644 --- a/internal/praefect/nodes/sql_elector_test.go +++ b/internal/praefect/nodes/sql_elector_test.go @@ -216,17 +216,20 @@ func TestElectNewPrimary(t *testing.T) { initialReplQueueInsert: `INSERT INTO replication_queue (job, updated_at, state) VALUES - ('{"target_node_storage": "gitaly-1"}', '2020-01-01 00:00:04', 'dead'), - ('{"target_node_storage": "gitaly-1"}', '2020-01-01 00:00:03', 'completed'), - ('{"target_node_storage": "gitaly-1"}', '2020-01-01 00:00:02', 'completed'), - ('{"target_node_storage": "gitaly-1"}', '2020-01-01 00:00:01', 'completed'), - ('{"target_node_storage": "gitaly-1"}', '2020-01-01 00:00:00', 'completed'), - - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:04', 'completed'), - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:03', 'dead'), - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:02', 'dead'), - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:01', 'dead'), - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:00', 'dead')`, + ('{"virtual_storage": "test-shard-1", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:05', 'ready'), + ('{"virtual_storage": "test-shard-1", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:05', 'completed'), + + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:04', 'dead'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:03', 'completed'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:02', 'completed'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:01', 'completed'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:00', 'completed'), + + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:04', 'completed'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:03', 'dead'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:02', 'dead'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:01', 'dead'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:00', 'dead')`, expectedPrimary: "gitaly-2", incompleteCounts: []targetNodeIncompleteCounts{ { @@ -250,14 +253,17 @@ func TestElectNewPrimary(t *testing.T) { initialReplQueueInsert: `INSERT INTO replication_queue (job, updated_at, state) VALUES - ('{"target_node_storage": "gitaly-1"}', '2020-01-01 00:00:02', 'dead'), - ('{"target_node_storage": "gitaly-1"}', '2020-01-01 00:00:01', 'dead'), - ('{"target_node_storage": "gitaly-1"}', '2020-01-01 00:00:00', 'completed'), - - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:03', 'ready'), - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:02', 'in_progress'), - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:01', 'ready'), - ('{"target_node_storage": "gitaly-2"}', '2020-01-01 00:00:00', 'completed')`, + ('{"virtual_storage": "test-shard-1", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:05', 'in_progress'), + ('{"virtual_storage": "test-shard-1", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:05', 'completed'), + + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:02', 'dead'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:01', 'dead'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-1"}', '2020-01-01 00:00:00', 'completed'), + + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:03', 'ready'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:02', 'in_progress'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:01', 'ready'), + ('{"virtual_storage": "test-shard-0", "target_node_storage": "gitaly-2"}', '2020-01-01 00:00:00', 'completed')`, expectedPrimary: "gitaly-2", incompleteCounts: []targetNodeIncompleteCounts{ { |