diff options
Diffstat (limited to 'internal/praefect/datastore/queue_test.go')
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 341 |
1 files changed, 341 insertions, 0 deletions
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 75bbe608a..f7f7737f6 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -572,6 +572,347 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { }) } +func TestPostgresReplicationEventQueue_GetOutdatedRepositories(t *testing.T) { + const ( + virtualStorage = "test-virtual-storage" + oldPrimary = "old-primary" + newPrimary = "new-primary" + secondary = "secondary" + ) + + now := time.Now() + offset := func(d time.Duration) *time.Time { + t := now.Add(d) + return &t + } + + for _, tc := range []struct { + desc string + events []ReplicationEvent + error error + expected map[string][]string + }{ + { + desc: "basic scenarios work", + events: []ReplicationEvent{ + { + State: JobStateReady, + Job: ReplicationJob{ + VirtualStorage: "wrong-virtual-storage", + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateDead, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: "wrong-source-node", + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "completed-job-ignored", + }, + }, + { + State: JobStateDead, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateInProgress, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-2", + }, + }, + { + State: JobStateFailed, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: secondary, + RelativePath: "repo-2", + }, + }, + }, + expected: map[string][]string{ + "repo-1": {newPrimary}, + "repo-2": {newPrimary, secondary}, + }, + }, + { + desc: "search considers null updated_at as latest", + events: []ReplicationEvent{ + { + State: JobStateCompleted, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateReady, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{ + "repo-1": []string{newPrimary}, + }, + }, + { + // Search should terminate at the reference storage and not consider + // earlier nodes. + desc: "search terminates terminates at the reference storage", + events: []ReplicationEvent{ + { + State: JobStateDead, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: "even-older-primary", + TargetNodeStorage: oldPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateDead, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(2 * time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{}, + }, + { + // Incomplete jobs from other nodes than the previous writable + // primary do not indicate inconsistent state and should be + // ignored. + desc: "up to date with failed jobs from secondaries", + events: []ReplicationEvent{ + { + State: JobStateCompleted, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateDead, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: secondary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{}, + }, + { + // Node that experienced data loss on failover but was later + // reconciled from the previous writable primary should + // contain complete data. + desc: "up to date with earlier failed job from old primary", + events: []ReplicationEvent{ + { + State: JobStateDead, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{}, + }, + { + // If the node was reconciled from an outdated secondary, + // it will contain the same outdated data as the secondary. + // The node should be considered outdated as well. + desc: "reconciled from outdated secondary", + events: []ReplicationEvent{ + { + State: JobStateDead, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateDead, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: secondary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(2 * time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: secondary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{ + "repo-1": {newPrimary, secondary}, + }, + }, + { + // If the node is reconciled from an up to date secondary, + // the node contains the latest data as well. + desc: "reconciled from up to date secondary", + events: []ReplicationEvent{ + { + State: JobStateDead, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: secondary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(2 * time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: secondary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{}, + }, + { + // Reconciling two nodes back and forth causes a cycle. We should report the + // repositories as out of date as there is no way to construct a path back to + // the previous writable primary. This prevents us from verifying whether the + // nodes are up to date with the previous writable primary. + // + // Reconciling towards the previous writable primary breaks the precondition for + // the search that the previous writable primary contains the full data and has + // not received writes after failing over. Results from such state are not accurate. + desc: "two way cycle reports out of date", + events: []ReplicationEvent{ + { + State: JobStateCompleted, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: secondary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: newPrimary, + TargetNodeStorage: secondary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{ + "repo-1": []string{newPrimary, secondary}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + db := getDB(t) + + for _, event := range tc.events { + db.MustExec(t, "INSERT INTO replication_queue (state, updated_at, job) VALUES ($1, $2, $3)", + event.State, event.UpdatedAt, event.Job, + ) + } + + actual, err := NewPostgresReplicationEventQueue(db).GetOutdatedRepositories(ctx, virtualStorage, oldPrimary) + require.NoError(t, err) + require.Equal(t, tc.expected, actual) + }) + } +} + func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) { db := getDB(t) |