Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-06-09 13:51:51 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-06-11 13:20:44 +0300
commit65dc3bf29aa0d0231842fb907607e4b806800213 (patch)
treed9e0b629b5603b8a8ec071e453143e7ebe770350
parent6a3f3a4bb4dce6543dfd448ac2c4deb5d36ac9a9 (diff)
get outdated repositories from the replication queue
Adds GetOutdatedRepositories to the replication queue. This is useful for determining data loss cases and for deciding which nodes need repairing. Node is considered outdated if the latest replication job does not originate from the reference node or is not in 'completed' state. Repository is only considered up to date if has been directly replicated from the reference storage. Repository that is up to date by replicating from an up to date secondary is not considered up to date. The source node must be the previous writable primary. This commit only adds backstage changes and the code is not in use yet.
-rw-r--r--internal/praefect/datastore/memory.go36
-rw-r--r--internal/praefect/datastore/memory_test.go202
-rw-r--r--internal/praefect/datastore/queue.go43
-rw-r--r--internal/praefect/datastore/queue_test.go15
4 files changed, 296 insertions, 0 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index 3a095c9c8..b49b5f86d 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "sort"
"sync"
"time"
@@ -183,6 +184,41 @@ func (s *memoryReplicationEventQueue) CountDeadReplicationJobs(ctx context.Conte
return dead, nil
}
+func (s *memoryReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) {
+ s.RLock()
+ defer s.RUnlock()
+ outdatedRepositories := make(map[string][]string)
+ for _, event := range s.lastEventByDest {
+ // ensure the event is in the virtual storage we are checking
+ if event.Job.VirtualStorage != virtualStorage ||
+ // ensure the event satisfies the rules specified in the ReplicationEventQueue
+ // interface documentation
+ event.Job.SourceNodeStorage == referenceStorage && event.State == JobStateCompleted {
+ continue
+ }
+
+ nodeAlreadyListed := false
+ for _, node := range outdatedRepositories[event.Job.RelativePath] {
+ if node == event.Job.TargetNodeStorage {
+ nodeAlreadyListed = true
+ break
+ }
+ }
+
+ if nodeAlreadyListed {
+ continue
+ }
+
+ outdatedRepositories[event.Job.RelativePath] = append(outdatedRepositories[event.Job.RelativePath], event.Job.TargetNodeStorage)
+ }
+
+ for _, slc := range outdatedRepositories {
+ sort.Strings(slc)
+ }
+
+ return outdatedRepositories, nil
+}
+
func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, virtualStorage, repoPath string) ([]string, error) {
s.RLock()
dirtyStorages := make(map[string]struct{})
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go
index bd9991969..5353336ff 100644
--- a/internal/praefect/datastore/memory_test.go
+++ b/internal/praefect/datastore/memory_test.go
@@ -143,6 +143,208 @@ func TestMemoryCountDeadReplicationJobsLimit(t *testing.T) {
require.Equal(t, map[string]int64{"job-1": 1, "job-2": 1}, dead, "should only include the last two dead jobs")
}
+func contractTestQueueGetOutdatedRepositories(t *testing.T, rq ReplicationEventQueue, setState func(testing.TB, []ReplicationEvent)) {
+ const (
+ virtualStorage = "test-virtual-storage"
+ oldPrimary = "old-primary"
+ newPrimary = "new-primary"
+ secondary = "secondary"
+ )
+
+ now := time.Now()
+ offset := func(d time.Duration) *time.Time {
+ t := now.Add(d)
+ return &t
+ }
+
+ for _, tc := range []struct {
+ desc string
+ events []ReplicationEvent
+ error error
+ expected map[string][]string
+ }{
+ {
+ desc: "basic scenarios work",
+ events: []ReplicationEvent{
+ {
+ State: JobStateReady,
+ Job: ReplicationJob{
+ VirtualStorage: "wrong-virtual-storage",
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateDead,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: "wrong-source-node",
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "completed-job-ignored",
+ },
+ },
+ {
+ State: JobStateDead,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateInProgress,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-2",
+ },
+ },
+ {
+ State: JobStateFailed,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: secondary,
+ RelativePath: "repo-2",
+ },
+ },
+ },
+ expected: map[string][]string{
+ "repo-1": {newPrimary},
+ "repo-2": {newPrimary, secondary},
+ },
+ },
+ {
+ desc: "search considers null updated_at as latest",
+ events: []ReplicationEvent{
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateReady,
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{
+ "repo-1": []string{newPrimary},
+ },
+ },
+ {
+ // completed job from a secondary indicates the new primary's
+ // state does not originate from the previous writable primary.
+ // This might indicate data loss, if the secondary is not up to
+ // date with the previous writable primary.
+ desc: "completed job from secondary",
+ events: []ReplicationEvent{
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: secondary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{
+ "repo-1": {newPrimary},
+ },
+ },
+ {
+ // Node that experienced data loss on failover but was later
+ // reconciled from the previous writable primary should
+ // contain complete data.
+ desc: "up to date with earlier failed job from old primary",
+ events: []ReplicationEvent{
+ {
+ State: JobStateDead,
+ UpdatedAt: offset(0),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ {
+ State: JobStateCompleted,
+ UpdatedAt: offset(time.Second),
+ Job: ReplicationJob{
+ VirtualStorage: virtualStorage,
+ SourceNodeStorage: oldPrimary,
+ TargetNodeStorage: newPrimary,
+ RelativePath: "repo-1",
+ },
+ },
+ },
+ expected: map[string][]string{},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ setState(t, tc.events)
+
+ actual, err := rq.GetOutdatedRepositories(ctx, virtualStorage, oldPrimary)
+ require.NoError(t, err)
+ require.Equal(t, tc.expected, actual)
+ })
+ }
+}
+
+func TestMemoryReplicationEventQueue_GetOutdatedRepositories(t *testing.T) {
+ rq := NewMemoryReplicationEventQueue(config.Config{}).(*memoryReplicationEventQueue)
+
+ contractTestQueueGetOutdatedRepositories(t, rq,
+ func(t testing.TB, events []ReplicationEvent) {
+ rq.lastEventByDest = map[eventDestination]ReplicationEvent{}
+ for _, event := range events {
+ rq.lastEventByDest[eventDestination{
+ virtual: event.Job.VirtualStorage,
+ storage: event.Job.TargetNodeStorage,
+ relativePath: event.Job.RelativePath,
+ }] = event
+ }
+ },
+ )
+}
+
func TestMemoryReplicationEventQueue(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 1785cc107..0bfcff2ca 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -25,6 +25,10 @@ type ReplicationEventQueue interface {
// CountDeadReplicationJobs returns the dead replication job counts by relative path within the
// given timerange. The timerange beginning is inclusive and ending is exclusive.
CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
+ // GetOutdatedRepositories returns storages by repositories which are considered outdated. A repository is considered
+ // outdated if the latest replication job is not in 'complete' state or the latest replication job does not originate
+ // from the reference storage.
+ GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error)
// GetUpToDateStorages returns list of target storages where latest replication job is in 'completed' state.
// It returns no results if there is no up to date storages or there were no replication events yet.
GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error)
@@ -345,6 +349,45 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
return acknowledged.Values(), nil
}
+func (rq PostgresReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage, reference string) (map[string][]string, error) {
+ const q = `
+WITH latest_jobs AS (
+ SELECT DISTINCT ON (repository, target)
+ job->>'relative_path' AS repository,
+ job->>'target_node_storage' AS target,
+ job->>'source_node_storage' AS source,
+ state
+ FROM replication_queue
+ WHERE job->>'virtual_storage' = $1
+ ORDER BY repository, target, updated_at DESC NULLS FIRST
+)
+
+SELECT repository, target
+FROM latest_jobs
+WHERE state != 'completed' OR source != $2
+ORDER BY repository, target
+`
+
+ rows, err := rq.qc.QueryContext(ctx, q, virtualStorage, reference)
+ if err != nil {
+ return nil, err
+ }
+
+ defer rows.Close()
+
+ nodesByRepo := map[string][]string{}
+ for rows.Next() {
+ var repo, node string
+ if err := rows.Scan(&repo, &node); err != nil {
+ return nil, err
+ }
+
+ nodesByRepo[repo] = append(nodesByRepo[repo], node)
+ }
+
+ return nodesByRepo, rows.Err()
+}
+
func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) {
query := `
SELECT storage
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 75bbe608a..696b0b29c 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -572,6 +572,21 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
})
}
+func TestPostgresReplicationEventQueue_GetOutdatedRepositories(t *testing.T) {
+ db := getDB(t)
+ contractTestQueueGetOutdatedRepositories(t,
+ NewPostgresReplicationEventQueue(db),
+ func(t testing.TB, events []ReplicationEvent) {
+ db.TruncateAll(t)
+ for _, event := range events {
+ db.MustExec(t, "INSERT INTO replication_queue (state, updated_at, job) VALUES ($1, $2, $3)",
+ event.State, event.UpdatedAt, event.Job,
+ )
+ }
+ },
+ )
+}
+
func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
db := getDB(t)