diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-05-27 19:12:21 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-05-27 19:12:21 +0300 |
commit | 3ca6d344861f62e69959a49133b84989ba7dfad2 (patch) | |
tree | 93b6be8089ce14108c387dc2f3a461e53ef11562 | |
parent | a1fe1986c75312d557b73f13fe35587462c4ca5f (diff) | |
parent | c7cfeabd068e737b80a3302e02fe5cddeeb0ef3b (diff) |
Merge branch 'ps-replicator-free-of-datastore' into 'master'
Praefect: same storage name can't be used for different virtual storages
See merge request gitlab-org/gitaly!2215
-rw-r--r-- | cmd/praefect/main.go | 3 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 9 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 186 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 41 |
7 files changed, 112 insertions, 137 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index ba4f4572b..c9cac9416 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -264,7 +264,8 @@ func run(cfgs []starter.Config, conf config.Config) error { coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered) repl = praefect.NewReplMgr( logger, - ds, + conf.VirtualStorageNames(), + ds.ReplicationEventQueue, nodeManager, praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 98d914ba9..1057b9c1d 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -154,6 +154,15 @@ func (c *Config) setDefaults() { } } +// VirtualStorageNames returns names of all virtual storages configured. +func (c *Config) VirtualStorageNames() []string { + names := make([]string, len(c.VirtualStorages)) + for i, virtual := range c.VirtualStorages { + names[i] = virtual.Name + } + return names +} + // DB holds Postgres client configuration data. type DB struct { Host string `toml:"host"` diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 9fff7b1d3..a597efd3e 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -291,6 +291,11 @@ func TestConfigParsing(t *testing.T) { } } +func TestVirtualStorageNames(t *testing.T) { + conf := Config{VirtualStorages: []*VirtualStorage{{Name: "praefect-1"}, {Name: "praefect-2"}}} + require.Equal(t, []string{"praefect-1", "praefect-2"}, conf.VirtualStorageNames()) +} + func TestToPQString(t *testing.T) { testCases := []struct { desc string diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 1b7103ff0..d49d29a9a 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -63,6 +63,10 @@ func (m *mockNode) GetStorage() string { return m.storageName } func (m *mockNode) GetConnection() *grpc.ClientConn { return m.conn } +func (m *mockNode) GetAddress() string { return "" } + +func (m *mockNode) GetToken() string { return "" } + func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { for _, tc := range []struct { readOnly bool diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index d8cadf610..54d195483 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -256,6 +256,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp // TODO: run a replmgr for EVERY virtual storage replmgr := NewReplMgr( opt.withLogger, + conf.VirtualStorageNames(), opt.withDatastore, opt.withNodeMgr, WithQueueMetric(&promtest.MockGauge{}), diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index fe6a438ff..721edab29 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -23,32 +23,32 @@ import ( // Replicator performs the actual replication logic between two nodes type Replicator interface { // Replicate propagates changes from the source to the target - Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error + Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error // Destroy will remove the target repo on the specified target connection - Destroy(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error + Destroy(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error // Rename will rename(move) the target repo on the specified target connection - Rename(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error + Rename(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error // GarbageCollect will run gc on the target repository - GarbageCollect(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error + GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error // RepackFull will do a full repack on the target repository - RepackFull(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error + RepackFull(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error // RepackIncremental will do an incremental repack on the target repository - RepackIncremental(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error + RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error } type defaultReplicator struct { log *logrus.Entry } -func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error { targetRepository := &gitalypb.Repository{ - StorageName: job.TargetNode.Storage, - RelativePath: job.RelativePath, + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, } sourceRepository := &gitalypb.Repository{ - StorageName: job.SourceNode.Storage, - RelativePath: job.RelativePath, + StorageName: event.Job.SourceNodeStorage, + RelativePath: event.Job.RelativePath, } targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC) @@ -110,10 +110,10 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob return nil } -func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ - StorageName: job.TargetNode.Storage, - RelativePath: job.RelativePath, + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) @@ -125,15 +125,15 @@ func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob, return err } -func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) Rename(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ - StorageName: job.TargetNode.Storage, - RelativePath: job.RelativePath, + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) - val, found := job.Params["RelativePath"] + val, found := event.Job.Params["RelativePath"] if !found { return errors.New("no 'RelativePath' parameter for rename") } @@ -151,13 +151,13 @@ func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, t return err } -func (dr defaultReplicator) GarbageCollect(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ - StorageName: job.TargetNode.Storage, - RelativePath: job.RelativePath, + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, } - val, found := job.Params["CreateBitmap"] + val, found := event.Job.Params["CreateBitmap"] if !found { return errors.New("no 'CreateBitmap' parameter for garbage collect") } @@ -176,10 +176,10 @@ func (dr defaultReplicator) GarbageCollect(ctx context.Context, job datastore.Re return err } -func (dr defaultReplicator) RepackIncremental(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ - StorageName: job.TargetNode.Storage, - RelativePath: job.RelativePath, + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, } repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) @@ -191,13 +191,13 @@ func (dr defaultReplicator) RepackIncremental(ctx context.Context, job datastore return err } -func (dr defaultReplicator) RepackFull(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ - StorageName: job.TargetNode.Storage, - RelativePath: job.RelativePath, + StorageName: event.Job.TargetNodeStorage, + RelativePath: event.Job.RelativePath, } - val, found := job.Params["CreateBitmap"] + val, found := event.Job.Params["CreateBitmap"] if !found { return errors.New("no 'CreateBitmap' parameter for repack full") } @@ -254,7 +254,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Entry - datastore datastore.Datastore + queue datastore.ReplicationEventQueue nodeManager nodes.Manager virtualStorages []string // replicas this replicator is responsible for replicator Replicator // does the actual replication logic @@ -292,13 +292,13 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) { // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log.WithField("component", "replication_manager"), - datastore: datastore, + queue: queue, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, - virtualStorages: datastore.VirtualStorages(), + virtualStorages: virtualStorages, nodeManager: nodeMgr, replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), @@ -364,38 +364,12 @@ func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc { } } -// createReplJob converts `ReplicationEvent` into `ReplJob`. -// It is intermediate solution until `ReplJob` removed and code not adopted to `ReplicationEvent`. -func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.ReplJob, error) { - targetNode, err := r.datastore.GetStorageNode(event.Job.TargetNodeStorage) - if err != nil { - return datastore.ReplJob{}, err - } - - sourceNode, err := r.datastore.GetStorageNode(event.Job.SourceNodeStorage) - if err != nil { - return datastore.ReplJob{}, err - } - - var correlationID string - if val, found := event.Meta[metadatahandler.CorrelationIDKey]; found { +func getCorrelationID(params datastore.Params) string { + correlationID := "" + if val, found := params[metadatahandler.CorrelationIDKey]; found { correlationID, _ = val.(string) } - - replJob := datastore.ReplJob{ - Attempts: event.Attempt, - Change: event.Job.Change, - ID: event.ID, - VirtualStorage: event.Job.VirtualStorage, - TargetNode: targetNode, - SourceNode: sourceNode, - RelativePath: event.Job.RelativePath, - Params: event.Job.Params, - CorrelationID: correlationID, - CreatedAt: event.CreatedAt, - } - - return replJob, nil + return correlationID } // ProcessBacklog starts processing of queued jobs. @@ -428,7 +402,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora } for _, target := range targetNodes { - events, err := r.datastore.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) + events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) if err != nil { logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") continue @@ -438,31 +412,17 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora eventIDsByState := map[datastore.JobState][]uint64{} for _, event := range events { - job, err := r.createReplJob(event) - if err != nil { - logger.WithField("event", event).WithError(err).Error("failed to restore replication job") - eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) - continue - } - - source, err := shard.GetNode(job.SourceNode.Storage) - if err != nil { - logger.WithField("event", event).WithError(err).Error("failed to get source node for replication job") - eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) - continue - } - - if err := r.processReplJob(ctx, job, source.GetConnection(), target.GetConnection()); err != nil { + if err := r.processReplJob(ctx, event, shard, target.GetConnection()); err != nil { logger.WithFields(logrus.Fields{ - logWithReplJobID: job.ID, - logWithReplVirtual: job.VirtualStorage, - logWithReplTarget: job.TargetNode.Storage, - logWithReplSource: job.SourceNode.Storage, - logWithReplChange: job.Change, - logWithReplPath: job.RelativePath, - logWithCorrID: job.CorrelationID, + logWithReplJobID: event.ID, + logWithReplVirtual: event.Job.VirtualStorage, + logWithReplTarget: event.Job.TargetNodeStorage, + logWithReplSource: event.Job.SourceNodeStorage, + logWithReplChange: event.Job.Change, + logWithReplPath: event.Job.RelativePath, + logWithCorrID: getCorrelationID(event.Meta), }).WithError(err).Error("replication job failed") - if job.Attempts == 0 { + if event.Attempt <= 0 { eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID) } else { eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) @@ -472,7 +432,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID) } for state, eventIDs := range eventIDsByState { - ackIDs, err := r.datastore.Acknowledge(ctx, state, eventIDs) + ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs) if err != nil { logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events") continue @@ -502,15 +462,13 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora } } -func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error { - l := r.log. - WithField(logWithReplJobID, job.ID). - WithField(logWithReplVirtual, job.VirtualStorage). - WithField(logWithReplSource, job.SourceNode). - WithField(logWithReplTarget, job.TargetNode). - WithField(logWithReplPath, job.RelativePath). - WithField(logWithReplChange, job.Change). - WithField(logWithCorrID, job.CorrelationID) +func (r ReplMgr) processReplJob(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error { + source, err := shard.GetNode(event.Job.SourceNodeStorage) + if err != nil { + return fmt.Errorf("get source node: %w", err) + } + + cid := getCorrelationID(event.Meta) var replCtx context.Context var cancel func() @@ -522,48 +480,40 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour } defer cancel() - injectedCtx, err := helper.InjectGitalyServers(replCtx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token) + injectedCtx, err := helper.InjectGitalyServers(replCtx, event.Job.SourceNodeStorage, source.GetAddress(), source.GetToken()) if err != nil { - l.WithError(err).Error("unable to inject Gitaly servers into context for replication job") - return err - } - - if job.CorrelationID == "" { - l.Warn("replication job missing correlation ID") + return fmt.Errorf("inject Gitaly servers into context: %w", err) } - injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, job.CorrelationID) + injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, cid) replStart := time.Now() - replDelay := replStart.Sub(job.CreatedAt) - r.replDelayMetric.WithLabelValues(job.Change.String()).Observe(replDelay.Seconds()) + r.replDelayMetric.WithLabelValues(event.Job.Change.String()).Observe(replStart.Sub(event.CreatedAt).Seconds()) r.replQueueMetric.Inc() defer r.replQueueMetric.Dec() - switch job.Change { + switch event.Job.Change { case datastore.UpdateRepo: - err = r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC) + err = r.replicator.Replicate(injectedCtx, event, source.GetConnection(), targetCC) case datastore.DeleteRepo: - err = r.replicator.Destroy(injectedCtx, job, targetCC) + err = r.replicator.Destroy(injectedCtx, event, targetCC) case datastore.RenameRepo: - err = r.replicator.Rename(injectedCtx, job, targetCC) + err = r.replicator.Rename(injectedCtx, event, targetCC) case datastore.GarbageCollect: - err = r.replicator.GarbageCollect(injectedCtx, job, targetCC) + err = r.replicator.GarbageCollect(injectedCtx, event, targetCC) case datastore.RepackFull: - err = r.replicator.RepackFull(injectedCtx, job, targetCC) + err = r.replicator.RepackFull(injectedCtx, event, targetCC) case datastore.RepackIncremental: - err = r.replicator.RepackIncremental(injectedCtx, job, targetCC) + err = r.replicator.RepackIncremental(injectedCtx, event, targetCC) default: - err = fmt.Errorf("unknown replication change type encountered: %q", job.Change) + err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change) } if err != nil { - l.WithError(err).Error("unable to replicate") return err } - replDuration := time.Since(replStart) - r.replLatencyMetric.WithLabelValues(job.Change.String()).Observe(replDuration.Seconds()) + r.replLatencyMetric.WithLabelValues(event.Job.Change.String()).Observe(time.Since(replStart).Seconds()) return nil } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 51772ede5..46053e7d1 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -16,6 +16,7 @@ import ( gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/git/objectpool" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" @@ -127,19 +128,21 @@ func TestProcessReplicationJob(t *testing.T) { secondaries, err := ds.GetSecondaries(conf.VirtualStorages[0].Name) require.NoError(t, err) - var jobs []datastore.ReplJob + var events []datastore.ReplicationEvent for _, secondary := range secondaries { - jobs = append(jobs, datastore.ReplJob{ - Change: datastore.UpdateRepo, - TargetNode: secondary, - SourceNode: primary, - RelativePath: testRepo.GetRelativePath(), - State: datastore.JobStateReady, - Attempts: 3, - CorrelationID: "correlation-id", + events = append(events, datastore.ReplicationEvent{ + State: datastore.JobStateReady, + Attempt: 3, + Job: datastore.ReplicationJob{ + Change: datastore.UpdateRepo, + TargetNodeStorage: secondary.Storage, + SourceNodeStorage: primary.Storage, + RelativePath: testRepo.GetRelativePath(), + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"}, }) } - require.Len(t, jobs, 1) + require.Len(t, events, 1) commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{ Message: "a commit", @@ -159,6 +162,7 @@ func TestProcessReplicationJob(t *testing.T) { replMgr := NewReplMgr( testhelper.DiscardTestEntry(t), + conf.VirtualStorageNames(), ds, nodeMgr, WithLatencyMetric(&mockReplicationLatencyHistogramVec), @@ -172,7 +176,7 @@ func TestProcessReplicationJob(t *testing.T) { require.NoError(t, err) require.Len(t, shard.Secondaries, 1) - replMgr.processReplJob(ctx, jobs[0], shard.Primary.GetConnection(), shard.Secondaries[0].GetConnection()) + replMgr.processReplJob(ctx, events[0], shard, shard.Secondaries[0].GetConnection()) relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath) require.NoError(t, err) @@ -229,7 +233,7 @@ func TestPropagateReplicationJob(t *testing.T) { coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) - replmgr := NewReplMgr(logEntry, ds, nodeMgr) + replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr) prf := NewServer( coordinator.StreamDirector, @@ -521,7 +525,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) - replMgr := NewReplMgr(logEntry, ds, nodeMgr) + replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr) replMgr.ProcessBacklog(ctx, noopBackoffFunc) select { @@ -659,7 +663,7 @@ func TestProcessBacklog_Success(t *testing.T) { nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) - replMgr := NewReplMgr(logEntry, ds, nodeMgr) + replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr) replMgr.ProcessBacklog(ctx, noopBackoffFunc) select { @@ -676,11 +680,11 @@ func TestProcessBacklog_Success(t *testing.T) { type mockReplicator struct { Replicator - ReplicateFunc func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error + ReplicateFunc func(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error } -func (m mockReplicator) Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error { - return m.ReplicateFunc(ctx, job, source, target) +func (m mockReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error { + return m.ReplicateFunc(ctx, event, source, target) } func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { @@ -720,6 +724,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { replMgr := NewReplMgr( testhelper.DiscardTestEntry(t), + conf.VirtualStorageNames(), datastore.Datastore{datastore.NewInMemory(conf), queue}, &mockNodeManager{ GetShardFunc: func(vs string) (nodes.Shard, error) { @@ -741,7 +746,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { processed := make(chan struct{}) replMgr.replicator = mockReplicator{ - ReplicateFunc: func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error { + ReplicateFunc: func(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error { require.True(t, primaryConn == target) require.True(t, secondaryConn == source) close(processed) |