diff options
author | John Cai <jcai@gitlab.com> | 2019-07-04 00:38:04 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-07-06 20:32:59 +0300 |
commit | 2a13320a2a3f536b2bc30d2176919d355c857ad2 (patch) | |
tree | c31f1f1a979bed7c23a8265bf1ac680208d6f6d7 | |
parent | 80d97b14e1614d3fc19df2cf089bede962979bf9 (diff) |
New praefect nodes data interfacejc-praefect-datamodel
-rw-r--r-- | cmd/praefect/main.go | 4 | ||||
-rw-r--r-- | config.praefect.toml.example | 15 | ||||
-rw-r--r-- | internal/praefect/common.go | 2 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 12 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 6 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 12 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 210 | ||||
-rw-r--r-- | internal/praefect/datastore_memory_test.go | 22 | ||||
-rw-r--r-- | internal/praefect/datastore_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 34 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 6 |
12 files changed, 182 insertions, 167 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index b4efeeae9..71207a371 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -103,9 +103,7 @@ func run(listeners []net.Listener, conf config.Config) error { go func() { serverErrors <- repl.ProcessBacklog(ctx) }() - allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer) - - for _, gitaly := range allBackendServers { + for _, gitaly := range conf.Servers { if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil { return fmt.Errorf("failed to register %s: %s", gitaly.Name, err) } diff --git a/config.praefect.toml.example b/config.praefect.toml.example index 59e7563f1..424da961f 100644 --- a/config.praefect.toml.example +++ b/config.praefect.toml.example @@ -23,14 +23,15 @@ listen_addr = "127.0.0.1:2305" # as shard. listen_addr should be unique for all nodes. # Requires the protocol to be defined, e.g. tcp://host.tld:1234 -[primary_server] +# The first server in the list serves as the default primary for repositories that don't have a "primary" node configured +[server] name = "default" listen_addr = "tcp://gitaly-primary.example.com" -# [[secondary_server]] -# name = "default" -# listen_addr = "tcp://gitaly-backup1.example.com" +[server] + name = "server-1" + listen_addr = "tcp://gitaly-backup1.example.com" -# [[secondary_server]] -# name = "backup" -# listen_addr = "tcp://gitaly-backup2.example.com" +[server] + name = "server-2" + listen_addr = "tcp://gitaly-backup2.example.com" diff --git a/internal/praefect/common.go b/internal/praefect/common.go index a63d309bd..e4bce3583 100644 --- a/internal/praefect/common.go +++ b/internal/praefect/common.go @@ -12,6 +12,8 @@ type Repository struct { // Node is a wrapper around the grpc client connection for a backend Gitaly node type Node struct { Storage string + Address string + Token string cc *grpc.ClientConn } diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 768104ed1..f422e2232 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -13,8 +13,7 @@ type Config struct { ListenAddr string `toml:"listen_addr"` SocketPath string `toml:"socket_path"` - PrimaryServer *GitalyServer `toml:"primary_server"` - SecondaryServers []*GitalyServer `toml:"secondary_server"` + Servers []*GitalyServer `toml:"server"` // Whitelist is a list of relative project paths (paths comprised of project // hashes) that are permitted to use high availability features @@ -50,20 +49,19 @@ var ( errGitalyWithoutName = errors.New("all gitaly servers must have a name") ) -var emptyServer = &GitalyServer{} - // Validate establishes if the config is valid func (c Config) Validate() error { if c.ListenAddr == "" && c.SocketPath == "" { return errNoListener } - if c.PrimaryServer == nil || c.PrimaryServer == emptyServer { + listenAddrs := make(map[string]bool, len(c.Servers)+1) + + if len(c.Servers) == 0 { return errNoGitalyServers } - listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1) - for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) { + for _, gitaly := range c.Servers { if gitaly.Name == "" { return errGitalyWithoutName } diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 33731b17d..e3d668789 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -21,12 +21,12 @@ func TestConfigValidation(t *testing.T) { }{ { desc: "No ListenAddr or SocketPath", - config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{ListenAddr: "", Servers: append([]*GitalyServer{primarySrv}, secondarySrvs...)}, err: errNoListener, }, { desc: "Only a SocketPath", - config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{SocketPath: "/tmp/praefect.socket", Servers: append([]*GitalyServer{primarySrv}, secondarySrvs...)}, err: nil, }, { @@ -36,12 +36,12 @@ func TestConfigValidation(t *testing.T) { }, { desc: "duplicate address", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*GitalyServer{primarySrv}}, + config: Config{ListenAddr: "localhost:1234", Servers: []*GitalyServer{primarySrv, primarySrv}}, err: errDuplicateGitalyAddr, }, { desc: "Valid config", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{ListenAddr: "localhost:1234", Servers: append([]*GitalyServer{primarySrv}, secondarySrvs...)}, err: nil, }, } @@ -62,11 +62,11 @@ func TestConfigParsing(t *testing.T) { { filePath: "testdata/config.toml", expected: Config{ - PrimaryServer: &GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*GitalyServer{ + Servers: []*GitalyServer{ + { + Name: "default", + ListenAddr: "tcp://gitaly-primary.example.com", + }, { Name: "default", ListenAddr: "tcp://gitaly-backup1.example.com", diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 81701a359..60afdf169 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -3,15 +3,15 @@ socket_path = "" whitelist = ["abcd1234", "edfg5678"] prometheus_listen_addr = "" -[primary_server] +[[server]] name = "default" listen_addr = "tcp://gitaly-primary.example.com" -[[secondary_server]] +[[server]] name = "default" listen_addr = "tcp://gitaly-backup1.example.com" -[[secondary_server]] +[[server]] name = "backup" listen_addr = "tcp://gitaly-backup2.example.com" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index b2e6704d5..dc8cdb1d6 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -25,14 +25,14 @@ type Coordinator struct { log *logrus.Logger lock sync.RWMutex - datastore PrimaryDatastore + datastore NodesDatastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore PrimaryDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore NodesDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -63,12 +63,12 @@ func (c *Coordinator) GetStorageNode(storage string) (Node, error) { } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) { +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) - storageName, err := c.datastore.GetPrimary() + node, err := c.datastore.GetDefaultPrimary() if err != nil { err := status.Error( codes.FailedPrecondition, @@ -79,9 +79,9 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, ok := c.getConn(storageName) + cc, ok := c.getConn(node.Storage) if !ok { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", storageName) + return nil, nil, fmt.Errorf("unable to find existing client connection for %s", node.Storage) } return ctx, cc, nil diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index eeb9f9728..d2283f0fa 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -62,28 +62,22 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I // persistence needs type Datastore interface { ReplJobsDatastore - ReplicasDatastore - PrimaryDatastore + NodesDatastore } -// PrimaryDatastore manages accessing and setting the primary storage location -type PrimaryDatastore interface { - // GetPrimary gets the primary storage location - GetPrimary() (string, error) - // SetPrimary sets the primary storage location - SetPrimary(primary string) error -} +// NodesDatastore manages accessing and setting the primary storage location +type NodesDatastore interface { + GetDefaultPrimary() (Node, error) + + GetPrimary(repo Repository) (Node, error) + + SetPrimary(repo Repository, node Node) error + + GetSecondaries(repo Repository) ([]Node, error) -// ReplicasDatastore manages accessing and setting which secondary replicas -// backup a repository -type ReplicasDatastore interface { - // GetSecondaries will retrieve all secondary replica storage locations for - // a primary replica - GetSecondaries(primary Repository) ([]string, error) + RemoveSecondary(repo Repository, node Node) error - // SetSecondaries will set the secondary storage locations for a repository - // in a primary replica. - SetSecondaries(primary Repository, secondaries []string) error + AddSecondaries(repo Repository, node ...Node) error } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -103,12 +97,6 @@ type ReplJobsDatastore interface { UpdateReplJob(jobID uint64, newState JobState) error } -// shard is a set of primary and secondary storage replicas for a project -type shard struct { - primary string - secondaries []string -} - type jobRecord struct { relativePath string // project's relative path target string @@ -119,32 +107,23 @@ type jobRecord struct { // only intended for early beta requirements and as a reference implementation // for the eventual SQL implementation type MemoryDatastore struct { - replicas *struct { - sync.RWMutex - m map[string]shard // keyed by project's relative path - } - jobs *struct { sync.RWMutex next uint64 records map[uint64]jobRecord // all jobs indexed by ID } - primary *struct { + nodes *struct { sync.RWMutex - storageName string + reposToSecondaries map[string][]Node + reposToPrimary map[string]Node + defaultPrimary Node } } // NewMemoryDatastore returns an initialized in-memory datastore func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { m := &MemoryDatastore{ - replicas: &struct { - sync.RWMutex - m map[string]shard - }{ - m: map[string]shard{}, - }, jobs: &struct { sync.RWMutex next uint64 @@ -153,33 +132,48 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { next: 0, records: map[uint64]jobRecord{}, }, - primary: &struct { + nodes: &struct { sync.RWMutex - storageName string + reposToSecondaries map[string][]Node + reposToPrimary map[string]Node + defaultPrimary Node }{ - storageName: cfg.PrimaryServer.Name, + reposToSecondaries: make(map[string][]Node), + reposToPrimary: make(map[string]Node), + defaultPrimary: Node{ + Storage: cfg.Servers[0].Name, + Address: cfg.Servers[0].ListenAddr, + Token: "", + }, }, } - secondaries := make([]string, len(cfg.SecondaryServers)) - for i, server := range cfg.SecondaryServers { - secondaries[i] = server.Name - } - for _, relativePath := range cfg.Whitelist { - // store the configuration file specified shard - m.replicas.m[relativePath] = shard{ - primary: cfg.PrimaryServer.Name, - secondaries: secondaries, + secondaries := make([]Node, 0) + defaultPrimary, err := m.GetDefaultPrimary() + if err != nil { + panic(fmt.Sprintf("could not get default primary: %v", err)) } + for _, server := range cfg.Servers { + if server.Name == defaultPrimary.Storage { + continue + } + secondaries = append(secondaries, Node{ + Storage: server.Name, + Address: server.ListenAddr, + Token: "", + }) + } + m.nodes.reposToSecondaries[relativePath] = secondaries + // initialize replication job queue to replicate all whitelisted repos // to every secondary server - for _, secondary := range cfg.SecondaryServers { + for _, secondary := range m.nodes.reposToSecondaries[relativePath] { m.jobs.next++ m.jobs.records[m.jobs.next] = jobRecord{ state: JobStateReady, - target: secondary.Name, + target: secondary.Storage, relativePath: relativePath, } } @@ -189,32 +183,73 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { return m } +// GetDefaultPrimary gets the default primary from the MemoryDatastore +func (md *MemoryDatastore) GetDefaultPrimary() (Node, error) { + return md.nodes.defaultPrimary, nil +} + +// GetPrimary gets the primary datastore location +func (md *MemoryDatastore) GetPrimary(repo Repository) (Node, error) { + md.nodes.RLock() + defer md.nodes.RUnlock() + + primary, ok := md.nodes.reposToPrimary[repo.RelativePath] + if !ok { + return md.GetDefaultPrimary() + } + + return primary, nil +} + +// SetPrimary sets the primary for the MemoryDatastore +func (md *MemoryDatastore) SetPrimary(repo Repository, node Node) error { + md.nodes.Lock() + defer md.nodes.Unlock() + + md.nodes.reposToPrimary[repo.RelativePath] = node + return nil +} + // GetSecondaries will return the set of secondary storage locations for a // given repository if they exist -func (md *MemoryDatastore) GetSecondaries(primary Repository) ([]string, error) { - shard, _ := md.getShard(primary.RelativePath) - - return shard.secondaries, nil +func (md *MemoryDatastore) GetSecondaries(repo Repository) ([]Node, error) { + return md.nodes.reposToSecondaries[repo.RelativePath], nil } -// SetSecondaries will replace the set of replicas for a repository -func (md *MemoryDatastore) SetSecondaries(primary Repository, secondaries []string) error { - md.replicas.Lock() - md.replicas.m[primary.RelativePath] = shard{ - primary: primary.Storage, - secondaries: secondaries, +// RemoveSecondary will return the set of secondary storage locations for a +// given repository if they exist +func (md *MemoryDatastore) RemoveSecondary(repo Repository, node Node) error { + + secondaries, ok := md.nodes.reposToSecondaries[repo.RelativePath] + if !ok { + return nil + } + + for i, secondary := range secondaries { + if secondary == node { + secondaries = append(secondaries[:i], secondaries[i+1:]...) + md.nodes.reposToSecondaries[repo.RelativePath] = secondaries + return nil + } } - md.replicas.Unlock() return nil } -func (md *MemoryDatastore) getShard(project string) (shard, bool) { - md.replicas.RLock() - replicas, ok := md.replicas.m[project] - md.replicas.RUnlock() +// AddSecondaries will add a secondary to the list of secondaries for a repository +func (md *MemoryDatastore) AddSecondaries(repo Repository, nodes ...Node) error { + md.nodes.Lock() + defer md.nodes.Unlock() + + secondaries, ok := md.nodes.reposToSecondaries[repo.RelativePath] + if !ok { + secondaries = make([]Node, 0) + } + secondaries = append(secondaries, nodes...) - return replicas, ok + md.nodes.reposToSecondaries[repo.RelativePath] = secondaries + + return nil } // ErrSecondariesMissing indicates the repository does not have any backup @@ -251,8 +286,8 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ // replJobFromRecord constructs a replication job from a record and by cross // referencing the current shard for the project being replicated func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) { - shard, ok := md.getShard(record.relativePath) - if !ok { + primary, err := md.GetPrimary(Repository{RelativePath: record.relativePath}) + if err != nil { return ReplJob{}, fmt.Errorf( "unable to find shard for project at relative path %q", record.relativePath, @@ -263,7 +298,7 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re ID: jobID, Source: Repository{ RelativePath: record.relativePath, - Storage: shard.primary, + Storage: primary.Storage, }, State: record.state, Target: record.target, @@ -285,8 +320,12 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64, return nil, errors.New("invalid source repository") } - shard, ok := md.getShard(source.RelativePath) - if !ok { + secondaries, err := md.GetSecondaries(Repository{RelativePath: source.RelativePath}) + if err != nil { + return nil, err + } + + if len(secondaries) == 0 { return nil, fmt.Errorf( "unable to find shard for project at relative path %q", source.RelativePath, @@ -295,12 +334,12 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64, var jobIDs []uint64 - for _, secondary := range shard.secondaries { + for _, secondary := range secondaries { nextID := uint64(len(md.jobs.records) + 1) md.jobs.next++ md.jobs.records[md.jobs.next] = jobRecord{ - target: secondary, + target: secondary.Storage, state: JobStatePending, relativePath: source.RelativePath, } @@ -331,26 +370,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } - -// SetPrimary sets the primary datastore location -func (md *MemoryDatastore) SetPrimary(primary string) error { - md.primary.Lock() - defer md.primary.Unlock() - - md.primary.storageName = primary - - return nil -} - -// GetPrimary gets the primary datastore location -func (md *MemoryDatastore) GetPrimary() (string, error) { - md.primary.RLock() - defer md.primary.RUnlock() - - storageName := md.primary.storageName - if storageName == "" { - return "", ErrPrimaryNotSet - } - - return storageName, nil -} diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go index a306b3ce6..502bead0d 100644 --- a/internal/praefect/datastore_memory_test.go +++ b/internal/praefect/datastore_memory_test.go @@ -13,10 +13,10 @@ import ( // with a configuration file specifying the shard and whitelisted repositories. func TestMemoryDatastoreWhitelist(t *testing.T) { cfg := config.Config{ - PrimaryServer: &config.GitalyServer{ - Name: "default", - }, - SecondaryServers: []*config.GitalyServer{ + Servers: []*config.GitalyServer{ + { + Name: "default", + }, { Name: "backup-1", }, @@ -34,16 +34,16 @@ func TestMemoryDatastoreWhitelist(t *testing.T) { repo1 := praefect.Repository{ RelativePath: cfg.Whitelist[0], - Storage: cfg.PrimaryServer.Name, + Storage: cfg.Servers[0].Name, } repo2 := praefect.Repository{ RelativePath: cfg.Whitelist[1], - Storage: cfg.PrimaryServer.Name, + Storage: cfg.Servers[0].Name, } - expectSecondaries := []string{ - cfg.SecondaryServers[0].Name, - cfg.SecondaryServers[1].Name, + expectSecondaries := []praefect.Node{ + praefect.Node{Storage: cfg.Servers[1].Name}, + praefect.Node{Storage: cfg.Servers[2].Name}, } for _, repo := range []praefect.Repository{repo1, repo2} { @@ -52,8 +52,8 @@ func TestMemoryDatastoreWhitelist(t *testing.T) { require.ElementsMatch(t, actualSecondaries, expectSecondaries) } - backup1 := cfg.SecondaryServers[0] - backup2 := cfg.SecondaryServers[1] + backup1 := cfg.Servers[1] + backup2 := cfg.Servers[2] backup1ExpectedJobs := []praefect.ReplJob{ praefect.ReplJob{ diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 6534b9d88..a0e89ceb6 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -43,7 +43,7 @@ var operations = []struct { { desc: "associate the replication job target with a primary", opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetSecondaries(repo1Primary, []string{stor2}) + err := ds.AddSecondaries(repo1Primary, praefect.Node{Storage: stor2}) require.NoError(t, err) }, }, @@ -93,8 +93,10 @@ var flavors = map[string]func() praefect.Datastore{ "in-memory-datastore": func() praefect.Datastore { return praefect.NewMemoryDatastore( config.Config{ - PrimaryServer: &config.GitalyServer{ - Name: "default", + Servers: []*config.GitalyServer{ + { + Name: "default", + }, }, }) }, diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index ac2dcf6f6..ec9eacee4 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -28,11 +28,11 @@ import ( func TestReplicatorProcessJobsWhitelist(t *testing.T) { var ( cfg = config.Config{ - PrimaryServer: &config.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*config.GitalyServer{ + Servers: []*config.GitalyServer{ + { + Name: "default", + ListenAddr: "tcp://gitaly-primary.example.com", + }, { Name: "backup1", ListenAddr: "tcp://gitaly-backup1.example.com", @@ -51,7 +51,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { coordinator = praefect.NewCoordinator(logrus.New(), datastore) resultsCh = make(chan result) replman = praefect.NewReplMgr( - cfg.SecondaryServers[1].Name, + cfg.Servers[2].Name, logrus.New(), datastore, coordinator, @@ -60,11 +60,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { ) ) - for _, node := range []*config.GitalyServer{ - cfg.PrimaryServer, - cfg.SecondaryServers[0], - cfg.SecondaryServers[1], - } { + for _, node := range cfg.Servers { err := coordinator.RegisterNode(node.Name, node.ListenAddr) require.NoError(t, err) } @@ -85,8 +81,8 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { result := <-resultsCh assert.Contains(t, cfg.Whitelist, result.source.RelativePath) - assert.Equal(t, result.target.Storage, cfg.SecondaryServers[1].Name) - assert.Equal(t, result.source.Storage, cfg.PrimaryServer.Name) + assert.Equal(t, cfg.Servers[2].Name, result.target.Storage) + assert.Equal(t, cfg.Servers[0].Name, result.source.Storage) } cancel() @@ -140,11 +136,11 @@ func TestReplicate(t *testing.T) { defer cleanupFn() var ( cfg = config.Config{ - PrimaryServer: &config.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*config.GitalyServer{ + Servers: []*config.GitalyServer{ + { + Name: "default", + ListenAddr: "tcp://gitaly-primary.example.com", + }, { Name: "backup", ListenAddr: "tcp://gitaly-backup1.example.com", @@ -184,7 +180,7 @@ func TestReplicate(t *testing.T) { coordinator.RegisterNode("default", socketPath) replman := praefect.NewReplMgr( - cfg.SecondaryServers[0].Name, + cfg.Servers[1].Name, logrus.New(), datastore, coordinator, diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index b6f2a8fd9..f4120a4ca 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -51,8 +51,10 @@ func TestServerSimpleUnaryUnary(t *testing.T) { ) datastore := praefect.NewMemoryDatastore(config.Config{ - PrimaryServer: &config.GitalyServer{ - Name: "default", + Servers: []*config.GitalyServer{ + { + Name: "default", + }, }, }) coordinator := praefect.NewCoordinator(logrus.New(), datastore) |