gitlab.com/gitlab-org/gitaly.git
author    Paul Okstad <pokstad@gitlab.com>  2020-05-27 19:12:21 +0300
committer Paul Okstad <pokstad@gitlab.com>  2020-05-27 19:12:21 +0300
commit    3ca6d344861f62e69959a49133b84989ba7dfad2 (patch)
tree      93b6be8089ce14108c387dc2f3a461e53ef11562
parent    a1fe1986c75312d557b73f13fe35587462c4ca5f (diff)
parent    c7cfeabd068e737b80a3302e02fe5cddeeb0ef3b (diff)

Merge branch 'ps-replicator-free-of-datastore' into 'master'

Praefect: same storage name can't be used for different virtual storages

See merge request gitlab-org/gitaly!2215
-rw-r--r--  cmd/praefect/main.go                        3
-rw-r--r--  internal/praefect/config/config.go          9
-rw-r--r--  internal/praefect/config/config_test.go     5
-rw-r--r--  internal/praefect/coordinator_test.go       4
-rw-r--r--  internal/praefect/helper_test.go            1
-rw-r--r--  internal/praefect/replicator.go           186
-rw-r--r--  internal/praefect/replicator_test.go       41
7 files changed, 112 insertions, 137 deletions
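
The core of the change is visible in the constructor wiring: NewReplMgr no longer receives the whole datastore.Datastore, only the configured virtual storage names and the replication event queue. A minimal wiring sketch based on the cmd/praefect/main.go hunk below; logger, conf, ds and nodeManager are assumed to be set up as in the existing run function, and the metric options are omitted.

    // Wiring sketch (not the full call from the diff): the replication manager
    // now gets the virtual storage names and the event queue passed in directly.
    repl := praefect.NewReplMgr(
        logger,
        conf.VirtualStorageNames(), // names of all configured virtual storages
        ds.ReplicationEventQueue,   // only the queue is needed, not the full datastore
        nodeManager,
    )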
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index ba4f4572b..c9cac9416 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -264,7 +264,8 @@ func run(cfgs []starter.Config, conf config.Config) error {
coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
repl = praefect.NewReplMgr(
logger,
- ds,
+ conf.VirtualStorageNames(),
+ ds.ReplicationEventQueue,
nodeManager,
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 98d914ba9..1057b9c1d 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -154,6 +154,15 @@ func (c *Config) setDefaults() {
}
}
+// VirtualStorageNames returns names of all virtual storages configured.
+func (c *Config) VirtualStorageNames() []string {
+ names := make([]string, len(c.VirtualStorages))
+ for i, virtual := range c.VirtualStorages {
+ names[i] = virtual.Name
+ }
+ return names
+}
+
// DB holds Postgres client configuration data.
type DB struct {
Host string `toml:"host"`
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 9fff7b1d3..a597efd3e 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -291,6 +291,11 @@ func TestConfigParsing(t *testing.T) {
}
}
+func TestVirtualStorageNames(t *testing.T) {
+ conf := Config{VirtualStorages: []*VirtualStorage{{Name: "praefect-1"}, {Name: "praefect-2"}}}
+ require.Equal(t, []string{"praefect-1", "praefect-2"}, conf.VirtualStorageNames())
+}
+
func TestToPQString(t *testing.T) {
testCases := []struct {
desc string
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 1b7103ff0..d49d29a9a 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -63,6 +63,10 @@ func (m *mockNode) GetStorage() string { return m.storageName }
func (m *mockNode) GetConnection() *grpc.ClientConn { return m.conn }
+func (m *mockNode) GetAddress() string { return "" }
+
+func (m *mockNode) GetToken() string { return "" }
+
func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
for _, tc := range []struct {
readOnly bool
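
mockNode gains GetAddress and GetToken stubs because processReplJob now reads the source node's address and token from the shard's node object instead of from the job itself (see the replicator.go hunks further down). A sketch of the node behaviour the mock has to satisfy, inferred from this diff; the real nodes.Node interface may declare additional methods.

    // Node methods exercised by the replication manager after this change;
    // inferred from the mock above and the InjectGitalyServers call below.
    type node interface {
        GetStorage() string              // storage name the node serves
        GetAddress() string              // Gitaly address injected into the replication context
        GetToken() string                // auth token for that address
        GetConnection() *grpc.ClientConn // connection used as replication source or target
    }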
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index d8cadf610..54d195483 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -256,6 +256,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
// TODO: run a replmgr for EVERY virtual storage
replmgr := NewReplMgr(
opt.withLogger,
+ conf.VirtualStorageNames(),
opt.withDatastore,
opt.withNodeMgr,
WithQueueMetric(&promtest.MockGauge{}),
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index fe6a438ff..721edab29 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -23,32 +23,32 @@ import (
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
// Replicate propagates changes from the source to the target
- Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
+ Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error
// Destroy will remove the target repo on the specified target connection
- Destroy(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ Destroy(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// Rename will rename(move) the target repo on the specified target connection
- Rename(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ Rename(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// GarbageCollect will run gc on the target repository
- GarbageCollect(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// RepackFull will do a full repack on the target repository
- RepackFull(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ RepackFull(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
// RepackIncremental will do an incremental repack on the target repository
- RepackIncremental(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Entry
}
-func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error {
targetRepository := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
sourceRepository := &gitalypb.Repository{
- StorageName: job.SourceNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.SourceNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -110,10 +110,10 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob
return nil
}
-func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -125,15 +125,15 @@ func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob,
return err
}
-func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Rename(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
- val, found := job.Params["RelativePath"]
+ val, found := event.Job.Params["RelativePath"]
if !found {
return errors.New("no 'RelativePath' parameter for rename")
}
@@ -151,13 +151,13 @@ func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, t
return err
}
-func (dr defaultReplicator) GarbageCollect(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
- val, found := job.Params["CreateBitmap"]
+ val, found := event.Job.Params["CreateBitmap"]
if !found {
return errors.New("no 'CreateBitmap' parameter for garbage collect")
}
@@ -176,10 +176,10 @@ func (dr defaultReplicator) GarbageCollect(ctx context.Context, job datastore.Re
return err
}
-func (dr defaultReplicator) RepackIncremental(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -191,13 +191,13 @@ func (dr defaultReplicator) RepackIncremental(ctx context.Context, job datastore
return err
}
-func (dr defaultReplicator) RepackFull(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) RepackFull(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
- StorageName: job.TargetNode.Storage,
- RelativePath: job.RelativePath,
+ StorageName: event.Job.TargetNodeStorage,
+ RelativePath: event.Job.RelativePath,
}
- val, found := job.Params["CreateBitmap"]
+ val, found := event.Job.Params["CreateBitmap"]
if !found {
return errors.New("no 'CreateBitmap' parameter for repack full")
}
@@ -254,7 +254,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Entry
- datastore datastore.Datastore
+ queue datastore.ReplicationEventQueue
nodeManager nodes.Manager
virtualStorages []string // replicas this replicator is responsible for
replicator Replicator // does the actual replication logic
@@ -292,13 +292,13 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log.WithField("component", "replication_manager"),
- datastore: datastore,
+ queue: queue,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
- virtualStorages: datastore.VirtualStorages(),
+ virtualStorages: virtualStorages,
nodeManager: nodeMgr,
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
@@ -364,38 +364,12 @@ func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc {
}
}
-// createReplJob converts `ReplicationEvent` into `ReplJob`.
-// It is intermediate solution until `ReplJob` removed and code not adopted to `ReplicationEvent`.
-func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.ReplJob, error) {
- targetNode, err := r.datastore.GetStorageNode(event.Job.TargetNodeStorage)
- if err != nil {
- return datastore.ReplJob{}, err
- }
-
- sourceNode, err := r.datastore.GetStorageNode(event.Job.SourceNodeStorage)
- if err != nil {
- return datastore.ReplJob{}, err
- }
-
- var correlationID string
- if val, found := event.Meta[metadatahandler.CorrelationIDKey]; found {
+func getCorrelationID(params datastore.Params) string {
+ correlationID := ""
+ if val, found := params[metadatahandler.CorrelationIDKey]; found {
correlationID, _ = val.(string)
}
-
- replJob := datastore.ReplJob{
- Attempts: event.Attempt,
- Change: event.Job.Change,
- ID: event.ID,
- VirtualStorage: event.Job.VirtualStorage,
- TargetNode: targetNode,
- SourceNode: sourceNode,
- RelativePath: event.Job.RelativePath,
- Params: event.Job.Params,
- CorrelationID: correlationID,
- CreatedAt: event.CreatedAt,
- }
-
- return replJob, nil
+ return correlationID
}
// ProcessBacklog starts processing of queued jobs.
@@ -428,7 +402,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
for _, target := range targetNodes {
- events, err := r.datastore.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
+ events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
if err != nil {
logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
continue
@@ -438,31 +412,17 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
eventIDsByState := map[datastore.JobState][]uint64{}
for _, event := range events {
- job, err := r.createReplJob(event)
- if err != nil {
- logger.WithField("event", event).WithError(err).Error("failed to restore replication job")
- eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
- continue
- }
-
- source, err := shard.GetNode(job.SourceNode.Storage)
- if err != nil {
- logger.WithField("event", event).WithError(err).Error("failed to get source node for replication job")
- eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
- continue
- }
-
- if err := r.processReplJob(ctx, job, source.GetConnection(), target.GetConnection()); err != nil {
+ if err := r.processReplJob(ctx, event, shard, target.GetConnection()); err != nil {
logger.WithFields(logrus.Fields{
- logWithReplJobID: job.ID,
- logWithReplVirtual: job.VirtualStorage,
- logWithReplTarget: job.TargetNode.Storage,
- logWithReplSource: job.SourceNode.Storage,
- logWithReplChange: job.Change,
- logWithReplPath: job.RelativePath,
- logWithCorrID: job.CorrelationID,
+ logWithReplJobID: event.ID,
+ logWithReplVirtual: event.Job.VirtualStorage,
+ logWithReplTarget: event.Job.TargetNodeStorage,
+ logWithReplSource: event.Job.SourceNodeStorage,
+ logWithReplChange: event.Job.Change,
+ logWithReplPath: event.Job.RelativePath,
+ logWithCorrID: getCorrelationID(event.Meta),
}).WithError(err).Error("replication job failed")
- if job.Attempts == 0 {
+ if event.Attempt <= 0 {
eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID)
} else {
eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
@@ -472,7 +432,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
}
for state, eventIDs := range eventIDsByState {
- ackIDs, err := r.datastore.Acknowledge(ctx, state, eventIDs)
+ ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
if err != nil {
logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
continue
@@ -502,15 +462,13 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
}
-func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
- l := r.log.
- WithField(logWithReplJobID, job.ID).
- WithField(logWithReplVirtual, job.VirtualStorage).
- WithField(logWithReplSource, job.SourceNode).
- WithField(logWithReplTarget, job.TargetNode).
- WithField(logWithReplPath, job.RelativePath).
- WithField(logWithReplChange, job.Change).
- WithField(logWithCorrID, job.CorrelationID)
+func (r ReplMgr) processReplJob(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
+ source, err := shard.GetNode(event.Job.SourceNodeStorage)
+ if err != nil {
+ return fmt.Errorf("get source node: %w", err)
+ }
+
+ cid := getCorrelationID(event.Meta)
var replCtx context.Context
var cancel func()
@@ -522,48 +480,40 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
}
defer cancel()
- injectedCtx, err := helper.InjectGitalyServers(replCtx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token)
+ injectedCtx, err := helper.InjectGitalyServers(replCtx, event.Job.SourceNodeStorage, source.GetAddress(), source.GetToken())
if err != nil {
- l.WithError(err).Error("unable to inject Gitaly servers into context for replication job")
- return err
- }
-
- if job.CorrelationID == "" {
- l.Warn("replication job missing correlation ID")
+ return fmt.Errorf("inject Gitaly servers into context: %w", err)
}
- injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, job.CorrelationID)
+ injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, cid)
replStart := time.Now()
- replDelay := replStart.Sub(job.CreatedAt)
- r.replDelayMetric.WithLabelValues(job.Change.String()).Observe(replDelay.Seconds())
+ r.replDelayMetric.WithLabelValues(event.Job.Change.String()).Observe(replStart.Sub(event.CreatedAt).Seconds())
r.replQueueMetric.Inc()
defer r.replQueueMetric.Dec()
- switch job.Change {
+ switch event.Job.Change {
case datastore.UpdateRepo:
- err = r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC)
+ err = r.replicator.Replicate(injectedCtx, event, source.GetConnection(), targetCC)
case datastore.DeleteRepo:
- err = r.replicator.Destroy(injectedCtx, job, targetCC)
+ err = r.replicator.Destroy(injectedCtx, event, targetCC)
case datastore.RenameRepo:
- err = r.replicator.Rename(injectedCtx, job, targetCC)
+ err = r.replicator.Rename(injectedCtx, event, targetCC)
case datastore.GarbageCollect:
- err = r.replicator.GarbageCollect(injectedCtx, job, targetCC)
+ err = r.replicator.GarbageCollect(injectedCtx, event, targetCC)
case datastore.RepackFull:
- err = r.replicator.RepackFull(injectedCtx, job, targetCC)
+ err = r.replicator.RepackFull(injectedCtx, event, targetCC)
case datastore.RepackIncremental:
- err = r.replicator.RepackIncremental(injectedCtx, job, targetCC)
+ err = r.replicator.RepackIncremental(injectedCtx, event, targetCC)
default:
- err = fmt.Errorf("unknown replication change type encountered: %q", job.Change)
+ err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change)
}
if err != nil {
- l.WithError(err).Error("unable to replicate")
return err
}
- replDuration := time.Since(replStart)
- r.replLatencyMetric.WithLabelValues(job.Change.String()).Observe(replDuration.Seconds())
+ r.replLatencyMetric.WithLabelValues(event.Job.Change.String()).Observe(time.Since(replStart).Seconds())
return nil
}
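
The net effect of the replicator.go changes is that the backlog loop works on datastore.ReplicationEvent end to end: events are dequeued from the injected queue, the source node is resolved from the shard inside processReplJob, and results are acknowledged per job state. A condensed sketch of the per-target loop body with logging and metrics stripped; the helper name drainTarget and the nodes.Node parameter type are illustrative, everything else is taken from the diff above.

    // Condensed, hypothetical view of the post-change backlog handling for one
    // target node; the real code lives inside processBacklog and logs failures.
    func (r ReplMgr) drainTarget(ctx context.Context, virtualStorage string, shard nodes.Shard, target nodes.Node) {
        events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
        if err != nil {
            return // the real loop logs the dequeue error and continues
        }

        eventIDsByState := map[datastore.JobState][]uint64{}
        for _, event := range events {
            if err := r.processReplJob(ctx, event, shard, target.GetConnection()); err != nil {
                state := datastore.JobStateFailed
                if event.Attempt <= 0 {
                    state = datastore.JobStateDead // no attempts left
                }
                eventIDsByState[state] = append(eventIDsByState[state], event.ID)
                continue
            }
            eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
        }

        for state, eventIDs := range eventIDsByState {
            r.queue.Acknowledge(ctx, state, eventIDs) // acknowledgement errors are logged in the real code
        }
    }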
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 51772ede5..46053e7d1 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -16,6 +16,7 @@ import (
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
@@ -127,19 +128,21 @@ func TestProcessReplicationJob(t *testing.T) {
secondaries, err := ds.GetSecondaries(conf.VirtualStorages[0].Name)
require.NoError(t, err)
- var jobs []datastore.ReplJob
+ var events []datastore.ReplicationEvent
for _, secondary := range secondaries {
- jobs = append(jobs, datastore.ReplJob{
- Change: datastore.UpdateRepo,
- TargetNode: secondary,
- SourceNode: primary,
- RelativePath: testRepo.GetRelativePath(),
- State: datastore.JobStateReady,
- Attempts: 3,
- CorrelationID: "correlation-id",
+ events = append(events, datastore.ReplicationEvent{
+ State: datastore.JobStateReady,
+ Attempt: 3,
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ TargetNodeStorage: secondary.Storage,
+ SourceNodeStorage: primary.Storage,
+ RelativePath: testRepo.GetRelativePath(),
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
})
}
- require.Len(t, jobs, 1)
+ require.Len(t, events, 1)
commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{
Message: "a commit",
@@ -159,6 +162,7 @@ func TestProcessReplicationJob(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
+ conf.VirtualStorageNames(),
ds,
nodeMgr,
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
@@ -172,7 +176,7 @@ func TestProcessReplicationJob(t *testing.T) {
require.NoError(t, err)
require.Len(t, shard.Secondaries, 1)
- replMgr.processReplJob(ctx, jobs[0], shard.Primary.GetConnection(), shard.Secondaries[0].GetConnection())
+ replMgr.processReplJob(ctx, events[0], shard, shard.Secondaries[0].GetConnection())
relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath)
require.NoError(t, err)
@@ -229,7 +233,7 @@ func TestPropagateReplicationJob(t *testing.T) {
coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
- replmgr := NewReplMgr(logEntry, ds, nodeMgr)
+ replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
prf := NewServer(
coordinator.StreamDirector,
@@ -521,7 +525,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr(logEntry, ds, nodeMgr)
+ replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
select {
@@ -659,7 +663,7 @@ func TestProcessBacklog_Success(t *testing.T) {
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr(logEntry, ds, nodeMgr)
+ replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
select {
@@ -676,11 +680,11 @@ func TestProcessBacklog_Success(t *testing.T) {
type mockReplicator struct {
Replicator
- ReplicateFunc func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
+ ReplicateFunc func(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error
}
-func (m mockReplicator) Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error {
- return m.ReplicateFunc(ctx, job, source, target)
+func (m mockReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error {
+ return m.ReplicateFunc(ctx, event, source, target)
}
func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
@@ -720,6 +724,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
+ conf.VirtualStorageNames(),
datastore.Datastore{datastore.NewInMemory(conf), queue},
&mockNodeManager{
GetShardFunc: func(vs string) (nodes.Shard, error) {
@@ -741,7 +746,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
processed := make(chan struct{})
replMgr.replicator = mockReplicator{
- ReplicateFunc: func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error {
+ ReplicateFunc: func(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error {
require.True(t, primaryConn == target)
require.True(t, secondaryConn == source)
close(processed)