diff options
author | John Cai <jcai@gitlab.com> | 2019-09-12 01:57:35 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-09-18 21:38:07 +0300 |
commit | 1fecd3adaaf021db721cdfc0fd2d46e96e4f791b (patch) | |
tree | 03793bdac1f585d761f1faa1e4662bc6644debfd | |
parent | d9dc10a48d8c7fb0ada94f7cda67d90071e73b18 (diff) |
Explicitly designate primary and replica nodes in praefect config
-rw-r--r-- | changelogs/unreleased/jc-explicitly-designate-primary.yml | 5 | ||||
-rw-r--r-- | config.praefect.toml.example | 1 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 23 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 19 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 1 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 20 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 17 | ||||
-rw-r--r-- | internal/praefect/datastore_test.go | 7 | ||||
-rw-r--r-- | internal/praefect/models/node.go | 9 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 5 |
11 files changed, 85 insertions, 30 deletions
diff --git a/changelogs/unreleased/jc-explicitly-designate-primary.yml b/changelogs/unreleased/jc-explicitly-designate-primary.yml new file mode 100644 index 000000000..06fa5fa53 --- /dev/null +++ b/changelogs/unreleased/jc-explicitly-designate-primary.yml @@ -0,0 +1,5 @@ +--- +title: Explicitly designate primary and replica nodes in praefect config +merge_request: 1483 +author: +type: other diff --git a/config.praefect.toml.example b/config.praefect.toml.example index bbaaf8f65..368f9fed6 100644 --- a/config.praefect.toml.example +++ b/config.praefect.toml.example @@ -24,6 +24,7 @@ listen_addr = "127.0.0.1:2305" [[node]] storage = "praefect-git-0" address = "tcp://praefect-git-0.internal" + primary = true [[node]] storage = "praefect-git-1" diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 5499d30f5..633f57d1f 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -40,6 +40,8 @@ var ( errDuplicateStorage = errors.New("internal gitaly storages are not unique") errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address") errGitalyWithoutStorage = errors.New("all gitaly nodes must have a storage") + errMoreThanOnePrimary = errors.New("only 1 node can be designated as a primary") + errNoPrimaries = errors.New("no primaries designated") ) // Validate establishes if the config is valid @@ -48,12 +50,17 @@ func (c Config) Validate() error { return errNoListener } - if len(c.Nodes) == 0 { - return errNoGitalyServers - } + storages := make(map[string]struct{}) - storages := make(map[string]struct{}, len(c.Nodes)) + var primaries int for _, node := range c.Nodes { + if node.DefaultPrimary { + primaries++ + } + + if primaries > 1 { + return errMoreThanOnePrimary + } if node.Storage == "" { return errGitalyWithoutStorage } @@ -69,5 +76,13 @@ func (c Config) Validate() error { storages[node.Storage] = struct{}{} } + if len(storages) == 0 { + return errNoGitalyServers + } + + if primaries == 0 { + return errNoPrimaries + } + return nil } diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 1ed2a39fc..22316f563 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -10,7 +10,7 @@ import ( func TestConfigValidation(t *testing.T) { nodes := []*models.Node{ - {ID: 1, Storage: "internal-1", Address: "localhost:23456", Token: "secret-token"}, + {ID: 1, Storage: "internal-1", Address: "localhost:23456", Token: "secret-token", DefaultPrimary: true}, {ID: 2, Storage: "internal-2", Address: "localhost:23457", Token: "secret-token"}, {ID: 3, Storage: "internal-3", Address: "localhost:23458", Token: "secret-token"}, } @@ -45,6 +45,16 @@ func TestConfigValidation(t *testing.T) { config: Config{ListenAddr: "localhost:1234", Nodes: nodes}, err: nil, }, + { + desc: "No designated primaries", + config: Config{ListenAddr: "localhost:1234", Nodes: nodes[1:]}, + err: errNoPrimaries, + }, + { + desc: "More than 1 primary", + config: Config{ListenAddr: "localhost:1234", Nodes: append(nodes, &models.Node{ID: 3, Storage: "internal-4", Address: "localhost:23459", Token: "secret-token", DefaultPrimary: true})}, + err: errMoreThanOnePrimary, + }, } for _, tc := range testCases { @@ -64,9 +74,10 @@ func TestConfigParsing(t *testing.T) { filePath: "testdata/config.toml", expected: Config{ Nodes: []*models.Node{ - { - Address: "tcp://gitaly-internal-1.example.com", - Storage: "praefect-internal-1", + &models.Node{ + Address: "tcp://gitaly-internal-1.example.com", + Storage: "praefect-internal-1", + DefaultPrimary: true, }, { Address: "tcp://gitaly-internal-2.example.com", diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index d2cb28396..36476c98f 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -11,6 +11,7 @@ prometheus_listen_addr = "" [[node]] address = "tcp://gitaly-internal-1.example.com" storage = "praefect-internal-1" + primary = true [[node]] address = "tcp://gitaly-internal-2.example.com" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0c80bd398..1c9ba2e1a 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "os/signal" - "sort" "sync" "syscall" @@ -167,15 +166,12 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git if len(nodes) == 0 { return nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) - } - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].ID < nodes[j].ID - }) - - newPrimary := nodes[0] - replicas := nodes[1:] + newPrimary, err := c.datastore.PickAPrimary() + if err != nil { + return nil, fmt.Errorf("could not choose a primary: %v", err) + } // set the primary if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { @@ -183,13 +179,16 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git } // add replicas - for _, replica := range replicas { + for _, replica := range nodes { + if replica.DefaultPrimary { + continue + } if err = c.datastore.AddReplica(targetRepo.GetRelativePath(), replica.ID); err != nil { return nil, err } } - primary = &newPrimary + return newPrimary, nil } return primary, nil @@ -214,6 +213,7 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func() if err != nil { return nil, err } + return func() { for _, jobID := range jobIDs { if err := c.datastore.UpdateReplJob(jobID, JobStateReady); err != nil { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index e029a3f48..d6e3c519a 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -30,9 +30,11 @@ func TestStreamDirector(t *testing.T) { datastore := NewMemoryDatastore(config.Config{ Nodes: []*models.Node{ &models.Node{ - Address: "tcp://gitaly-primary.example.com", - Storage: "praefect-internal-1", - }, &models.Node{ + Address: "tcp://gitaly-primary.example.com", + Storage: "praefect-internal-1", + DefaultPrimary: true, + }, + &models.Node{ Address: "tcp://gitaly-backup1.example.com", Storage: "praefect-internal-2", }}, diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index f61a64064..b336bff0c 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -69,6 +69,8 @@ type Datastore interface { // ReplicasDatastore manages accessing and setting which secondary replicas // backup a repository type ReplicasDatastore interface { + PickAPrimary() (*models.Node, error) + GetReplicas(relativePath string) ([]models.Node, error) GetStorageNode(nodeID int) (models.Node, error) @@ -160,6 +162,21 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { return m } +// PickAPrimary returns the primary configured in the config file +func (md *MemoryDatastore) PickAPrimary() (*models.Node, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() + + for _, node := range md.storageNodes.m { + if node.DefaultPrimary { + return &node, nil + } + } + + return nil, errors.New("no default primaries found") + +} + // GetReplicas gets the secondaries for a repository based on the relative path func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.Node, error) { md.repositories.RLock() diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index d661d8d21..3634aa969 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -10,9 +10,10 @@ import ( var ( stor1 = models.Node{ - ID: 0, - Address: "tcp://address-1", - Storage: "praefect-storage-1", + ID: 0, + Address: "tcp://address-1", + Storage: "praefect-storage-1", + DefaultPrimary: true, } stor2 = models.Node{ ID: 1, diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go index 941a72e8f..23918021f 100644 --- a/internal/praefect/models/node.go +++ b/internal/praefect/models/node.go @@ -2,10 +2,11 @@ package models // Node describes an address that serves a storage type Node struct { - ID int - Storage string `toml:"storage"` - Address string `toml:"address"` - Token string `toml:"token"` + ID int + Storage string `toml:"storage"` + Address string `toml:"address"` + Token string `toml:"token"` + DefaultPrimary bool `toml:"primary"` } // Repository describes a repository's relative path and its primary and list of secondaries diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index d04f61121..cbc8e2482 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -60,8 +60,9 @@ func TestServerSimpleUnaryUnary(t *testing.T) { datastore := NewMemoryDatastore(config.Config{ Nodes: []*models.Node{ &models.Node{ - ID: 1, - Storage: "praefect-internal-1", + ID: 1, + Storage: "praefect-internal-1", + DefaultPrimary: true, }, &models.Node{ ID: 2, |