gitlab.com/gitlab-org/gitaly.git
author    Sami Hiltunen <shiltunen@gitlab.com>  2020-06-04 18:35:06 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>  2020-06-04 18:48:46 +0300
commit    7edbb7d6c6be568d677d386ba92a1615aad22750 (patch)
tree      11a302a1b3361f6655842c047be5184adfb721fd
parent    bf7d70d8daa5d41d6a747b059ff6ec66e1c4c154 (diff)
branch    smh-up-to-date-node

    get outdated repositories
-rw-r--r--  internal/praefect/datastore/memory.go       4
-rw-r--r--  internal/praefect/datastore/queue.go       104
-rw-r--r--  internal/praefect/datastore/queue_test.go  341
3 files changed, 449 insertions(+), 0 deletions(-)
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index 3a095c9c8..a311e4dae 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -60,6 +60,10 @@ func (s *memoryReplicationEventQueue) nextID() uint64 {
return s.seq
}
+func (s *memoryReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) {
+ return nil, errors.New("unimplemented")
+}
+
func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event ReplicationEvent) (ReplicationEvent, error) {
event.Attempt = 3
event.State = JobStateReady
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 1785cc107..671df03f6 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -6,9 +6,10 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/lib/pq"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)
@@ -25,6 +26,9 @@ type ReplicationEventQueue interface {
// CountDeadReplicationJobs returns the dead replication job counts by relative path within the
// given timerange. The timerange beginning is inclusive and ending is exclusive.
CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
+ // GetOutdatedRepositories returns, for each repository, the storages whose copies have incomplete
+ // replication jobs from the reference storage.
+ GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error)
// GetUpToDateStorages returns the list of target storages whose latest replication job is in 'completed' state.
// It returns no results if there are no up-to-date storages or there have been no replication events yet.
GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error)
@@ -345,6 +349,106 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
return acknowledged.Values(), nil
}
+// GetOutdatedRepositories returns the repositories which have not replicated all writes from the reference storage.
+func (rq PostgresReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage, reference string) (map[string][]string, error) {
+ // This query searches the replication graph for a path from a repository to the repository on the reference storage.
+ // If it does not find a path to the reference storage, the repository's copy on the storage is reported as outdated.
+ //
+ // The path is constructed by following the chain of replication jobs. The `latest_job` CTE queries for the latest
+ // 'meaningful' jobs for each repository and storage. A job is considered meaningful in two cases:
+ //   1. Jobs which originate from the reference storage are always meaningful: if they are 'completed', they
+ //      indicate the repository is up to date on the node. If they are in any other state, the repository is
+ //      missing some writes from the reference storage.
+ //   2. Jobs which originate from nodes other than the reference are only considered meaningful if they are
+ //      in 'completed' state. If the job is 'completed', the repository contains the latest changes from the
+ //      given non-reference node, and we then have to follow the chain to verify that the non-reference node
+ //      is up to date with the reference. In other states, the node does not contain the non-reference node's
+ //      changes, and the job gives no information on whether the currently checked node is missing writes
+ //      from the reference.
+ //
+ // Taking the meaningful jobs as starting points, the `intermediate_job` CTE then follows the replication job
+ // chain until it finds a path to the reference storage or runs into a cycle. The next job to follow in the
+ // chain is decided using the same meaningful-job criteria as in the `latest_job` CTE.
+ //
+ // As `intermediate_job` returns every path segment, the `full_path` CTE then selects the full (longest) path
+ // for each repository and node pair.
+ //
+ // Finally, we select the repository/node pairs which either do not reach the reference storage or reach it
+ // but did not successfully complete the replication job from it.
+ const q = `
+WITH RECURSIVE latest_job AS (
+ SELECT DISTINCT ON (repository, target)
+ job->>'relative_path' AS repository,
+ job->>'target_node_storage' AS target,
+ job->>'source_node_storage' AS source,
+ state
+ FROM replication_queue
+ WHERE
+ job->>'virtual_storage' = $1 AND
+ (job->>'source_node_storage' = $2 OR
+ job->>'source_node_storage' != $2 AND state = 'completed')
+ ORDER BY repository, target, updated_at DESC NULLS FIRST
+), intermediate_job AS (
+ SELECT
+ repository,
+ target,
+ state,
+ ARRAY[source] AS path
+ FROM latest_job
+ UNION
+ SELECT
+ intermediate_job.repository,
+ intermediate_job.target,
+ next_job.state,
+ intermediate_job.path || (job->>'source_node_storage')
+ FROM replication_queue AS next_job, intermediate_job
+ WHERE
+ job->>'source_node_storage' != ALL(intermediate_job.path) AND
+ job->>'virtual_storage' = $1 AND
+ job->>'relative_path' = intermediate_job.repository AND
+ job->>'target_node_storage' = intermediate_job.path[array_length(intermediate_job.path, 1)] AND
+ (
+ job->>'source_node_storage' = $2 OR
+ job->>'source_node_storage' != $2 AND next_job.state = 'completed'
+ )
+), full_path AS (
+ SELECT DISTINCT ON (repository, target)
+ repository,
+ target,
+ state,
+ path
+ FROM intermediate_job
+ ORDER BY repository, target, array_length(path, 1) DESC
+)
+
+SELECT repository, target, state, path
+FROM full_path
+WHERE state != 'completed' OR path[array_length(path, 1)] != $2
+ORDER BY repository, target
+`
+
+ rows, err := rq.qc.QueryContext(ctx, q, virtualStorage, reference)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+
+ repos := map[string][]string{}
+ for rows.Next() {
+ // state and path are part of the result set and have to be scanned,
+ // even though only the repository and target node are collected here.
+ var repo, node, state string
+ var path []string
+ if err := rows.Scan(&repo, &node, &state, (*pq.StringArray)(&path)); err != nil {
+ return nil, err
+ }
+
+ repos[repo] = append(repos[repo], node)
+ }
+
+ return repos, rows.Err()
+}
+
func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) {
query := `
SELECT storage
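
The recursive search above is easier to follow as a plain graph walk. The following self-contained Go sketch models the same idea in memory. It is illustrative only: the event struct, the seq field standing in for updated_at, and the upToDate helper are inventions of this sketch rather than code from this commit, and it simplifies the SQL by following only the newest meaningful job at each hop, whereas the query explores every meaningful job and keeps the longest path.

// Hypothetical in-memory model of the path search in GetOutdatedRepositories.
package main

import "fmt"

type event struct {
	repo, source, target, state string
	seq                         int // stands in for updated_at; higher is newer
}

// latestMeaningful returns the newest 'meaningful' job targeting (repo, target):
// any job originating from the reference storage, or a 'completed' job from any
// other storage.
func latestMeaningful(events []event, repo, target, reference string) (event, bool) {
	var latest event
	found := false
	for _, e := range events {
		if e.repo != repo || e.target != target {
			continue
		}
		if e.source != reference && e.state != "completed" {
			continue
		}
		if !found || e.seq > latest.seq {
			latest, found = e, true
		}
	}
	return latest, found
}

// upToDate follows the chain of meaningful jobs from (repo, target) towards the
// reference storage. It reports false when the chain dead-ends or cycles.
func upToDate(events []event, repo, target, reference string, visited map[string]bool) bool {
	if visited[target] {
		return false // cycle: no path back to the reference storage
	}
	visited[target] = true

	job, ok := latestMeaningful(events, repo, target, reference)
	if !ok {
		return false
	}
	if job.source == reference {
		return job.state == "completed"
	}
	// The latest data came from a non-reference node; the copy is only up to
	// date if that node is itself up to date with the reference.
	return upToDate(events, repo, job.source, reference, visited)
}

func main() {
	// Mirrors the "reconciled from outdated secondary" case in queue_test.go below.
	events := []event{
		{repo: "repo-1", source: "old-primary", target: "new-primary", state: "dead", seq: 1},
		{repo: "repo-1", source: "old-primary", target: "secondary", state: "dead", seq: 2},
		{repo: "repo-1", source: "secondary", target: "new-primary", state: "completed", seq: 3},
	}
	for _, target := range []string{"new-primary", "secondary"} {
		ok := upToDate(events, "repo-1", target, "old-primary", map[string]bool{})
		fmt.Printf("%s up to date: %v\n", target, ok) // both report false here
	}
}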
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 75bbe608a..f7f7737f6 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -572,6 +572,347 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
})
}
+func TestPostgresReplicationEventQueue_GetOutdatedRepositories(t *testing.T) {
+ const (
+ virtualStorage = "test-virtual-storage"
+ oldPrimary = "old-primary"
+ newPrimary = "new-primary"
+ secondary = "secondary"
+ )
+
+ now := time.Now()
+ offset := func(d time.Duration) *time.Time {
+ t := now.Add(d)
+ return &t
+ }
+
+ for _, tc := range []struct {
+ desc string
+ events []ReplicationEvent
+ error error
+ expected map[string][]string
+ }{
+ {
+ desc: "basic scenarios work",
+ events: []ReplicationEvent{
+ {
+ State: JobStateReady,
+ Job: ReplicationJob{
+ VirtualStorage: "wrong-virtual-storage",
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateDead,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: "wrong-source-node",
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "completed-job-ignored",
+ },
+ },
+ {
+ State: JobStateDead,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateInProgress,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-2",
+ },
+ },
+ {
+ State: JobStateFailed,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: secondary,
+ RelativePath: "repo-2",
+ },
+ },
+ },
+ expected: map[string][]string{
+ "repo-1": {newPrimary},
+ "repo-2": {newPrimary, secondary},
+ },
+ },
+ {
+ desc: "search considers null updated_at as latest",
+ events: []ReplicationEvent{
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateReady,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{
+ "repo-1": []string{newPrimary},
+ },
+ },
+ {
+ // Search should terminate at the reference storage and not consider
+ // earlier nodes.
+ desc: "search terminates terminates at the reference storage",
+ events: []ReplicationEvent{
+ {
+ State: JobStateDead,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: "even-older-primary",
+ TargetNodeStorage: oldPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateDead,
+ UpdatedAt: offset(time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(2 * time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{},
+ },
+ {
+ // Incomplete jobs from nodes other than the previous writable
+ // primary do not indicate an inconsistent state and should be
+ // ignored.
+ desc: "up to date with failed jobs from secondaries",
+ events: []ReplicationEvent{
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateDead,
+ UpdatedAt: offset(time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: secondary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{},
+ },
+ {
+ // A node that experienced data loss on failover but was later
+ // reconciled from the previous writable primary should
+ // contain complete data.
+ desc: "up to date with earlier failed job from old primary",
+ events: []ReplicationEvent{
+ {
+ State: JobStateDead,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{},
+ },
+ {
+ // If the node was reconciled from an outdated secondary,
+ // it will contain the same outdated data as the secondary.
+ // The node should be considered outdated as well.
+ desc: "reconciled from outdated secondary",
+ events: []ReplicationEvent{
+ {
+ State: JobStateDead,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateDead,
+ UpdatedAt: offset(time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: secondary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(2 * time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: secondary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{
+ "repo-1": {newPrimary, secondary},
+ },
+ },
+ {
+ // If the node is reconciled from an up-to-date secondary,
+ // it contains the latest data as well.
+ desc: "reconciled from up to date secondary",
+ events: []ReplicationEvent{
+ {
+ State: JobStateDead,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: secondary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(2 * time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: secondary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{},
+ },
+ {
+ // Reconciling two nodes back and forth causes a cycle. We should report the
+ // repositories as out of date as there is no way to construct a path back to
+ // the previous writable primary. This prevents us from verifying whether the
+ // nodes are up to date with the previous writable primary.
+ //
+ // Reconciling towards the previous writable primary breaks the precondition for
+ // the search that the previous writable primary contains the full data and has
+ // not received writes after failing over. Results from such a state are not accurate.
+ desc: "two way cycle reports out of date",
+ events: []ReplicationEvent{
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: secondary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: newPrimary,
+ TargetNodeStorage: secondary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{
+ "repo-1": []string{newPrimary, secondary},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := getDB(t)
+
+ for _, event := range tc.events {
+ db.MustExec(t, "INSERT INTO replication_queue (state, updated_at, job) VALUES ($1, $2, $3)",
+ event.State, event.UpdatedAt, event.Job,
+ )
+ }
+
+ actual, err := NewPostgresReplicationEventQueue(db).GetOutdatedRepositories(ctx, virtualStorage, oldPrimary)
+ require.NoError(t, err)
+ require.Equal(t, tc.expected, actual)
+ })
+ }
+}
+
func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
db := getDB(t)
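
On the caller's side, GetOutdatedRepositories yields a map from relative path to the storages whose copies are behind the reference storage. Below is a hypothetical usage sketch: the DSN, virtual storage name, and reference storage name are examples rather than values from this commit, and it assumes the constructor accepts a standard *sql.DB (the method itself only needs QueryContext).

// Hypothetical wiring; connection details and names are illustrative only.
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // Postgres driver, already a dependency of this package

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

func main() {
	db, err := sql.Open("postgres", "postgres://localhost:5432/praefect?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	queue := datastore.NewPostgresReplicationEventQueue(db)

	// Which repositories on the virtual storage still miss writes from the
	// demoted primary "old-primary"?
	outdated, err := queue.GetOutdatedRepositories(context.Background(), "test-virtual-storage", "old-primary")
	if err != nil {
		log.Fatal(err)
	}
	for relativePath, storages := range outdated {
		fmt.Printf("%s is outdated on %v\n", relativePath, storages)
	}
}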