Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-07-04 00:38:04 +0300
committerJohn Cai <jcai@gitlab.com>2019-07-06 20:32:59 +0300
commit2a13320a2a3f536b2bc30d2176919d355c857ad2 (patch)
treec31f1f1a979bed7c23a8265bf1ac680208d6f6d7
parent80d97b14e1614d3fc19df2cf089bede962979bf9 (diff)
New praefect nodes data interfacejc-praefect-datamodel
-rw-r--r--cmd/praefect/main.go4
-rw-r--r--config.praefect.toml.example15
-rw-r--r--internal/praefect/common.go2
-rw-r--r--internal/praefect/config/config.go12
-rw-r--r--internal/praefect/config/config_test.go18
-rw-r--r--internal/praefect/config/testdata/config.toml6
-rw-r--r--internal/praefect/coordinator.go12
-rw-r--r--internal/praefect/datastore.go210
-rw-r--r--internal/praefect/datastore_memory_test.go22
-rw-r--r--internal/praefect/datastore_test.go8
-rw-r--r--internal/praefect/replicator_test.go34
-rw-r--r--internal/praefect/server_test.go6
12 files changed, 182 insertions, 167 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index b4efeeae9..71207a371 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -103,9 +103,7 @@ func run(listeners []net.Listener, conf config.Config) error {
go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
- allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer)
-
- for _, gitaly := range allBackendServers {
+ for _, gitaly := range conf.Servers {
if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil {
return fmt.Errorf("failed to register %s: %s", gitaly.Name, err)
}
diff --git a/config.praefect.toml.example b/config.praefect.toml.example
index 59e7563f1..424da961f 100644
--- a/config.praefect.toml.example
+++ b/config.praefect.toml.example
@@ -23,14 +23,15 @@ listen_addr = "127.0.0.1:2305"
# as shard. listen_addr should be unique for all nodes.
# Requires the protocol to be defined, e.g. tcp://host.tld:1234
-[primary_server]
+# The first server in the list serves as the default primary for repositories that don't have a "primary" node configured
+[server]
name = "default"
listen_addr = "tcp://gitaly-primary.example.com"
-# [[secondary_server]]
-# name = "default"
-# listen_addr = "tcp://gitaly-backup1.example.com"
+[server]
+ name = "server-1"
+ listen_addr = "tcp://gitaly-backup1.example.com"
-# [[secondary_server]]
-# name = "backup"
-# listen_addr = "tcp://gitaly-backup2.example.com"
+[server]
+ name = "server-2"
+ listen_addr = "tcp://gitaly-backup2.example.com"
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
index a63d309bd..e4bce3583 100644
--- a/internal/praefect/common.go
+++ b/internal/praefect/common.go
@@ -12,6 +12,8 @@ type Repository struct {
// Node is a wrapper around the grpc client connection for a backend Gitaly node
type Node struct {
Storage string
+ Address string
+ Token string
cc *grpc.ClientConn
}
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 768104ed1..f422e2232 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -13,8 +13,7 @@ type Config struct {
ListenAddr string `toml:"listen_addr"`
SocketPath string `toml:"socket_path"`
- PrimaryServer *GitalyServer `toml:"primary_server"`
- SecondaryServers []*GitalyServer `toml:"secondary_server"`
+ Servers []*GitalyServer `toml:"server"`
// Whitelist is a list of relative project paths (paths comprised of project
// hashes) that are permitted to use high availability features
@@ -50,20 +49,19 @@ var (
errGitalyWithoutName = errors.New("all gitaly servers must have a name")
)
-var emptyServer = &GitalyServer{}
-
// Validate establishes if the config is valid
func (c Config) Validate() error {
if c.ListenAddr == "" && c.SocketPath == "" {
return errNoListener
}
- if c.PrimaryServer == nil || c.PrimaryServer == emptyServer {
+ listenAddrs := make(map[string]bool, len(c.Servers)+1)
+
+ if len(c.Servers) == 0 {
return errNoGitalyServers
}
- listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1)
- for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) {
+ for _, gitaly := range c.Servers {
if gitaly.Name == "" {
return errGitalyWithoutName
}
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 33731b17d..e3d668789 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -21,12 +21,12 @@ func TestConfigValidation(t *testing.T) {
}{
{
desc: "No ListenAddr or SocketPath",
- config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{ListenAddr: "", Servers: append([]*GitalyServer{primarySrv}, secondarySrvs...)},
err: errNoListener,
},
{
desc: "Only a SocketPath",
- config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{SocketPath: "/tmp/praefect.socket", Servers: append([]*GitalyServer{primarySrv}, secondarySrvs...)},
err: nil,
},
{
@@ -36,12 +36,12 @@ func TestConfigValidation(t *testing.T) {
},
{
desc: "duplicate address",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*GitalyServer{primarySrv}},
+ config: Config{ListenAddr: "localhost:1234", Servers: []*GitalyServer{primarySrv, primarySrv}},
err: errDuplicateGitalyAddr,
},
{
desc: "Valid config",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{ListenAddr: "localhost:1234", Servers: append([]*GitalyServer{primarySrv}, secondarySrvs...)},
err: nil,
},
}
@@ -62,11 +62,11 @@ func TestConfigParsing(t *testing.T) {
{
filePath: "testdata/config.toml",
expected: Config{
- PrimaryServer: &GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*GitalyServer{
+ Servers: []*GitalyServer{
+ {
+ Name: "default",
+ ListenAddr: "tcp://gitaly-primary.example.com",
+ },
{
Name: "default",
ListenAddr: "tcp://gitaly-backup1.example.com",
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 81701a359..60afdf169 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -3,15 +3,15 @@ socket_path = ""
whitelist = ["abcd1234", "edfg5678"]
prometheus_listen_addr = ""
-[primary_server]
+[[server]]
name = "default"
listen_addr = "tcp://gitaly-primary.example.com"
-[[secondary_server]]
+[[server]]
name = "default"
listen_addr = "tcp://gitaly-backup1.example.com"
-[[secondary_server]]
+[[server]]
name = "backup"
listen_addr = "tcp://gitaly-backup2.example.com"
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b2e6704d5..dc8cdb1d6 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -25,14 +25,14 @@ type Coordinator struct {
log *logrus.Logger
lock sync.RWMutex
- datastore PrimaryDatastore
+ datastore NodesDatastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Logger, datastore PrimaryDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Logger, datastore NodesDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -63,12 +63,12 @@ func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
}
// streamDirector determines which downstream servers receive requests
-func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
- storageName, err := c.datastore.GetPrimary()
+ node, err := c.datastore.GetDefaultPrimary()
if err != nil {
err := status.Error(
codes.FailedPrecondition,
@@ -79,9 +79,9 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, ok := c.getConn(storageName)
+ cc, ok := c.getConn(node.Storage)
if !ok {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", storageName)
+ return nil, nil, fmt.Errorf("unable to find existing client connection for %s", node.Storage)
}
return ctx, cc, nil
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index eeb9f9728..d2283f0fa 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -62,28 +62,22 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I
// persistence needs
type Datastore interface {
ReplJobsDatastore
- ReplicasDatastore
- PrimaryDatastore
+ NodesDatastore
}
-// PrimaryDatastore manages accessing and setting the primary storage location
-type PrimaryDatastore interface {
- // GetPrimary gets the primary storage location
- GetPrimary() (string, error)
- // SetPrimary sets the primary storage location
- SetPrimary(primary string) error
-}
+// NodesDatastore manages accessing and setting the primary storage location
+type NodesDatastore interface {
+ GetDefaultPrimary() (Node, error)
+
+ GetPrimary(repo Repository) (Node, error)
+
+ SetPrimary(repo Repository, node Node) error
+
+ GetSecondaries(repo Repository) ([]Node, error)
-// ReplicasDatastore manages accessing and setting which secondary replicas
-// backup a repository
-type ReplicasDatastore interface {
- // GetSecondaries will retrieve all secondary replica storage locations for
- // a primary replica
- GetSecondaries(primary Repository) ([]string, error)
+ RemoveSecondary(repo Repository, node Node) error
- // SetSecondaries will set the secondary storage locations for a repository
- // in a primary replica.
- SetSecondaries(primary Repository, secondaries []string) error
+ AddSecondaries(repo Repository, node ...Node) error
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -103,12 +97,6 @@ type ReplJobsDatastore interface {
UpdateReplJob(jobID uint64, newState JobState) error
}
-// shard is a set of primary and secondary storage replicas for a project
-type shard struct {
- primary string
- secondaries []string
-}
-
type jobRecord struct {
relativePath string // project's relative path
target string
@@ -119,32 +107,23 @@ type jobRecord struct {
// only intended for early beta requirements and as a reference implementation
// for the eventual SQL implementation
type MemoryDatastore struct {
- replicas *struct {
- sync.RWMutex
- m map[string]shard // keyed by project's relative path
- }
-
jobs *struct {
sync.RWMutex
next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}
- primary *struct {
+ nodes *struct {
sync.RWMutex
- storageName string
+ reposToSecondaries map[string][]Node
+ reposToPrimary map[string]Node
+ defaultPrimary Node
}
}
// NewMemoryDatastore returns an initialized in-memory datastore
func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
m := &MemoryDatastore{
- replicas: &struct {
- sync.RWMutex
- m map[string]shard
- }{
- m: map[string]shard{},
- },
jobs: &struct {
sync.RWMutex
next uint64
@@ -153,33 +132,48 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
next: 0,
records: map[uint64]jobRecord{},
},
- primary: &struct {
+ nodes: &struct {
sync.RWMutex
- storageName string
+ reposToSecondaries map[string][]Node
+ reposToPrimary map[string]Node
+ defaultPrimary Node
}{
- storageName: cfg.PrimaryServer.Name,
+ reposToSecondaries: make(map[string][]Node),
+ reposToPrimary: make(map[string]Node),
+ defaultPrimary: Node{
+ Storage: cfg.Servers[0].Name,
+ Address: cfg.Servers[0].ListenAddr,
+ Token: "",
+ },
},
}
- secondaries := make([]string, len(cfg.SecondaryServers))
- for i, server := range cfg.SecondaryServers {
- secondaries[i] = server.Name
- }
-
for _, relativePath := range cfg.Whitelist {
- // store the configuration file specified shard
- m.replicas.m[relativePath] = shard{
- primary: cfg.PrimaryServer.Name,
- secondaries: secondaries,
+ secondaries := make([]Node, 0)
+ defaultPrimary, err := m.GetDefaultPrimary()
+ if err != nil {
+ panic(fmt.Sprintf("could not get default primary: %v", err))
}
+ for _, server := range cfg.Servers {
+ if server.Name == defaultPrimary.Storage {
+ continue
+ }
+ secondaries = append(secondaries, Node{
+ Storage: server.Name,
+ Address: server.ListenAddr,
+ Token: "",
+ })
+ }
+ m.nodes.reposToSecondaries[relativePath] = secondaries
+
// initialize replication job queue to replicate all whitelisted repos
// to every secondary server
- for _, secondary := range cfg.SecondaryServers {
+ for _, secondary := range m.nodes.reposToSecondaries[relativePath] {
m.jobs.next++
m.jobs.records[m.jobs.next] = jobRecord{
state: JobStateReady,
- target: secondary.Name,
+ target: secondary.Storage,
relativePath: relativePath,
}
}
@@ -189,32 +183,73 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
return m
}
+// GetDefaultPrimary gets the default primary from the MemoryDatastore
+func (md *MemoryDatastore) GetDefaultPrimary() (Node, error) {
+ return md.nodes.defaultPrimary, nil
+}
+
+// GetPrimary gets the primary datastore location
+func (md *MemoryDatastore) GetPrimary(repo Repository) (Node, error) {
+ md.nodes.RLock()
+ defer md.nodes.RUnlock()
+
+ primary, ok := md.nodes.reposToPrimary[repo.RelativePath]
+ if !ok {
+ return md.GetDefaultPrimary()
+ }
+
+ return primary, nil
+}
+
+// SetPrimary sets the primary for the MemoryDatastore
+func (md *MemoryDatastore) SetPrimary(repo Repository, node Node) error {
+ md.nodes.Lock()
+ defer md.nodes.Unlock()
+
+ md.nodes.reposToPrimary[repo.RelativePath] = node
+ return nil
+}
+
// GetSecondaries will return the set of secondary storage locations for a
// given repository if they exist
-func (md *MemoryDatastore) GetSecondaries(primary Repository) ([]string, error) {
- shard, _ := md.getShard(primary.RelativePath)
-
- return shard.secondaries, nil
+func (md *MemoryDatastore) GetSecondaries(repo Repository) ([]Node, error) {
+ return md.nodes.reposToSecondaries[repo.RelativePath], nil
}
-// SetSecondaries will replace the set of replicas for a repository
-func (md *MemoryDatastore) SetSecondaries(primary Repository, secondaries []string) error {
- md.replicas.Lock()
- md.replicas.m[primary.RelativePath] = shard{
- primary: primary.Storage,
- secondaries: secondaries,
+// RemoveSecondary will return the set of secondary storage locations for a
+// given repository if they exist
+func (md *MemoryDatastore) RemoveSecondary(repo Repository, node Node) error {
+
+ secondaries, ok := md.nodes.reposToSecondaries[repo.RelativePath]
+ if !ok {
+ return nil
+ }
+
+ for i, secondary := range secondaries {
+ if secondary == node {
+ secondaries = append(secondaries[:i], secondaries[i+1:]...)
+ md.nodes.reposToSecondaries[repo.RelativePath] = secondaries
+ return nil
+ }
}
- md.replicas.Unlock()
return nil
}
-func (md *MemoryDatastore) getShard(project string) (shard, bool) {
- md.replicas.RLock()
- replicas, ok := md.replicas.m[project]
- md.replicas.RUnlock()
+// AddSecondaries will add a secondary to the list of secondaries for a repository
+func (md *MemoryDatastore) AddSecondaries(repo Repository, nodes ...Node) error {
+ md.nodes.Lock()
+ defer md.nodes.Unlock()
+
+ secondaries, ok := md.nodes.reposToSecondaries[repo.RelativePath]
+ if !ok {
+ secondaries = make([]Node, 0)
+ }
+ secondaries = append(secondaries, nodes...)
- return replicas, ok
+ md.nodes.reposToSecondaries[repo.RelativePath] = secondaries
+
+ return nil
}
// ErrSecondariesMissing indicates the repository does not have any backup
@@ -251,8 +286,8 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
// replJobFromRecord constructs a replication job from a record and by cross
// referencing the current shard for the project being replicated
func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
- shard, ok := md.getShard(record.relativePath)
- if !ok {
+ primary, err := md.GetPrimary(Repository{RelativePath: record.relativePath})
+ if err != nil {
return ReplJob{}, fmt.Errorf(
"unable to find shard for project at relative path %q",
record.relativePath,
@@ -263,7 +298,7 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
ID: jobID,
Source: Repository{
RelativePath: record.relativePath,
- Storage: shard.primary,
+ Storage: primary.Storage,
},
State: record.state,
Target: record.target,
@@ -285,8 +320,12 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64,
return nil, errors.New("invalid source repository")
}
- shard, ok := md.getShard(source.RelativePath)
- if !ok {
+ secondaries, err := md.GetSecondaries(Repository{RelativePath: source.RelativePath})
+ if err != nil {
+ return nil, err
+ }
+
+ if len(secondaries) == 0 {
return nil, fmt.Errorf(
"unable to find shard for project at relative path %q",
source.RelativePath,
@@ -295,12 +334,12 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64,
var jobIDs []uint64
- for _, secondary := range shard.secondaries {
+ for _, secondary := range secondaries {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.next++
md.jobs.records[md.jobs.next] = jobRecord{
- target: secondary,
+ target: secondary.Storage,
state: JobStatePending,
relativePath: source.RelativePath,
}
@@ -331,26 +370,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
-
-// SetPrimary sets the primary datastore location
-func (md *MemoryDatastore) SetPrimary(primary string) error {
- md.primary.Lock()
- defer md.primary.Unlock()
-
- md.primary.storageName = primary
-
- return nil
-}
-
-// GetPrimary gets the primary datastore location
-func (md *MemoryDatastore) GetPrimary() (string, error) {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- storageName := md.primary.storageName
- if storageName == "" {
- return "", ErrPrimaryNotSet
- }
-
- return storageName, nil
-}
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
index a306b3ce6..502bead0d 100644
--- a/internal/praefect/datastore_memory_test.go
+++ b/internal/praefect/datastore_memory_test.go
@@ -13,10 +13,10 @@ import (
// with a configuration file specifying the shard and whitelisted repositories.
func TestMemoryDatastoreWhitelist(t *testing.T) {
cfg := config.Config{
- PrimaryServer: &config.GitalyServer{
- Name: "default",
- },
- SecondaryServers: []*config.GitalyServer{
+ Servers: []*config.GitalyServer{
+ {
+ Name: "default",
+ },
{
Name: "backup-1",
},
@@ -34,16 +34,16 @@ func TestMemoryDatastoreWhitelist(t *testing.T) {
repo1 := praefect.Repository{
RelativePath: cfg.Whitelist[0],
- Storage: cfg.PrimaryServer.Name,
+ Storage: cfg.Servers[0].Name,
}
repo2 := praefect.Repository{
RelativePath: cfg.Whitelist[1],
- Storage: cfg.PrimaryServer.Name,
+ Storage: cfg.Servers[0].Name,
}
- expectSecondaries := []string{
- cfg.SecondaryServers[0].Name,
- cfg.SecondaryServers[1].Name,
+ expectSecondaries := []praefect.Node{
+ praefect.Node{Storage: cfg.Servers[1].Name},
+ praefect.Node{Storage: cfg.Servers[2].Name},
}
for _, repo := range []praefect.Repository{repo1, repo2} {
@@ -52,8 +52,8 @@ func TestMemoryDatastoreWhitelist(t *testing.T) {
require.ElementsMatch(t, actualSecondaries, expectSecondaries)
}
- backup1 := cfg.SecondaryServers[0]
- backup2 := cfg.SecondaryServers[1]
+ backup1 := cfg.Servers[1]
+ backup2 := cfg.Servers[2]
backup1ExpectedJobs := []praefect.ReplJob{
praefect.ReplJob{
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 6534b9d88..a0e89ceb6 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -43,7 +43,7 @@ var operations = []struct {
{
desc: "associate the replication job target with a primary",
opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetSecondaries(repo1Primary, []string{stor2})
+ err := ds.AddSecondaries(repo1Primary, praefect.Node{Storage: stor2})
require.NoError(t, err)
},
},
@@ -93,8 +93,10 @@ var flavors = map[string]func() praefect.Datastore{
"in-memory-datastore": func() praefect.Datastore {
return praefect.NewMemoryDatastore(
config.Config{
- PrimaryServer: &config.GitalyServer{
- Name: "default",
+ Servers: []*config.GitalyServer{
+ {
+ Name: "default",
+ },
},
})
},
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index ac2dcf6f6..ec9eacee4 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -28,11 +28,11 @@ import (
func TestReplicatorProcessJobsWhitelist(t *testing.T) {
var (
cfg = config.Config{
- PrimaryServer: &config.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*config.GitalyServer{
+ Servers: []*config.GitalyServer{
+ {
+ Name: "default",
+ ListenAddr: "tcp://gitaly-primary.example.com",
+ },
{
Name: "backup1",
ListenAddr: "tcp://gitaly-backup1.example.com",
@@ -51,7 +51,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
coordinator = praefect.NewCoordinator(logrus.New(), datastore)
resultsCh = make(chan result)
replman = praefect.NewReplMgr(
- cfg.SecondaryServers[1].Name,
+ cfg.Servers[2].Name,
logrus.New(),
datastore,
coordinator,
@@ -60,11 +60,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
)
)
- for _, node := range []*config.GitalyServer{
- cfg.PrimaryServer,
- cfg.SecondaryServers[0],
- cfg.SecondaryServers[1],
- } {
+ for _, node := range cfg.Servers {
err := coordinator.RegisterNode(node.Name, node.ListenAddr)
require.NoError(t, err)
}
@@ -85,8 +81,8 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
result := <-resultsCh
assert.Contains(t, cfg.Whitelist, result.source.RelativePath)
- assert.Equal(t, result.target.Storage, cfg.SecondaryServers[1].Name)
- assert.Equal(t, result.source.Storage, cfg.PrimaryServer.Name)
+ assert.Equal(t, cfg.Servers[2].Name, result.target.Storage)
+ assert.Equal(t, cfg.Servers[0].Name, result.source.Storage)
}
cancel()
@@ -140,11 +136,11 @@ func TestReplicate(t *testing.T) {
defer cleanupFn()
var (
cfg = config.Config{
- PrimaryServer: &config.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*config.GitalyServer{
+ Servers: []*config.GitalyServer{
+ {
+ Name: "default",
+ ListenAddr: "tcp://gitaly-primary.example.com",
+ },
{
Name: "backup",
ListenAddr: "tcp://gitaly-backup1.example.com",
@@ -184,7 +180,7 @@ func TestReplicate(t *testing.T) {
coordinator.RegisterNode("default", socketPath)
replman := praefect.NewReplMgr(
- cfg.SecondaryServers[0].Name,
+ cfg.Servers[1].Name,
logrus.New(),
datastore,
coordinator,
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index b6f2a8fd9..f4120a4ca 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -51,8 +51,10 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
)
datastore := praefect.NewMemoryDatastore(config.Config{
- PrimaryServer: &config.GitalyServer{
- Name: "default",
+ Servers: []*config.GitalyServer{
+ {
+ Name: "default",
+ },
},
})
coordinator := praefect.NewCoordinator(logrus.New(), datastore)