Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-01-16 21:54:21 +0300
committerJohn Cai <jcai@gitlab.com>2020-01-29 21:33:21 +0300
commit80d0cf22a432b4cca1f2066e89e716690298b6d8 (patch)
tree63f7cedd7f68783f1326026e6aec79548db64157
parent1e6359b1d26ce7ff75d65a6c8816b0d5f9dc13df (diff)
simplify praefect routing to primary and replication jobs
simplify praefect routing to no longer track primary and replicas per repository. This overly complicates things and we only really need to know which replicas are were, which we already have through the replication jobs.
-rw-r--r--changelogs/unreleased/jc-simplify-praefect-routing.yml5
-rw-r--r--internal/praefect/common.go6
-rw-r--r--internal/praefect/coordinator.go60
-rw-r--r--internal/praefect/coordinator_test.go12
-rw-r--r--internal/praefect/datastore/datastore.go162
-rw-r--r--internal/praefect/datastore/datastore_test.go73
-rw-r--r--internal/praefect/replicator.go37
-rw-r--r--internal/praefect/replicator_test.go11
8 files changed, 74 insertions, 292 deletions
diff --git a/changelogs/unreleased/jc-simplify-praefect-routing.yml b/changelogs/unreleased/jc-simplify-praefect-routing.yml
new file mode 100644
index 000000000..f8326e8ae
--- /dev/null
+++ b/changelogs/unreleased/jc-simplify-praefect-routing.yml
@@ -0,0 +1,5 @@
+---
+title: simplify praefect routing to primary and replication jobs
+merge_request: 1760
+author:
+type: changed
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
deleted file mode 100644
index a09a292ad..000000000
--- a/internal/praefect/common.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package praefect
-
-// logging keys to use with logrus WithField
-const (
- logKeyProjectPath = "ProjectPath"
-)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index a476b61c9..f1b1d5492 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -12,7 +12,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
- internalerrs "gitlab.com/gitlab-org/gitaly/internal/errors"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -136,7 +135,12 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo
return "", nil, err
}
- primary, err := c.selectPrimary(mi, targetRepo)
+ primary, err := c.datastore.GetPrimary(targetRepo.GetStorageName())
+ if err != nil {
+ return "", nil, err
+ }
+
+ secondaries, err := c.datastore.GetSecondaries(targetRepo.GetStorageName())
if err != nil {
return "", nil, err
}
@@ -171,7 +175,7 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo
change = datastore.DeleteRepo
}
- if requestFinalizer, err = c.createReplicaJobs(targetRepo, change); err != nil {
+ if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil {
return "", nil, err
}
}
@@ -179,52 +183,6 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo
return primary.Storage, requestFinalizer, nil
}
-func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) {
- primary, err := c.datastore.GetPrimary(targetRepo.GetRelativePath())
-
- if err != nil {
- if err != datastore.ErrPrimaryNotSet {
- return nil, err
- }
- // if there are no primaries for this repository, pick one
- nodes, err := c.datastore.GetStorageNodes()
- if err != nil {
- return nil, err
- }
-
- if len(nodes) == 0 {
- return nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
- }
-
- newPrimary, err := c.datastore.PickAPrimary(targetRepo.GetStorageName())
- if err != nil {
- if err == datastore.ErrNoPrimaryForStorage {
- return nil, status.Error(codes.InvalidArgument, internalerrs.ErrInvalidRepository.Error())
- }
- return nil, fmt.Errorf("could not choose a primary: %v", err)
- }
-
- // set the primary
- if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.Storage); err != nil {
- return nil, err
- }
-
- // add replicas
- for _, replica := range nodes {
- if replica.DefaultPrimary {
- continue
- }
- if err = c.datastore.AddReplica(targetRepo.GetRelativePath(), replica.Storage); err != nil {
- return nil, err
- }
- }
-
- return &newPrimary, nil
- }
-
- return &primary, nil
-}
-
func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModifier) (proto.Message, error) {
frame, err := peeker.Peek()
if err != nil {
@@ -239,8 +197,8 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
return m, nil
}
-func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, change datastore.ChangeType) (func(), error) {
- jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, change)
+func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary models.Node, secondaries []models.Node, change datastore.ChangeType) (func(), error) {
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary, secondaries, change)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 22013e830..4283779a1 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -102,12 +102,12 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
expectedJob := datastore.ReplJob{
- Change: datastore.UpdateRepo,
- ID: 1,
- TargetNode: targetNode,
- SourceNode: sourceNode,
- State: datastore.JobStatePending,
- Repository: models.Repository{RelativePath: targetRepo.RelativePath, Primary: sourceNode, Replicas: []models.Node{targetNode}},
+ Change: datastore.UpdateRepo,
+ ID: 1,
+ TargetNode: targetNode,
+ SourceNode: sourceNode,
+ State: datastore.JobStatePending,
+ RelativePath: targetRepo.RelativePath,
}
require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct")
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index c8de9d82c..eb3cb0087 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -53,9 +53,9 @@ const (
// copy. Scheduled indicates when a replication job should be performed.
type ReplJob struct {
Change ChangeType
- ID uint64 // autoincrement ID
- TargetNode, SourceNode models.Node // which node to replicate to?
- Repository models.Repository // source for replication
+ ID uint64 // autoincrement ID
+ TargetNode, SourceNode models.Node // which node to replicate to?
+ RelativePath string // source for replication
State JobState
}
@@ -80,23 +80,15 @@ type Datastore interface {
// ReplicasDatastore manages accessing and setting which secondary replicas
// backup a repository
type ReplicasDatastore interface {
- PickAPrimary(virtualStorage string) (models.Node, error)
+ GetPrimary(virtualStorage string) (models.Node, error)
+
+ GetSecondaries(virtualStorage string) ([]models.Node, error)
GetReplicas(relativePath string) ([]models.Node, error)
GetStorageNode(nodeStorage string) (models.Node, error)
GetStorageNodes() ([]models.Node, error)
-
- GetPrimary(relativePath string) (models.Node, error)
-
- SetPrimary(relativePath, nodeStorage string) error
-
- AddReplica(relativePath string, nodeStorage string) error
-
- RemoveReplica(relativePath, nodeStorage string) error
-
- GetRepository(relativePath string) (models.Repository, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -107,10 +99,10 @@ type ReplJobsDatastore interface {
// count-length.
GetJobs(flag JobState, nodeStorage string, count int) ([]ReplJob, error)
- // CreateReplicaJobs will create replication jobs for each secondary
+ // CreateReplicaReplJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateReplicaReplJobs(relativePath string, change ChangeType) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
@@ -182,8 +174,8 @@ func NewInMemory(cfg config.Config) *MemoryDatastore {
// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it
var ErrNoPrimaryForStorage = errors.New("no primary for storage")
-// PickAPrimary returns the primary configured in the config file
-func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, error) {
+// GetPrimary returns the primary configured in the config file
+func (md *MemoryDatastore) GetPrimary(virtualStorage string) (models.Node, error) {
for _, node := range md.virtualStorages[virtualStorage] {
if node.DefaultPrimary {
return *node, nil
@@ -193,6 +185,19 @@ func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, err
return models.Node{}, ErrNoPrimaryForStorage
}
+// GetSecondaries gets the secondary nodes associated with a virtual storage
+func (md *MemoryDatastore) GetSecondaries(virtualStorage string) ([]models.Node, error) {
+ var secondaries []models.Node
+
+ for _, node := range md.virtualStorages[virtualStorage] {
+ if !node.DefaultPrimary {
+ secondaries = append(secondaries, *node)
+ }
+ }
+
+ return secondaries, nil
+}
+
// GetReplicas gets the secondaries for a repository based on the relative path
func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.Node, error) {
md.repositories.RLock()
@@ -228,96 +233,6 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
return storageNodes, nil
}
-// GetPrimary gets the primary storage node for a repository of a repository relative path
-func (md *MemoryDatastore) GetPrimary(relativePath string) (models.Node, error) {
- md.repositories.RLock()
- defer md.repositories.RUnlock()
-
- repository, ok := md.repositories.m[relativePath]
- if !ok {
- return models.Node{}, ErrPrimaryNotSet
- }
-
- return repository.Primary, nil
-}
-
-// SetPrimary sets the primary storagee node for a repository of a repository relative path
-func (md *MemoryDatastore) SetPrimary(relativePath, nodeStorage string) error {
- md.repositories.Lock()
- defer md.repositories.Unlock()
-
- repository, ok := md.repositories.m[relativePath]
- if !ok {
- repository = models.Repository{RelativePath: relativePath}
- }
-
- storageNode, ok := md.storageNodes[nodeStorage]
- if !ok {
- return errors.New("node storage not found")
- }
-
- repository.Primary = storageNode
-
- md.repositories.m[relativePath] = repository
- return nil
-}
-
-// AddReplica adds a secondary to a repository of a repository relative path
-func (md *MemoryDatastore) AddReplica(relativePath, nodeStorage string) error {
- md.repositories.Lock()
- defer md.repositories.Unlock()
-
- repository, ok := md.repositories.m[relativePath]
- if !ok {
- return errors.New("repository not found")
- }
-
- storageNode, ok := md.storageNodes[nodeStorage]
- if !ok {
- return errors.New("node storage not found")
- }
-
- repository.Replicas = append(repository.Replicas, storageNode)
-
- md.repositories.m[relativePath] = repository
- return nil
-}
-
-// RemoveReplica removes a secondary from a repository of a repository relative path
-func (md *MemoryDatastore) RemoveReplica(relativePath, nodeStorage string) error {
- md.repositories.Lock()
- defer md.repositories.Unlock()
-
- repository, ok := md.repositories.m[relativePath]
- if !ok {
- return errors.New("repository not found")
- }
-
- var secondaries []models.Node
- for _, secondary := range repository.Replicas {
- if secondary.Storage != nodeStorage {
- secondaries = append(secondaries, secondary)
- }
- }
-
- repository.Replicas = secondaries
- md.repositories.m[relativePath] = repository
- return nil
-}
-
-// GetRepository gets the repository for a repository relative path
-func (md *MemoryDatastore) GetRepository(relativePath string) (models.Repository, error) {
- md.repositories.Lock()
- defer md.repositories.Unlock()
-
- repository, ok := md.repositories.m[relativePath]
- if !ok {
- return models.Repository{}, errors.New("repository not found")
- }
-
- return repository.Clone(), nil
-}
-
// ErrReplicasMissing indicates the repository does not have any backup
// replicas
var ErrReplicasMissing = errors.New("repository missing secondary replicas")
@@ -352,11 +267,6 @@ func (md *MemoryDatastore) GetJobs(state JobState, targetNodeStorage string, cou
// replJobFromRecord constructs a replication job from a record and by cross
// referencing the current repository for the project being replicated
func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
- repository, err := md.GetRepository(record.relativePath)
- if err != nil {
- return ReplJob{}, err
- }
-
sourceNode, err := md.GetStorageNode(record.sourceNodeStorage)
if err != nil {
return ReplJob{}, err
@@ -367,12 +277,12 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
}
return ReplJob{
- Change: record.change,
- ID: jobID,
- Repository: repository,
- SourceNode: sourceNode,
- State: record.state,
- TargetNode: targetNode,
+ Change: record.change,
+ ID: jobID,
+ RelativePath: record.relativePath,
+ SourceNode: sourceNode,
+ State: record.state,
+ TargetNode: targetNode,
}, nil
}
@@ -382,7 +292,7 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi
// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, change ChangeType) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
@@ -390,17 +300,9 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, change Cha
return nil, errors.New("invalid source repository")
}
- repository, err := md.GetRepository(relativePath)
- if err != nil {
- return nil, fmt.Errorf(
- "unable to find repository for project at relative path %q",
- relativePath,
- )
- }
-
var jobIDs []uint64
- for _, secondary := range repository.Replicas {
+ for _, secondary := range secondaries {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.records[nextID] = jobRecord{
@@ -408,7 +310,7 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, change Cha
targetNodeStorage: secondary.Storage,
state: JobStatePending,
relativePath: relativePath,
- sourceNodeStorage: repository.Primary.Storage,
+ sourceNodeStorage: primary.Storage,
}
jobIDs = append(jobIDs, nextID)
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 83af5bb17..6e908ffb1 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -40,52 +40,26 @@ var operations = []struct {
},
},
{
- desc: "insert first replication job before secondary mapped to primary",
+ desc: "insert replication job",
opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, UpdateRepo)
- require.Error(t, err, ErrInvalidReplTarget)
- },
- },
- {
- desc: "set the primary for the repository",
- opFn: func(t *testing.T, ds Datastore) {
- err := ds.SetPrimary(repo1Repository.RelativePath, stor1.Storage)
- require.NoError(t, err)
- },
- },
- {
- desc: "add a secondary replica for the repository",
- opFn: func(t *testing.T, ds Datastore) {
- err := ds.AddReplica(repo1Repository.RelativePath, stor2.Storage)
- require.NoError(t, err)
- },
- },
- {
- desc: "insert first replication job after secondary mapped to primary",
- opFn: func(t *testing.T, ds Datastore) {
- ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, UpdateRepo)
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1, []models.Node{stor2}, UpdateRepo)
require.NoError(t, err)
- require.Equal(t, []uint64{1}, ids)
},
},
{
- desc: "fetch inserted replication jobs after primary mapped",
+ desc: "fetch inserted replication jobs",
opFn: func(t *testing.T, ds Datastore) {
jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.Storage, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
expectedJob := ReplJob{
- Change: UpdateRepo,
- ID: 1,
- Repository: models.Repository{
- RelativePath: repo1Repository.RelativePath,
- Primary: stor1,
- Replicas: []models.Node{stor2},
- },
- SourceNode: stor1,
- TargetNode: stor2,
- State: JobStatePending,
+ Change: UpdateRepo,
+ ID: 1,
+ RelativePath: repo1Repository.RelativePath,
+ SourceNode: stor1,
+ TargetNode: stor2,
+ State: JobStatePending,
}
require.Equal(t, expectedJob, jobs[0])
},
@@ -134,32 +108,3 @@ func TestDatastoreInterface(t *testing.T) {
})
}
}
-
-func TestMemoryDatastore_GetRepository(t *testing.T) {
- ds := NewInMemory(config.Config{
- VirtualStorages: []*config.VirtualStorage{
- {
- Nodes: []*models.Node{&stor1, &stor2},
- },
- },
- })
- require.NoError(t, ds.SetPrimary(repo1Repository.RelativePath, stor1.Storage))
- require.NoError(t, ds.AddReplica(repo1Repository.RelativePath, stor2.Storage))
-
- repBefore, err := ds.GetRepository(repo1Repository.RelativePath)
- require.NoError(t, err)
-
- expRepo := models.Repository{
- RelativePath: repo1Repository.RelativePath,
- Primary: stor1,
- Replicas: []models.Node{stor2},
- }
- require.Equal(t, expRepo, repBefore)
-
- initialAddrs := repBefore.Replicas[0].Address
- repBefore.Replicas[0].Address += "/"
-
- repAfter, err := ds.GetRepository(repo1Repository.RelativePath)
- require.NoError(t, err)
- require.Equal(t, initialAddrs, repAfter.Replicas[0].Address, "modification from outside should not affect what is inside storage")
-}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 2695858e5..3d71e22fa 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -11,7 +11,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -32,12 +31,12 @@ type defaultReplicator struct {
func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
targetRepository := &gitalypb.Repository{
StorageName: job.TargetNode.Storage,
- RelativePath: job.Repository.RelativePath,
+ RelativePath: job.RelativePath,
}
sourceRepository := &gitalypb.Repository{
StorageName: job.SourceNode.Storage,
- RelativePath: job.Repository.RelativePath,
+ RelativePath: job.RelativePath,
}
targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -98,7 +97,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob
func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: job.TargetNode.Storage,
- RelativePath: job.Repository.RelativePath,
+ RelativePath: job.RelativePath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
@@ -213,30 +212,6 @@ func WithReplicator(r Replicator) ReplMgrOpt {
}
}
-// ScheduleReplication will store a replication job in the datastore for later
-// execution. It filters out projects that are not whitelisted.
-// TODO: add a parameter to delay replication
-func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository) error {
- _, ok := r.whitelist[repo.RelativePath]
- if !ok {
- r.log.WithField(logKeyProjectPath, repo.RelativePath).
- Infof("project %q is not whitelisted for replication", repo.RelativePath)
- return nil
- }
-
- id, err := r.datastore.CreateReplicaReplJobs(repo.RelativePath, datastore.UpdateRepo)
- if err != nil {
- return err
- }
-
- r.log.WithFields(logrus.Fields{
- logWithReplJobID: id,
- "relative_path": repo.RelativePath,
- }).Info("replication job created")
-
- return nil
-}
-
const (
logWithReplJobID = "replication_job_id"
logWithReplSource = "replication_job_source"
@@ -284,7 +259,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
logWithReplJobID: job.ID,
"from_storage": job.SourceNode.Storage,
"to_storage": job.TargetNode.Storage,
- "relative_path": job.Repository.RelativePath,
+ "relative_path": job.RelativePath,
}).Info("processing replication job")
r.processReplJob(ctx, job)
}
@@ -325,13 +300,13 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) {
return
}
- sourceCC, err := r.clientConnections.GetConnection(job.Repository.Primary.Storage)
+ sourceCC, err := r.clientConnections.GetConnection(job.SourceNode.Storage)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for primary node in replication job")
return
}
- injectedCtx, err := helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, job.SourceNode.Token)
+ injectedCtx, err := helper.InjectGitalyServers(ctx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token)
if err != nil {
l.WithError(err).Error("unable to inject Gitaly servers into context for replication job")
return
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index c43ff5cdd..444579923 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -61,6 +61,7 @@ func TestProcessReplicationJob(t *testing.T) {
config := config.Config{
VirtualStorages: []*config.VirtualStorage{
&config.VirtualStorage{
+ Name: "default",
Nodes: []*models.Node{
&models.Node{
Storage: "default",
@@ -80,9 +81,6 @@ func TestProcessReplicationJob(t *testing.T) {
ds := datastore.NewInMemory(config)
- require.NoError(t, ds.SetPrimary(testRepo.GetRelativePath(), "default"))
- require.NoError(t, ds.AddReplica(testRepo.GetRelativePath(), backupStorageName))
-
// create object pool on the source
objectPoolPath := testhelper.NewTestObjectPoolName(t)
pool, err := objectpool.NewObjectPool(testRepo.GetStorageName(), objectPoolPath)
@@ -112,7 +110,12 @@ func TestProcessReplicationJob(t *testing.T) {
})
require.NoError(t, err)
- _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), datastore.UpdateRepo)
+ primary, err := ds.GetPrimary(config.VirtualStorages[0].Name)
+ require.NoError(t, err)
+ secondaries, err := ds.GetSecondaries(config.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, secondaries, datastore.UpdateRepo)
require.NoError(t, err)
jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1)