
gitlab.com/gitlab-org/gitaly.git
author     Pavlo Strokov <pstrokov@gitlab.com>    2020-07-24 14:49:21 +0300
committer  Pavlo Strokov <pstrokov@gitlab.com>    2020-07-31 09:58:08 +0300
commit     85e4f4286a2df414ae00006f8455523d048eb07e
tree       d2b7c1d790761c0f282ee62239fc2ae7cd035340
parent     aa7e6e1150c57de4cda18767adda50d2173dc7c5
Improve query to identify up to date storages for reads distribution
Retrieve up-to-date storages that can serve read operations for the repository, in order to distribute reads across all healthy storages of the virtual storage.

Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2944
-rw-r--r--  changelogs/unreleased/ps-improve-reads-dist.yml |   5
-rw-r--r--  cmd/praefect/main.go                            |   2
-rw-r--r--  internal/praefect/auth_test.go                  |   2
-rw-r--r--  internal/praefect/coordinator_test.go           |  18
-rw-r--r--  internal/praefect/datastore/queue.go            |   3
-rw-r--r--  internal/praefect/helper_test.go                |  14
-rw-r--r--  internal/praefect/nodes/manager.go              |  33
-rw-r--r--  internal/praefect/nodes/manager_test.go         | 235
-rw-r--r--  internal/praefect/replicator_test.go            |   8
-rw-r--r--  internal/praefect/server_factory_test.go        |   2
-rw-r--r--  internal/praefect/server_test.go                |   2
11 files changed, 122 insertions, 202 deletions
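
Before the per-file diffs, the idea behind the change in one place: whether a storage may serve reads is now derived from repository generations recorded in the RepositoryStore, rather than from the state of the latest replication job. The fragment below is a conceptual sketch only, not code from this commit; the helper name isUpToDate and its parameters are illustrative.

func isUpToDate(ctx context.Context, rs datastore.RepositoryStore, virtualStorage, relativePath, primary, secondary string) (bool, error) {
	primaryGen, err := rs.GetGeneration(ctx, virtualStorage, relativePath, primary)
	if err != nil {
		return false, err
	}
	secondaryGen, err := rs.GetGeneration(ctx, virtualStorage, relativePath, secondary)
	if err != nil {
		return false, err
	}
	// A secondary whose generation matches the primary's holds the same data
	// and can therefore serve read traffic for this repository.
	return secondaryGen == primaryGen, nil
}

GetConsistentSecondaries, used in internal/praefect/nodes/manager.go below, answers this question for all secondaries of a virtual storage in a single call.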
diff --git a/changelogs/unreleased/ps-improve-reads-dist.yml b/changelogs/unreleased/ps-improve-reads-dist.yml
new file mode 100644
index 000000000..6ab1dbc26
--- /dev/null
+++ b/changelogs/unreleased/ps-improve-reads-dist.yml
@@ -0,0 +1,5 @@
+---
+title: Improve query to identify up to date storages for reads distribution
+merge_request: 2372
+author:
+type: changed
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 22c3b6432..8bba748c8 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -236,7 +236,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
}
- nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
+ nodeManager, err := nodes.NewManager(logger, conf, db, rs, nodeLatencyHistogram)
if err != nil {
return err
}
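
The same substitution, shown as a condensed wiring sketch rather than the literal main.go: the helper name buildNodeManager is illustrative, and the in-memory fallback branch mirrors the test helpers below rather than the exact conditional in main.go.

func buildNodeManager(logger *logrus.Entry, conf config.Config, db *sql.DB, latency prommetrics.HistogramVec) (*nodes.Mgr, error) {
	// Choose a repository store: Postgres-backed when a database is
	// available, otherwise the in-memory implementation used by the tests.
	var rs datastore.RepositoryStore
	if db != nil {
		rs = datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
	} else {
		rs = datastore.NewMemoryRepositoryStore(conf.StorageNames())
	}
	// The node manager now receives the repository store instead of the
	// replication event queue.
	return nodes.NewManager(logger, conf, db, rs, latency)
}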
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 92ad0dbbe..f93589ad9 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -168,7 +168,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
logEntry := testhelper.DiscardTestEntry(t)
queue := datastore.NewMemoryReplicationEventQueue(conf)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 373e69d4c..588a529a8 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -149,7 +149,7 @@ func TestStreamDirectorMutator(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
@@ -339,7 +339,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, feature)
}
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
@@ -482,7 +482,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
@@ -566,7 +566,13 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ repoStore := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+ require.NoError(t, repoStore.IncrementGeneration(ctx, conf.VirtualStorages[0].Name, targetRepo.RelativePath, primaryNodeConf.Storage, nil))
+ generation, err := repoStore.GetGeneration(ctx, conf.VirtualStorages[0].Name, targetRepo.RelativePath, primaryNodeConf.Storage)
+ require.NoError(t, err)
+ require.NoError(t, repoStore.SetGeneration(ctx, conf.VirtualStorages[0].Name, targetRepo.RelativePath, secondaryNodeConf.Storage, generation))
+
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
@@ -574,7 +580,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
coordinator := NewCoordinator(
queue,
- datastore.NewMemoryRepositoryStore(conf.StorageNames()),
+ repoStore,
nodeMgr,
txMgr,
conf,
@@ -765,7 +771,7 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
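
The three-call setup above (IncrementGeneration, GetGeneration, SetGeneration) recurs in the node manager tests later in this diff. A small helper like the following, hypothetical and not part of the commit, captures the pattern:

// markConsistent bumps the primary's generation for a repository and copies
// it to a secondary, so both storages count as up to date for reads.
func markConsistent(t *testing.T, ctx context.Context, rs datastore.RepositoryStore, virtualStorage, relativePath, primary, secondary string) {
	require.NoError(t, rs.IncrementGeneration(ctx, virtualStorage, relativePath, primary, nil))
	gen, err := rs.GetGeneration(ctx, virtualStorage, relativePath, primary)
	require.NoError(t, err)
	require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, secondary, gen))
}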
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 0d7083de4..a3d68d5ce 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -30,9 +30,6 @@ type ReplicationEventQueue interface {
// outdated if the latest replication job is not in 'complete' state or the latest replication job does not originate
// from the reference storage.
GetOutdatedRepositories(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error)
- // GetUpToDateStorages returns list of target storages where latest replication job is in 'completed' state.
- // It returns no results if there is no up to date storages or there were no replication events yet.
- GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error)
// StartHealthUpdate starts periodical update of the event's health identifier.
// The events with fresh health identifier won't be considered as stale.
// The health update will be executed on each new entry received from trigger channel passed in.
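
For reference, the replacement for the removed GetUpToDateStorages lives on datastore.RepositoryStore. Its shape below is inferred from the call sites in this commit; the doc comment is paraphrased, not copied from the source.

type RepositoryStore interface {
	// GetConsistentSecondaries returns the secondary storages whose
	// generation for the given repository matches the primary's, i.e.
	// storages that can serve reads without returning stale data.
	GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error)
	// ... plus IncrementGeneration, GetGeneration, SetGeneration and other
	// methods used elsewhere in this diff.
}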
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index d51379f13..663550e65 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -107,6 +107,7 @@ type buildOptions struct {
withAnnotations *protoregistry.Registry
withLogger *logrus.Entry
withNodeMgr nodes.Manager
+ withRepoStore datastore.RepositoryStore
}
func withMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer) func([]*config.VirtualStorage) []testhelper.Cleanup {
@@ -187,19 +188,26 @@ func defaultTxMgr() *transactions.Manager {
return transactions.NewManager()
}
-func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.ReplicationEventQueue) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec())
+func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager {
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
return nodeMgr
}
+func defaultRepoStore(conf config.Config) datastore.RepositoryStore {
+ return datastore.NewMemoryRepositoryStore(conf.StorageNames())
+}
+
func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
var cleanups []testhelper.Cleanup
if opt.withQueue == nil {
opt.withQueue = defaultQueue(conf)
}
+ if opt.withRepoStore == nil {
+ opt.withRepoStore = defaultRepoStore(conf)
+ }
if opt.withTxMgr == nil {
opt.withTxMgr = defaultTxMgr()
}
@@ -213,7 +221,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withLogger = log.Default()
}
if opt.withNodeMgr == nil {
- opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withQueue)
+ opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withRepoStore)
}
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
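
Usage from a test might look like this (a sketch assembled from the fields and helpers in the hunk above):

rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
// Pre-populate generations on rs here so the node manager built by
// runPraefectServer sees the same data the test asserts against.
conn, _, cleanup := runPraefectServer(t, conf, buildOptions{withRepoStore: rs})
defer cleanup()
_ = conn // issue RPCs through conn; reads are routed using rs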
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 0720d260d..bb6468b71 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -99,7 +99,7 @@ type Mgr struct {
// strategies is a map of strategies keyed on virtual storage name
strategies map[string]leaderElectionStrategy
db *sql.DB
- queue datastore.ReplicationEventQueue
+ rs datastore.RepositoryStore
}
// leaderElectionStrategy defines the interface by which primary and
@@ -117,7 +117,7 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
const dialTimeout = 10 * time.Second
// NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.ReplicationEventQueue, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, rs datastore.RepositoryStore, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -164,7 +164,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.
db: db,
failoverEnabled: c.Failover.Enabled,
strategies: strategies,
- queue: queue,
+ rs: rs,
}, nil
}
@@ -216,18 +216,21 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
}
logger := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath})
- upToDateStorages, err := n.queue.GetUpToDateStorages(ctx, virtualStorageName, repoPath)
+ upToDateStorages, err := n.rs.GetConsistentSecondaries(ctx, virtualStorageName, repoPath, shard.Primary.GetStorage())
if err != nil {
// this is recoverable error - proceed with primary node
logger.WithError(err).Warn("get up to date secondaries")
}
- // Make sure that nodes are unique in case the up-to-date storages also
- // contain the primary.
- storages := make(map[Node]struct{}, len(upToDateStorages)+1) // +1 is for the primary node
- storages[shard.Primary] = struct{}{}
+ if len(upToDateStorages) == 0 {
+ upToDateStorages = make(map[string]struct{}, 1)
+ }
+
+ // Primary should be considered as all other storages for serving read operations
+ upToDateStorages[shard.Primary.GetStorage()] = struct{}{}
+ healthyStorages := make([]Node, 0, len(upToDateStorages))
- for _, upToDateStorage := range upToDateStorages {
+ for upToDateStorage := range upToDateStorages {
node, err := shard.GetNode(upToDateStorage)
if err != nil {
// this is recoverable error - proceed with other nodes
@@ -238,18 +241,14 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
continue
}
- storages[node] = struct{}{}
+ healthyStorages = append(healthyStorages, node)
}
- i := rand.Intn(len(storages))
- for storage := range storages {
- if i == 0 {
- return storage, nil
- }
- i--
+ if len(healthyStorages) == 0 {
+ return nil, ErrPrimaryNotHealthy
}
- return nil, errors.New("could not select random storage")
+ return healthyStorages[rand.Intn(len(healthyStorages))], nil
}
func newConnectionStatus(node config.Node, cc *grpc.ClientConn, l logrus.FieldLogger, latencyHist prommetrics.HistogramVec) *nodeStatus {
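
Put together, the new GetSyncedNode flow reads roughly as follows. This is a condensed restatement of the hunks above: logging, the feature-flag check, and the exact health check are simplified, and the helper name pickReadNode is illustrative rather than part of the commit.

func (n *Mgr) pickReadNode(ctx context.Context, shard Shard, virtualStorage, repoPath string) (Node, error) {
	upToDate, err := n.rs.GetConsistentSecondaries(ctx, virtualStorage, repoPath, shard.Primary.GetStorage())
	if err != nil {
		upToDate = nil // recoverable: fall back to the primary only
	}
	if upToDate == nil {
		upToDate = make(map[string]struct{}, 1)
	}
	// The primary participates in read distribution like any other storage.
	upToDate[shard.Primary.GetStorage()] = struct{}{}

	healthy := make([]Node, 0, len(upToDate))
	for storage := range upToDate {
		node, err := shard.GetNode(storage)
		if err != nil || !node.IsHealthy() {
			continue // unknown or unhealthy storage: skip it
		}
		healthy = append(healthy, node)
	}
	if len(healthy) == 0 {
		return nil, ErrPrimaryNotHealthy
	}
	return healthy[rand.Intn(len(healthy))], nil
}

Because the primary is always added to the candidate set, a repository with no consistent secondaries still reads from the primary, and the "no healthy storages" case now surfaces as ErrPrimaryNotHealthy instead of a generic selection error.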
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index d5b535cf0..40e9b15d3 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -3,6 +3,7 @@ package nodes
import (
"context"
"errors"
+ "fmt"
"net"
"testing"
"time"
@@ -299,217 +300,121 @@ func TestNodeManager(t *testing.T) {
}
func TestMgr_GetSyncedNode(t *testing.T) {
- var sockets [4]string
- var srvs [4]*grpc.Server
- var healthSrvs [4]*health.Server
- for i := 0; i < 4; i++ {
- sockets[i] = testhelper.GetTemporaryGitalySocketFileName()
- srvs[i], healthSrvs[i] = testhelper.NewServerWithHealth(t, sockets[i])
+ const count = 3
+ const virtualStorage = "virtual-storage-0"
+ const repoPath = "path/1"
+
+ var srvs [count]*grpc.Server
+ var healthSrvs [count]*health.Server
+ var nodes [count]*config.Node
+ for i := 0; i < count; i++ {
+ socket := testhelper.GetTemporaryGitalySocketFileName()
+ srvs[i], healthSrvs[i] = testhelper.NewServerWithHealth(t, socket)
defer srvs[i].Stop()
- }
-
- vs0Primary := "unix://" + sockets[0]
- vs1Primary := "unix://" + sockets[2]
- vs1Secondary := "unix://" + sockets[3]
-
- virtualStorages := []*config.VirtualStorage{
- {
- Name: "virtual-storage-0",
- Nodes: []*config.Node{
- {
- Storage: "gitaly-0",
- Address: vs0Primary,
- },
- {
- Storage: "gitaly-1",
- Address: "unix://" + sockets[1],
- },
- },
- },
- {
- // second virtual storage needed to check there is no intersections between two even with same storage names
- Name: "virtual-storage-1",
- Nodes: []*config.Node{
- {
- // same storage name as in other virtual storage is used intentionally
- Storage: "gitaly-1",
- Address: "unix://" + sockets[2],
- },
- {
- Storage: "gitaly-2",
- Address: vs1Secondary,
- },
- },
- },
+ nodes[i] = &config.Node{Storage: fmt.Sprintf("gitaly-%d", i), Address: "unix://" + socket}
}
conf := config.Config{
- VirtualStorages: virtualStorages,
+ VirtualStorages: []*config.VirtualStorage{{Name: virtualStorage, Nodes: nodes[:]}},
Failover: config.Failover{Enabled: true},
}
- mockHistogram := promtest.NewMockHistogramVec()
-
ctx, cancel := testhelper.Context()
defer cancel()
ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.DistributedReads)
- ackEvent := func(queue datastore.ReplicationEventQueue, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent {
- event := datastore.ReplicationEvent{Job: job}
-
- eevent, err := queue.Enqueue(ctx, event)
- require.NoError(t, err)
-
- devents, err := queue.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2)
- require.NoError(t, err)
- require.Len(t, devents, 1)
-
- acks, err := queue.Acknowledge(ctx, state, []uint64{devents[0].ID})
- require.NoError(t, err)
- require.Equal(t, []uint64{devents[0].ID}, acks)
- return devents[0]
- }
-
- verify := func(scenario func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue)) func(*testing.T) {
- queue := datastore.NewMemoryReplicationEventQueue(conf)
+ verify := func(scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) {
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec())
require.NoError(t, err)
nm.Start(time.Duration(0), time.Hour)
- return func(t *testing.T) { scenario(t, nm, queue) }
+ return func(t *testing.T) { scenario(t, nm, rs) }
}
- t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
- _, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "")
+ t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+ _, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "stub")
require.True(t, errors.Is(err, ErrVirtualStorageNotExist))
}))
- t.Run("no replication events", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
- node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "no/matter")
+ t.Run("state is undefined", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+ node, err := nm.GetSyncedNode(ctx, virtualStorage, "no/matter")
require.NoError(t, err)
- require.Contains(t, []string{vs0Primary, "unix://" + sockets[1]}, node.GetAddress())
+ require.Equal(t, conf.VirtualStorages[0].Nodes[0].Address, node.GetAddress(), "")
}))
- t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
- _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
- Job: datastore.ReplicationJob{
- RelativePath: "path/1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "virtual-storage-0",
- },
- })
+ t.Run("multiple storages up to date", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+ require.NoError(t, rs.IncrementGeneration(ctx, virtualStorage, repoPath, "gitaly-0", nil))
+ generation, err := rs.GetGeneration(ctx, virtualStorage, repoPath, "gitaly-0")
require.NoError(t, err)
+ require.NoError(t, rs.SetGeneration(ctx, virtualStorage, repoPath, "gitaly-1", generation))
+ require.NoError(t, rs.SetGeneration(ctx, virtualStorage, repoPath, "gitaly-2", generation))
- node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "path/1")
- require.NoError(t, err)
- require.Equal(t, vs0Primary, node.GetAddress())
+ chosen := map[Node]struct{}{}
+ for i := 0; i < 1000 && len(chosen) < 2; i++ {
+ node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath)
+ require.NoError(t, err)
+ chosen[node] = struct{}{}
+ }
+ if len(chosen) < 2 {
+ require.FailNow(t, "no distribution in too many attempts")
+ }
}))
- t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
- vs0Event, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
- Job: datastore.ReplicationJob{
- RelativePath: "path/1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "virtual-storage-0",
- },
- })
+ t.Run("single secondary storage up to date but unhealthy", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+ require.NoError(t, rs.IncrementGeneration(ctx, virtualStorage, repoPath, "gitaly-0", nil))
+ generation, err := rs.GetGeneration(ctx, virtualStorage, repoPath, "gitaly-0")
require.NoError(t, err)
+ require.NoError(t, rs.SetGeneration(ctx, virtualStorage, repoPath, "gitaly-1", generation))
+
+ healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
- vs0Events, err := queue.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500)
+ shard, err := nm.GetShard(virtualStorage)
require.NoError(t, err)
- require.Len(t, vs0Events, 1)
- node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "path/1")
+ gitaly1, err := shard.GetNode("gitaly-1")
require.NoError(t, err)
- require.Equal(t, vs0Primary, node.GetAddress())
- }))
- t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
- vs0Event := ackEvent(queue, datastore.ReplicationJob{
- RelativePath: "path/1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "virtual-storage-0",
- }, datastore.JobStateFailed)
+ ok, err := gitaly1.CheckHealth(ctx)
+ require.NoError(t, err)
+ require.False(t, ok)
- node, err := nm.GetSyncedNode(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.RelativePath)
+ node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath)
require.NoError(t, err)
- require.Equal(t, vs0Primary, node.GetAddress())
+ require.Equal(t, conf.VirtualStorages[0].Nodes[0].Address, node.GetAddress(), "secondary shouldn't be chosen as it is unhealthy")
}))
- t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
- vsEvent0 := ackEvent(queue, datastore.ReplicationJob{
- RelativePath: "path/1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "virtual-storage-0",
- }, datastore.JobStateCompleted)
-
- vsEvent1, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
- Job: datastore.ReplicationJob{
- RelativePath: vsEvent0.Job.RelativePath,
- TargetNodeStorage: vsEvent0.Job.TargetNodeStorage,
- SourceNodeStorage: vsEvent0.Job.SourceNodeStorage,
- VirtualStorage: vsEvent0.Job.VirtualStorage,
- },
- })
+ t.Run("no healthy storages", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+ require.NoError(t, rs.IncrementGeneration(ctx, virtualStorage, repoPath, "gitaly-0", nil))
+ generation, err := rs.GetGeneration(ctx, virtualStorage, repoPath, "gitaly-0")
require.NoError(t, err)
+ require.NoError(t, rs.SetGeneration(ctx, virtualStorage, repoPath, "gitaly-1", generation))
- node, err := nm.GetSyncedNode(ctx, vsEvent1.Job.VirtualStorage, vsEvent1.Job.RelativePath)
+ healthSrvs[0].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+ healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+
+ shard, err := nm.GetShard(virtualStorage)
require.NoError(t, err)
- require.Equal(t, vs0Primary, node.GetAddress())
- }))
- t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
- vs0Event := ackEvent(queue, datastore.ReplicationJob{
- RelativePath: "path/1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "virtual-storage-0",
- }, datastore.JobStateDead)
-
- ackEvent(queue, datastore.ReplicationJob{
- RelativePath: "path/1",
- TargetNodeStorage: "gitaly-2",
- SourceNodeStorage: "gitaly-1",
- VirtualStorage: "virtual-storage-1",
- }, datastore.JobStateCompleted)
-
- node, err := nm.GetSyncedNode(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.RelativePath)
+ gitaly0, err := shard.GetNode("gitaly-0")
require.NoError(t, err)
- require.Equal(t, vs0Primary, node.GetAddress())
- }))
- t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
- ackEvent(queue, datastore.ReplicationJob{
- RelativePath: "path/1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "virtual-storage-0",
- }, datastore.JobStateCompleted)
-
- ackEvent(queue, datastore.ReplicationJob{
- RelativePath: "path/1",
- TargetNodeStorage: "gitaly-2",
- SourceNodeStorage: "gitaly-1",
- VirtualStorage: "virtual-storage-1",
- }, datastore.JobStateCompleted)
-
- vs1Event := ackEvent(queue, datastore.ReplicationJob{
- RelativePath: "path/2",
- TargetNodeStorage: "gitaly-2",
- SourceNodeStorage: "gitaly-1",
- VirtualStorage: "virtual-storage-1",
- }, datastore.JobStateCompleted)
-
- node, err := nm.GetSyncedNode(ctx, vs1Event.Job.VirtualStorage, vs1Event.Job.RelativePath)
+ gitaly0OK, err := gitaly0.CheckHealth(ctx)
require.NoError(t, err)
- require.Contains(t, []string{vs1Primary, vs1Secondary}, node.GetAddress(), "should be one of the secondaries or the primary")
+ require.False(t, gitaly0OK)
+
+ gitaly1, err := shard.GetNode("gitaly-1")
+ require.NoError(t, err)
+
+ gitaly1OK, err := gitaly1.CheckHealth(ctx)
+ require.NoError(t, err)
+ require.False(t, gitaly1OK)
+
+ _, err = nm.GetSyncedNode(ctx, virtualStorage, repoPath)
+ require.True(t, errors.Is(err, ErrPrimaryNotHealthy))
}))
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index af11ca524..dad31494e 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -123,7 +123,7 @@ func TestProcessReplicationJob(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -220,7 +220,7 @@ func TestPropagateReplicationJob(t *testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -506,7 +506,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
replMgr := NewReplMgr(
@@ -651,7 +651,7 @@ func TestProcessBacklog_Success(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
replMgr := NewReplMgr(
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index f09e61832..7d4939db4 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -77,7 +77,7 @@ func TestServerFactory(t *testing.T) {
logger := testhelper.DiscardTestEntry(t)
queue := datastore.NewMemoryReplicationEventQueue(conf)
- nodeMgr, err := nodes.NewManager(logger, conf, nil, queue, &promtest.MockHistogramVec{})
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, nil, &promtest.MockHistogramVec{})
require.NoError(t, err)
txMgr := transactions.NewManager()
registry := protoregistry.GitalyProtoPreregistered
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 3a5e6ec48..246b9be50 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -786,7 +786,7 @@ func TestProxyWrites(t *testing.T) {
queue := datastore.NewMemoryReplicationEventQueue(conf)
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
ctx, cancel := testhelper.Context()