diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-06-04 18:35:06 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-06-04 18:48:46 +0300 |
commit | 7edbb7d6c6be568d677d386ba92a1615aad22750 (patch) | |
tree | 11a302a1b3361f6655842c047be5184adfb721fd | |
parent | bf7d70d8daa5d41d6a747b059ff6ec66e1c4c154 (diff) |
get outdated repositoriessmh-up-to-date-node
-rw-r--r-- | internal/praefect/datastore/memory.go | 4 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 105 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 341 |
3 files changed, 450 insertions, 0 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go index 3a095c9c8..a311e4dae 100644 --- a/internal/praefect/datastore/memory.go +++ b/internal/praefect/datastore/memory.go @@ -60,6 +60,10 @@ func (s *memoryReplicationEventQueue) nextID() uint64 { return s.seq } +func (s *memoryReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) { + return nil, errors.New("unimplemented") +} + func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event ReplicationEvent) (ReplicationEvent, error) { event.Attempt = 3 event.State = JobStateReady diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 1785cc107..671df03f6 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -6,9 +6,11 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "log" "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/lib/pq" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" ) @@ -25,6 +27,9 @@ type ReplicationEventQueue interface { // CountDeadReplicationJobs returns the dead replication job counts by relative path within the // given timerange. The timerange beginning is inclusive and ending is exclusive. CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) + // GetOutdatedRepositories returns repositories with storages which have uncompleted replication jobs from the + // reference storage. + GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) // GetUpToDateStorages returns list of target storages where latest replication job is in 'completed' state. // It returns no results if there is no up to date storages or there were no replication events yet. GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) @@ -345,6 +350,106 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J return acknowledged.Values(), nil } +// GetOutdatedRepositories gets repositories which have not replicated all writes from reference node. +func (rq PostgresReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage, reference string) (map[string][]string, error) { + // This query searches the replication graph for a path from a repository to the repository on the reference storage. + // If it does not find a path to the reference storage, the repository's copy on the storage is reported as outdated. + // + // The path is constructed by following the chain of replication jobs. `latest_job` CTE queries for the latest + // 'meaningful' jobs for each repository and storage. Job is considered meaningful in two cases: + // 1. Jobs which originate from the reference are meaningful as if they are 'completed', they indicate the + // repository is up to date on the node. If they are in any other state, the job indicates the repository + // is missing some writes from the reference storage. + // 2. Jobs which originate from other nodes than the reference are only considered meaningful if they are + // in 'completed' state. If the job is 'completed', then the repository contains the latest changes from + // the given non-reference node and we have to then follow the chain to verify the non-reference node is + // up to date with the reference. In other states, the node does not contain the non-reference node's + // changes and the job does not give any information on whether the currently checked node is missing + // writes from the reference. + // + // Taking the meaningful jobs as starting points, `intermediate_job` CTE then follows the replication job chain + // until it finds the path to the reference storage or runs in to a cycle. The next job to follow in the chain + // is decided using the same meaningful job criteria as in `latest_job` CTE. + // + // As `intermediate_job` returns every path segment, `full_path` CTE finally selects the full (longest) path for + // each repository and node pair. + // + // Finally we select the repository/node pairs which do not have reach the reference repository or reach it but + // did not successfully complete the replication job from it. + const q = ` +WITH RECURSIVE latest_job AS ( + SELECT DISTINCT ON (repository, target) + job->>'relative_path' AS repository, + job->>'target_node_storage' AS target, + job->>'source_node_storage' AS source, + state + FROM replication_queue + WHERE + job->>'virtual_storage' = $1 AND + (job->>'source_node_storage' = $2 OR + job->>'source_node_storage' != $2 AND state = 'completed') + ORDER BY repository, target, updated_at DESC NULLS FIRST +), intermediate_job AS ( + SELECT + repository, + target, + state, + ARRAY[source] AS path + FROM latest_job + UNION + SELECT + intermediate_job.repository, + intermediate_job.target, + next_job.state, + intermediate_job.path || (job->>'source_node_storage') + FROM replication_queue AS next_job, intermediate_job + WHERE + job->>'source_node_storage' != ALL(intermediate_job.path) AND + job->>'virtual_storage' = $1 AND + job->>'relative_path' = intermediate_job.repository AND + job->>'target_node_storage' = intermediate_job.path[array_length(intermediate_job.path, 1)] AND + ( + job->>'source_node_storage' = $2 OR + job->>'source_node_storage' != $2 AND next_job.state = 'completed' + ) +), full_path AS ( + SELECT DISTINCT ON (repository, target) + repository, + target, + state, + path + FROM intermediate_job + ORDER BY repository, target, array_length(path, 1) DESC +) + +SELECT repository, target, state, path +FROM full_path +WHERE state != 'completed' OR path[array_length(path, 1)] != $2 +ORDER BY repository, target +` + + rows, err := rq.qc.QueryContext(ctx, q, virtualStorage, reference) + if err != nil { + return nil, err + } + + repos := map[string][]string{} + for rows.Next() { + var repo, node, state string + var path []string + if err := rows.Scan(&repo, &node, &state, (*pq.StringArray)(&path)); err != nil { + return nil, err + } + + repos[repo] = append(repos[repo], node) + log.Printf("repo: %s, node: %s, state: %s, path: %s", repo, node, state, path) + } + + defer rows.Close() + + return repos, rows.Err() +} + func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) { query := ` SELECT storage diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 75bbe608a..f7f7737f6 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -572,6 +572,347 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { }) } +func TestPostgresReplicationEventQueue_GetOutdatedRepositories(t *testing.T) { + const ( + virtualStorage = "test-virtual-storage" + oldPrimary = "old-primary" + newPrimary = "new-primary" + secondary = "secondary" + ) + + now := time.Now() + offset := func(d time.Duration) *time.Time { + t := now.Add(d) + return &t + } + + for _, tc := range []struct { + desc string + events []ReplicationEvent + error error + expected map[string][]string + }{ + { + desc: "basic scenarios work", + events: []ReplicationEvent{ + { + State: JobStateReady, + Job: ReplicationJob{ + VirtualStorage: "wrong-virtual-storage", + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateDead, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: "wrong-source-node", + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "completed-job-ignored", + }, + }, + { + State: JobStateDead, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateInProgress, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-2", + }, + }, + { + State: JobStateFailed, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: secondary, + RelativePath: "repo-2", + }, + }, + }, + expected: map[string][]string{ + "repo-1": {newPrimary}, + "repo-2": {newPrimary, secondary}, + }, + }, + { + desc: "search considers null updated_at as latest", + events: []ReplicationEvent{ + { + State: JobStateCompleted, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateReady, + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{ + "repo-1": []string{newPrimary}, + }, + }, + { + // Search should terminate at the reference storage and not consider + // earlier nodes. + desc: "search terminates terminates at the reference storage", + events: []ReplicationEvent{ + { + State: JobStateDead, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: "even-older-primary", + TargetNodeStorage: oldPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateDead, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(2 * time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{}, + }, + { + // Incomplete jobs from other nodes than the previous writable + // primary do not indicate inconsistent state and should be + // ignored. + desc: "up to date with failed jobs from secondaries", + events: []ReplicationEvent{ + { + State: JobStateCompleted, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateDead, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: secondary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{}, + }, + { + // Node that experienced data loss on failover but was later + // reconciled from the previous writable primary should + // contain complete data. + desc: "up to date with earlier failed job from old primary", + events: []ReplicationEvent{ + { + State: JobStateDead, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{}, + }, + { + // If the node was reconciled from an outdated secondary, + // it will contain the same outdated data as the secondary. + // The node should be considered outdated as well. + desc: "reconciled from outdated secondary", + events: []ReplicationEvent{ + { + State: JobStateDead, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateDead, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: secondary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(2 * time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: secondary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{ + "repo-1": {newPrimary, secondary}, + }, + }, + { + // If the node is reconciled from an up to date secondary, + // the node contains the latest data as well. + desc: "reconciled from up to date secondary", + events: []ReplicationEvent{ + { + State: JobStateDead, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: oldPrimary, + TargetNodeStorage: secondary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(2 * time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: secondary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{}, + }, + { + // Reconciling two nodes back and forth causes a cycle. We should report the + // repositories as out of date as there is no way to construct a path back to + // the previous writable primary. This prevents us from verifying whether the + // nodes are up to date with the previous writable primary. + // + // Reconciling towards the previous writable primary breaks the precondition for + // the search that the previous writable primary contains the full data and has + // not received writes after failing over. Results from such state are not accurate. + desc: "two way cycle reports out of date", + events: []ReplicationEvent{ + { + State: JobStateCompleted, + UpdatedAt: offset(0), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: secondary, + TargetNodeStorage: newPrimary, + RelativePath: "repo-1", + }, + }, + { + State: JobStateCompleted, + UpdatedAt: offset(time.Second), + Job: ReplicationJob{ + VirtualStorage: virtualStorage, + SourceNodeStorage: newPrimary, + TargetNodeStorage: secondary, + RelativePath: "repo-1", + }, + }, + }, + expected: map[string][]string{ + "repo-1": []string{newPrimary, secondary}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + db := getDB(t) + + for _, event := range tc.events { + db.MustExec(t, "INSERT INTO replication_queue (state, updated_at, job) VALUES ($1, $2, $3)", + event.State, event.UpdatedAt, event.Job, + ) + } + + actual, err := NewPostgresReplicationEventQueue(db).GetOutdatedRepositories(ctx, virtualStorage, oldPrimary) + require.NoError(t, err) + require.Equal(t, tc.expected, actual) + }) + } +} + func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) { db := getDB(t) |