Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-08-19 11:18:39 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-08-19 11:18:39 +0300
commitf4eb93688c7f4bac5285fc07fdde00784acb1963 (patch)
treea1b453522f1b76d48559f74e10d106f2415caaea
parent56251315578cd17c0ceebcb911e8d3ddb159afca (diff)
remove unused GetOutdateRepositories from replication queue
-rw-r--r--internal/praefect/datastore/memory.go38
-rw-r--r--internal/praefect/datastore/memory_test.go219
-rw-r--r--internal/praefect/datastore/mock.go7
-rw-r--r--internal/praefect/datastore/queue.go44
-rw-r--r--internal/praefect/datastore/queue_test.go15
5 files changed, 1 insertions, 322 deletions
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index d2c981e5c..e615956ca 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "sort"
"sync"
"time"
@@ -196,43 +195,6 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
return result, nil
}
-func (s *memoryReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) {
- s.RLock()
- defer s.RUnlock()
- outdatedRepositories := make(map[string][]string)
- for _, event := range s.lastEventByDest {
- // ensure the event is in the virtual storage we are checking and it is not targeting
- // the reference node
- if event.Job.VirtualStorage != virtualStorage ||
- event.Job.TargetNodeStorage == referenceStorage ||
- // ensure the event satisfies the rules specified in the ReplicationEventQueue
- // interface documentation
- event.Job.SourceNodeStorage == referenceStorage && event.State == JobStateCompleted {
- continue
- }
-
- nodeAlreadyListed := false
- for _, node := range outdatedRepositories[event.Job.RelativePath] {
- if node == event.Job.TargetNodeStorage {
- nodeAlreadyListed = true
- break
- }
- }
-
- if nodeAlreadyListed {
- continue
- }
-
- outdatedRepositories[event.Job.RelativePath] = append(outdatedRepositories[event.Job.RelativePath], event.Job.TargetNodeStorage)
- }
-
- for _, slc := range outdatedRepositories {
- sort.Strings(slc)
- }
-
- return outdatedRepositories, nil
-}
-
func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, virtualStorage, repoPath string) ([]string, error) {
s.RLock()
dirtyStorages := make(map[string]struct{})
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go
index 98594b428..e707fed08 100644
--- a/internal/praefect/datastore/memory_test.go
+++ b/internal/praefect/datastore/memory_test.go
@@ -3,231 +3,12 @@ package datastore
import (
"sync"
"testing"
- "time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
-func contractTestQueueGetOutdatedRepositories(t *testing.T, rq ReplicationEventQueue, setState func(testing.TB, []ReplicationEvent)) {
- const (
- virtualStorage = "test-virtual-storage"
- oldPrimary = "old-primary"
- newPrimary = "new-primary"
- secondary = "secondary"
- )
-
- now := time.Now()
- offset := func(d time.Duration) *time.Time {
- t := now.Add(d)
- return &t
- }
-
- for _, tc := range []struct {
- desc string
- events []ReplicationEvent
- error error
- expected map[string][]string
- }{
- {
- desc: "basic scenarios work",
- events: []ReplicationEvent{
- {
- State: JobStateReady,
- Job: ReplicationJob{
- VirtualStorage: "wrong-virtual-storage",
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- {
- State: JobStateDead,
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: "wrong-source-node",
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- {
- State: JobStateCompleted,
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "completed-job-ignored",
- },
- },
- {
- State: JobStateDead,
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- {
- State: JobStateInProgress,
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-2",
- },
- },
- {
- State: JobStateFailed,
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: secondary,
- RelativePath: "repo-2",
- },
- },
- },
- expected: map[string][]string{
- "repo-1": {newPrimary},
- "repo-2": {newPrimary, secondary},
- },
- },
- {
- desc: "search considers null updated_at as latest",
- events: []ReplicationEvent{
- {
- State: JobStateCompleted,
- UpdatedAt: offset(0),
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- {
- State: JobStateReady,
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- },
- expected: map[string][]string{
- "repo-1": []string{newPrimary},
- },
- },
- {
- desc: "jobs targeting reference are ignored",
- events: []ReplicationEvent{
- {
- State: JobStateDead,
- UpdatedAt: offset(0),
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: secondary,
- TargetNodeStorage: oldPrimary,
- RelativePath: "repo-1",
- },
- },
- },
- expected: map[string][]string{},
- },
- {
- // completed job from a secondary indicates the new primary's
- // state does not originate from the previous writable primary.
- // This might indicate data loss, if the secondary is not up to
- // date with the previous writable primary.
- desc: "completed job from secondary",
- events: []ReplicationEvent{
- {
- State: JobStateCompleted,
- UpdatedAt: offset(0),
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- {
- State: JobStateCompleted,
- UpdatedAt: offset(time.Second),
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: secondary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- },
- expected: map[string][]string{
- "repo-1": {newPrimary},
- },
- },
- {
- // Node that experienced data loss on failover but was later
- // reconciled from the previous writable primary should
- // contain complete data.
- desc: "up to date with earlier failed job from old primary",
- events: []ReplicationEvent{
- {
- State: JobStateDead,
- UpdatedAt: offset(0),
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- {
- State: JobStateCompleted,
- UpdatedAt: offset(time.Second),
- Job: ReplicationJob{
- VirtualStorage: virtualStorage,
- SourceNodeStorage: oldPrimary,
- TargetNodeStorage: newPrimary,
- RelativePath: "repo-1",
- },
- },
- },
- expected: map[string][]string{},
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- setState(t, tc.events)
-
- actual, err := rq.GetOutdatedRepositories(ctx, virtualStorage, oldPrimary)
- require.NoError(t, err)
- require.Equal(t, tc.expected, actual)
- })
- }
-}
-
-func TestMemoryReplicationEventQueue_GetOutdatedRepositories(t *testing.T) {
- rq := NewMemoryReplicationEventQueue(config.Config{}).(*memoryReplicationEventQueue)
-
- contractTestQueueGetOutdatedRepositories(t, rq,
- func(t testing.TB, events []ReplicationEvent) {
- rq.lastEventByDest = map[eventDestination]ReplicationEvent{}
- for _, event := range events {
- rq.lastEventByDest[eventDestination{
- virtual: event.Job.VirtualStorage,
- storage: event.Job.TargetNodeStorage,
- relativePath: event.Job.RelativePath,
- }] = event
- }
- },
- )
-}
-
func TestMemoryReplicationEventQueue(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
diff --git a/internal/praefect/datastore/mock.go b/internal/praefect/datastore/mock.go
index c3e660e8e..74886dc83 100644
--- a/internal/praefect/datastore/mock.go
+++ b/internal/praefect/datastore/mock.go
@@ -6,12 +6,7 @@ import "context"
// and allows for parametrizing behavior.
type MockReplicationEventQueue struct {
ReplicationEventQueue
- GetOutdatedRepositoriesFunc func(context.Context, string, string) (map[string][]string, error)
- EnqueueFunc func(context.Context, ReplicationEvent) (ReplicationEvent, error)
-}
-
-func (m *MockReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) {
- return m.GetOutdatedRepositoriesFunc(ctx, virtualStorage, referenceStorage)
+ EnqueueFunc func(context.Context, ReplicationEvent) (ReplicationEvent, error)
}
func (m *MockReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index a3d68d5ce..4a92296c5 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -26,10 +26,6 @@ type ReplicationEventQueue interface {
// 'completed'. Otherwise it won't be changed.
// It returns sub-set of passed in ids that were updated.
Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
- // GetOutdatedRepositories returns storages by repositories which are considered outdated. A repository is considered
- // outdated if the latest replication job is not in 'complete' state or the latest replication job does not originate
- // from the reference storage.
- GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error)
// StartHealthUpdate starts periodical update of the event's health identifier.
// The events with fresh health identifier won't be considered as stale.
// The health update will be executed on each new entry received from trigger channel passed in.
@@ -334,46 +330,6 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
return acknowledged.Values(), nil
}
-func (rq PostgresReplicationEventQueue) GetOutdatedRepositories(ctx context.Context, virtualStorage, reference string) (map[string][]string, error) {
- const q = `
-WITH latest_jobs AS (
- SELECT DISTINCT ON (repository, target)
- job->>'relative_path' AS repository,
- job->>'target_node_storage' AS target,
- job->>'source_node_storage' AS source,
- state
- FROM replication_queue
- WHERE job->>'virtual_storage' = $1 AND
- job->>'target_node_storage' != $2
- ORDER BY repository, target, updated_at DESC NULLS FIRST
-)
-
-SELECT repository, target
-FROM latest_jobs
-WHERE state != 'completed' OR source != $2
-ORDER BY repository, target
-`
-
- rows, err := rq.qc.QueryContext(ctx, q, virtualStorage, reference)
- if err != nil {
- return nil, err
- }
-
- defer rows.Close()
-
- nodesByRepo := map[string][]string{}
- for rows.Next() {
- var repo, node string
- if err := rows.Scan(&repo, &node); err != nil {
- return nil, err
- }
-
- nodesByRepo[repo] = append(nodesByRepo[repo], node)
- }
-
- return nodesByRepo, rows.Err()
-}
-
func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) {
query := `
SELECT storage
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 4cfcf2200..b3600d63e 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -626,21 +626,6 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
requireJobLocks(t, ctx, db, nil)
}
-func TestPostgresReplicationEventQueue_GetOutdatedRepositories(t *testing.T) {
- db := getDB(t)
- contractTestQueueGetOutdatedRepositories(t,
- NewPostgresReplicationEventQueue(db),
- func(t testing.TB, events []ReplicationEvent) {
- db.TruncateAll(t)
- for _, event := range events {
- db.MustExec(t, "INSERT INTO replication_queue (state, updated_at, job) VALUES ($1, $2, $3)",
- event.State, event.UpdatedAt, event.Job,
- )
- }
- },
- )
-}
-
func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
db := getDB(t)