
gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2020-05-12 16:49:44 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2020-05-18 18:46:59 +0300
commit    ffc3443f89eb37ae132289601971d71f0346e411 (patch)
tree      e639983f971a027322fd648d48a3bbc7503ca1d5 /internal/praefect/datastore/queue_test.go
parent    47b4f195006d0755f76767d2dfe4f89ef2cb9abe (diff)
Praefect: horizontal scaling of a single shard MVC
Random distribution of reads across up-to-date Gitaly nodes. A node's up-to-date status is verified based on the state of the replication queue. The backoff strategy is to fall back to the primary node if there are no up-to-date secondaries or if an error occurred. This feature can be enabled with 'distribution_of_reads_enabled'.

Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2650
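
As a rough illustration of the read-routing behaviour described above, here is a minimal sketch. It assumes hypothetical names: pickReadNode, Node, and UpToDateStoragesFunc are introduced only for this example and are not part of the Praefect code this commit touches.

package routing

import (
	"context"
	"math/rand"
)

// Node is a hypothetical handle to a single Gitaly storage node.
type Node struct {
	Storage string
}

// UpToDateStoragesFunc stands in for the queue-backed lookup exercised by the
// tests in this commit (GetUpToDateStorages).
type UpToDateStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) ([]string, error)

// pickReadNode returns a random up-to-date secondary for the repository, or
// the primary when the lookup fails or no secondary is in sync (the backoff
// strategy described in the commit message).
func pickReadNode(ctx context.Context, primary Node, secondaries []Node, upToDate UpToDateStoragesFunc, virtualStorage, relativePath string) Node {
	storages, err := upToDate(ctx, virtualStorage, relativePath)
	if err != nil || len(storages) == 0 {
		return primary
	}

	inSync := make(map[string]bool, len(storages))
	for _, s := range storages {
		inSync[s] = true
	}

	candidates := make([]Node, 0, len(secondaries))
	for _, sec := range secondaries {
		if inSync[sec.Storage] {
			candidates = append(candidates, sec)
		}
	}
	if len(candidates) == 0 {
		return primary
	}
	return candidates[rand.Intn(len(candidates))]
}
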
Diffstat (limited to 'internal/praefect/datastore/queue_test.go')
-rw-r--r--  internal/praefect/datastore/queue_test.go  203
1 file changed, 203 insertions, 0 deletions
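
The tests added below all exercise GetUpToDateStorages. They imply that a storage counts as up to date only when its most recent replication_queue event for the given repository is 'completed'. The following is a minimal sketch of that semantics against a plain *sql.DB; the package name, function name, and SQL are illustrative assumptions, not the query this commit ships.

package datastoresketch

import (
	"context"
	"database/sql"
)

// getUpToDateStoragesSketch is a hypothetical re-implementation used only to
// illustrate the behaviour the tests below expect: take the latest event per
// target storage for the repository and keep the storages whose latest event
// is 'completed'.
func getUpToDateStoragesSketch(ctx context.Context, db *sql.DB, virtualStorage, relativePath string) ([]string, error) {
	rows, err := db.QueryContext(ctx, `
		SELECT storage
		FROM (
			SELECT DISTINCT ON (job->>'target_node_storage')
				job->>'target_node_storage' AS storage,
				state
			FROM replication_queue
			WHERE job->>'virtual_storage' = $1 AND job->>'relative_path' = $2
			ORDER BY job->>'target_node_storage', updated_at DESC
		) latest
		WHERE state = 'completed'`,
		virtualStorage, relativePath,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var storages []string
	for rows.Next() {
		var s string
		if err := rows.Scan(&s); err != nil {
			return nil, err
		}
		storages = append(storages, s)
	}
	return storages, rows.Err()
}
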
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 62e969a21..75bbe608a 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -572,6 +572,209 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
})
}
+func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ source := PostgresReplicationEventQueue{qc: db}
+
+ t.Run("single 'ready' job for single storage", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'ready')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("single 'dead' job for single storage", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("single 'failed' job for single storage", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("single 'completed' job for single storage", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1"}, ss)
+ })
+
+ t.Run("multiple 'completed' jobs for single storage but different repos", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1"}, ss)
+ })
+
+ t.Run("last jobs are 'completed' for multiple storages", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1", "s2"}, ss)
+ })
+
+ t.Run("last jobs are 'completed' for multiple storages but different virtuals", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+ ('{"virtual_storage": "vs2", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1"}, ss)
+ })
+
+ t.Run("lasts are in 'completed' and 'in_progress' for different storages", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'in_progress')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1"}, ss)
+ })
+
+ t.Run("lasts are in 'dead', 'ready', 'failed' and 'in_progress' for different storages", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'ready'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'in_progress')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("last is not 'completed'", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'dead'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'ready'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'failed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'failed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("multiple virtuals with multiple storages", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'dead'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'completed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead'),
+
+ ('{"virtual_storage": "vs2", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'completed'),
+ ('{"virtual_storage": "vs2", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-2"}', '2020-01-01 00:00:01', 'failed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s5", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s2"}, ss)
+ })
+}
+
func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
t.Helper()