diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-08-19 11:18:39 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-08-19 11:18:39 +0300 |
commit | f4eb93688c7f4bac5285fc07fdde00784acb1963 (patch) | |
tree | a1b453522f1b76d48559f74e10d106f2415caaea | |
parent | 56251315578cd17c0ceebcb911e8d3ddb159afca (diff) |
remove unused GetOutdateRepositories from replication queue
-rw-r--r-- | internal/praefect/datastore/memory.go | 38 | ||||
-rw-r--r-- | internal/praefect/datastore/memory_test.go | 219 | ||||
-rw-r--r-- | internal/praefect/datastore/mock.go | 7 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 44 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 15 |
5 files changed, 1 insertions, 322 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go index d2c981e5c..e615956ca 100644 --- a/internal/praefect/datastore/memory.go +++ b/internal/praefect/datastore/memory.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "sort" "sync" "time" @@ -196,43 +195,6 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt return result, nil } -func (s *memoryReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) { - s.RLock() - defer s.RUnlock() - outdatedRepositories := make(map[string][]string) - for _, event := range s.lastEventByDest { - // ensure the event is in the virtual storage we are checking and it is not targeting - // the reference node - if event.Job.VirtualStorage != virtualStorage || - event.Job.TargetNodeStorage == referenceStorage || - // ensure the event satisfies the rules specified in the ReplicationEventQueue - // interface documentation - event.Job.SourceNodeStorage == referenceStorage && event.State == JobStateCompleted { - continue - } - - nodeAlreadyListed := false - for _, node := range outdatedRepositories[event.Job.RelativePath] { - if node == event.Job.TargetNodeStorage { - nodeAlreadyListed = true - break - } - } - - if nodeAlreadyListed { - continue - } - - outdatedRepositories[event.Job.RelativePath] = append(outdatedRepositories[event.Job.RelativePath], event.Job.TargetNodeStorage) - } - - for _, slc := range outdatedRepositories { - sort.Strings(slc) - } - - return outdatedRepositories, nil -} - func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, virtualStorage, repoPath string) ([]string, error) { s.RLock() dirtyStorages := make(map[string]struct{}) diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go index 98594b428..e707fed08 100644 --- a/internal/praefect/datastore/memory_test.go +++ b/internal/praefect/datastore/memory_test.go @@ -3,231 +3,12 @@ package datastore import ( "sync" "testing" - "time" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) -func contractTestQueueGetOutdatedRepositories(t *testing.T, rq ReplicationEventQueue, setState func(testing.TB, []ReplicationEvent)) { - const ( - virtualStorage = "test-virtual-storage" - oldPrimary = "old-primary" - newPrimary = "new-primary" - secondary = "secondary" - ) - - now := time.Now() - offset := func(d time.Duration) *time.Time { - t := now.Add(d) - return &t - } - - for _, tc := range []struct { - desc string - events []ReplicationEvent - error error - expected map[string][]string - }{ - { - desc: "basic scenarios work", - events: []ReplicationEvent{ - { - State: JobStateReady, - Job: ReplicationJob{ - VirtualStorage: "wrong-virtual-storage", - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - { - State: JobStateDead, - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: "wrong-source-node", - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - { - State: JobStateCompleted, - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "completed-job-ignored", - }, - }, - { - State: JobStateDead, - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - { - State: JobStateInProgress, - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-2", - }, - }, - { - State: JobStateFailed, - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: secondary, - RelativePath: "repo-2", - }, - }, - }, - expected: map[string][]string{ - "repo-1": {newPrimary}, - "repo-2": {newPrimary, secondary}, - }, - }, - { - desc: "search considers null updated_at as latest", - events: []ReplicationEvent{ - { - State: JobStateCompleted, - UpdatedAt: offset(0), - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - { - State: JobStateReady, - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - }, - expected: map[string][]string{ - "repo-1": []string{newPrimary}, - }, - }, - { - desc: "jobs targeting reference are ignored", - events: []ReplicationEvent{ - { - State: JobStateDead, - UpdatedAt: offset(0), - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: secondary, - TargetNodeStorage: oldPrimary, - RelativePath: "repo-1", - }, - }, - }, - expected: map[string][]string{}, - }, - { - // completed job from a secondary indicates the new primary's - // state does not originate from the previous writable primary. - // This might indicate data loss, if the secondary is not up to - // date with the previous writable primary. - desc: "completed job from secondary", - events: []ReplicationEvent{ - { - State: JobStateCompleted, - UpdatedAt: offset(0), - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - { - State: JobStateCompleted, - UpdatedAt: offset(time.Second), - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: secondary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - }, - expected: map[string][]string{ - "repo-1": {newPrimary}, - }, - }, - { - // Node that experienced data loss on failover but was later - // reconciled from the previous writable primary should - // contain complete data. - desc: "up to date with earlier failed job from old primary", - events: []ReplicationEvent{ - { - State: JobStateDead, - UpdatedAt: offset(0), - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - { - State: JobStateCompleted, - UpdatedAt: offset(time.Second), - Job: ReplicationJob{ - VirtualStorage: virtualStorage, - SourceNodeStorage: oldPrimary, - TargetNodeStorage: newPrimary, - RelativePath: "repo-1", - }, - }, - }, - expected: map[string][]string{}, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - setState(t, tc.events) - - actual, err := rq.GetOutdatedRepositories(ctx, virtualStorage, oldPrimary) - require.NoError(t, err) - require.Equal(t, tc.expected, actual) - }) - } -} - -func TestMemoryReplicationEventQueue_GetOutdatedRepositories(t *testing.T) { - rq := NewMemoryReplicationEventQueue(config.Config{}).(*memoryReplicationEventQueue) - - contractTestQueueGetOutdatedRepositories(t, rq, - func(t testing.TB, events []ReplicationEvent) { - rq.lastEventByDest = map[eventDestination]ReplicationEvent{} - for _, event := range events { - rq.lastEventByDest[eventDestination{ - virtual: event.Job.VirtualStorage, - storage: event.Job.TargetNodeStorage, - relativePath: event.Job.RelativePath, - }] = event - } - }, - ) -} - func TestMemoryReplicationEventQueue(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() diff --git a/internal/praefect/datastore/mock.go b/internal/praefect/datastore/mock.go index c3e660e8e..74886dc83 100644 --- a/internal/praefect/datastore/mock.go +++ b/internal/praefect/datastore/mock.go @@ -6,12 +6,7 @@ import "context" // and allows for parametrizing behavior. type MockReplicationEventQueue struct { ReplicationEventQueue - GetOutdatedRepositoriesFunc func(context.Context, string, string) (map[string][]string, error) - EnqueueFunc func(context.Context, ReplicationEvent) (ReplicationEvent, error) -} - -func (m *MockReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) { - return m.GetOutdatedRepositoriesFunc(ctx, virtualStorage, referenceStorage) + EnqueueFunc func(context.Context, ReplicationEvent) (ReplicationEvent, error) } func (m *MockReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index a3d68d5ce..4a92296c5 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -26,10 +26,6 @@ type ReplicationEventQueue interface { // 'completed'. Otherwise it won't be changed. // It returns sub-set of passed in ids that were updated. Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) - // GetOutdatedRepositories returns storages by repositories which are considered outdated. A repository is considered - // outdated if the latest replication job is not in 'complete' state or the latest replication job does not originate - // from the reference storage. - GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) // StartHealthUpdate starts periodical update of the event's health identifier. // The events with fresh health identifier won't be considered as stale. // The health update will be executed on each new entry received from trigger channel passed in. @@ -334,46 +330,6 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J return acknowledged.Values(), nil } -func (rq PostgresReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage, reference string) (map[string][]string, error) { - const q = ` -WITH latest_jobs AS ( - SELECT DISTINCT ON (repository, target) - job->>'relative_path' AS repository, - job->>'target_node_storage' AS target, - job->>'source_node_storage' AS source, - state - FROM replication_queue - WHERE job->>'virtual_storage' = $1 AND - job->>'target_node_storage' != $2 - ORDER BY repository, target, updated_at DESC NULLS FIRST -) - -SELECT repository, target -FROM latest_jobs -WHERE state != 'completed' OR source != $2 -ORDER BY repository, target -` - - rows, err := rq.qc.QueryContext(ctx, q, virtualStorage, reference) - if err != nil { - return nil, err - } - - defer rows.Close() - - nodesByRepo := map[string][]string{} - for rows.Next() { - var repo, node string - if err := rows.Scan(&repo, &node); err != nil { - return nil, err - } - - nodesByRepo[repo] = append(nodesByRepo[repo], node) - } - - return nodesByRepo, rows.Err() -} - func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) { query := ` SELECT storage diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 4cfcf2200..b3600d63e 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -626,21 +626,6 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { requireJobLocks(t, ctx, db, nil) } -func TestPostgresReplicationEventQueue_GetOutdatedRepositories(t *testing.T) { - db := getDB(t) - contractTestQueueGetOutdatedRepositories(t, - NewPostgresReplicationEventQueue(db), - func(t testing.TB, events []ReplicationEvent) { - db.TruncateAll(t) - for _, event := range events { - db.MustExec(t, "INSERT INTO replication_queue (state, updated_at, job) VALUES ($1, $2, $3)", - event.State, event.UpdatedAt, event.Job, - ) - } - }, - ) -} - func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) { db := getDB(t) |