diff options
author | John Cai <jcai@gitlab.com> | 2019-11-26 22:34:43 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-11-26 22:34:43 +0300 |
commit | 82d31745e0901dc91c2fe02f44ff8ff659a22c09 (patch) | |
tree | 905df77c5b25e392ff5d0300223668e6e0bedede | |
parent | 39bfe1dfaa4fcd493d53b21b7a172d0cad2ef8ac (diff) | |
parent | 59f4dd3c9a4f7a562849e3bd3d27dce5e0b4551c (diff) |
Merge branch 'jc-praefect-multiple-virtual-storage' into 'master'
Praefect multiple virtual storage
See merge request gitlab-org/gitaly!1606
-rw-r--r-- | changelogs/unreleased/jc-praefect-multiple-virtual-storage.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 15 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 125 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 156 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 26 | ||||
-rw-r--r-- | internal/praefect/config/testdata/single-virtual-storage.config.toml | 25 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 14 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 33 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 95 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 42 | ||||
-rw-r--r-- | internal/praefect/models/node.go | 2 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 4 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 34 | ||||
-rw-r--r-- | internal/praefect/server.go | 26 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 230 | ||||
-rw-r--r-- | internal/praefect/service/server/info.go | 27 |
18 files changed, 611 insertions, 284 deletions
diff --git a/changelogs/unreleased/jc-praefect-multiple-virtual-storage.yml b/changelogs/unreleased/jc-praefect-multiple-virtual-storage.yml new file mode 100644 index 000000000..56bd79024 --- /dev/null +++ b/changelogs/unreleased/jc-praefect-multiple-virtual-storage.yml @@ -0,0 +1,5 @@ +--- +title: Praefect multiple virtual storage +merge_request: 1606 +author: +type: changed diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 607e650ae..271041e8c 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -97,12 +97,17 @@ func configure() (config.Config, error) { func run(listeners []net.Listener, conf config.Config) error { clientConnections := conn.NewClientConnections() - for _, node := range conf.Nodes { - if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil { - return fmt.Errorf("failed to register %s: %s", node.Address, err) - } + for _, virtualStorage := range conf.VirtualStorages { + for _, node := range virtualStorage.Nodes { + if _, err := clientConnections.GetConnection(node.Storage); err == nil { + continue + } + if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil { + return fmt.Errorf("failed to register %s: %s", node.Address, err) + } - logger.WithField("node_address", node.Address).Info("registered gitaly node") + logger.WithField("node_address", node.Address).Info("registered gitaly node") + } } var ( diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index b426910f8..ff510e460 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -166,13 +166,17 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func backend, cleanup := newMockDownstream(t, backendToken, mockServer) conf := config.Config{ - VirtualStorageName: "praefect", - Auth: auth.Config{Token: token, Transitioning: !required}, - Nodes: []*models.Node{ - &models.Node{ - Storage: "praefect-internal-0", - DefaultPrimary: true, - Address: backend, + Auth: auth.Config{Token: token, Transitioning: !required}, + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + Storage: "praefect-internal-0", + DefaultPrimary: true, + Address: backend, + }, + }, }, }, } diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index e400350fd..b074c1f01 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -2,6 +2,7 @@ package config import ( "errors" + "fmt" "os" "github.com/BurntSushi/toml" @@ -14,16 +15,23 @@ import ( // Config is a container for everything found in the TOML config file type Config struct { - VirtualStorageName string `toml:"virtual_storage_name"` - ListenAddr string `toml:"listen_addr"` - SocketPath string `toml:"socket_path"` + ListenAddr string `toml:"listen_addr"` + SocketPath string `toml:"socket_path"` + VirtualStorages []*VirtualStorage `toml:"virtual_storage"` + //TODO: Remove VirtualStorageName and Nodes once omnibus and gdk are updated with support for + // VirtualStorages + VirtualStorageName string `toml:"virtual_storage_name"` + Nodes []*models.Node `toml:"node"` + Logging log.Config `toml:"logging"` + Sentry sentry.Config `toml:"sentry"` + PrometheusListenAddr string `toml:"prometheus_listen_addr"` + Auth auth.Config `toml:"auth"` +} +// VirtualStorage represents a set of nodes for a storage +type VirtualStorage struct { + Name string `toml:"name"` Nodes []*models.Node `toml:"node"` - - Logging log.Config `toml:"logging"` - Sentry sentry.Config `toml:"sentry"` - PrometheusListenAddr string `toml:"prometheus_listen_addr"` - Auth auth.Config `toml:"auth"` } // FromFile loads the config for the passed file path @@ -36,17 +44,34 @@ func FromFile(filePath string) (Config, error) { defer cfgFile.Close() _, err = toml.DecodeReader(cfgFile, config) + + // TODO: Remove this after the virtual storages change is merged in omnibus + // and gdk. This is for backwards compatibility purposes only + if len(config.VirtualStorages) == 0 && config.VirtualStorageName != "" && len(config.Nodes) > 0 { + config.VirtualStorages = []*VirtualStorage{ + &VirtualStorage{ + Name: config.VirtualStorageName, + Nodes: config.Nodes, + }, + } + config.VirtualStorageName = "" + config.Nodes = nil + } + return *config, err } var ( - errNoListener = errors.New("no listen address or socket path configured") - errNoGitalyServers = errors.New("no primary gitaly backends configured") - errDuplicateStorage = errors.New("internal gitaly storages are not unique") - errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address") - errGitalyWithoutStorage = errors.New("all gitaly nodes must have a storage") - errMoreThanOnePrimary = errors.New("only 1 node can be designated as a primary") - errNoPrimaries = errors.New("no primaries designated") + errDuplicateStorage = errors.New("internal gitaly storages are not unique") + errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address") + errGitalyWithoutStorage = errors.New("all gitaly nodes must have a storage") + errMoreThanOnePrimary = errors.New("only 1 node can be designated as a primary") + errNoGitalyServers = errors.New("no primary gitaly backends configured") + errNoListener = errors.New("no listen address or socket path configured") + errNoPrimaries = errors.New("no primaries designated") + errNoVirtualStorages = errors.New("no virtual storages configured") + errStorageAddressMismatch = errors.New("storages with the same name must have the same address") + errVirtualStoragesNotUnique = errors.New("virtual storages must have unique names") ) // Validate establishes if the config is valid @@ -55,38 +80,60 @@ func (c Config) Validate() error { return errNoListener } - storages := make(map[string]struct{}) + if len(c.VirtualStorages) == 0 { + return errNoVirtualStorages + } - var primaries int - for _, node := range c.Nodes { - if node.DefaultPrimary { - primaries++ - } + allStorages := make(map[string]string) + virtualStorages := make(map[string]struct{}) - if primaries > 1 { - return errMoreThanOnePrimary - } - if node.Storage == "" { - return errGitalyWithoutStorage + for _, virtualStorage := range c.VirtualStorages { + if _, ok := virtualStorages[virtualStorage.Name]; ok { + return errVirtualStoragesNotUnique } - if node.Address == "" { - return errGitalyWithoutAddr - } + virtualStorages[virtualStorage.Name] = struct{}{} - if _, found := storages[node.Storage]; found { - return errDuplicateStorage - } + storages := make(map[string]struct{}) + var primaries int + for _, node := range virtualStorage.Nodes { + if node.DefaultPrimary { + primaries++ + } - storages[node.Storage] = struct{}{} - } + if primaries > 1 { + return fmt.Errorf("virtual storage %s: %v", virtualStorage.Name, errMoreThanOnePrimary) + } - if len(storages) == 0 { - return errNoGitalyServers - } + if node.Storage == "" { + return errGitalyWithoutStorage + } + + if node.Address == "" { + return errGitalyWithoutAddr + } + + if _, found := storages[node.Storage]; found { + return errDuplicateStorage + } - if primaries == 0 { - return errNoPrimaries + if address, found := allStorages[node.Storage]; found { + if address != node.Address { + return errStorageAddressMismatch + } + } else { + allStorages[node.Storage] = node.Address + } + + storages[node.Storage] = struct{}{} + } + + if primaries == 0 { + return fmt.Errorf("virtual storage %s: %v", virtualStorage.Name, errNoPrimaries) + } + if len(storages) == 0 { + return fmt.Errorf("virtual storage %s: %v", virtualStorage.Name, errNoGitalyServers) + } } return nil diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index bf707d5d7..ee659c349 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -1,6 +1,7 @@ package config import ( + "strings" "testing" "github.com/stretchr/testify/assert" @@ -12,9 +13,9 @@ import ( func TestConfigValidation(t *testing.T) { nodes := []*models.Node{ - {ID: 1, Storage: "internal-1", Address: "localhost:23456", Token: "secret-token", DefaultPrimary: true}, - {ID: 2, Storage: "internal-2", Address: "localhost:23457", Token: "secret-token"}, - {ID: 3, Storage: "internal-3", Address: "localhost:23458", Token: "secret-token"}, + {Storage: "internal-1", Address: "localhost:23456", Token: "secret-token", DefaultPrimary: true}, + {Storage: "internal-2", Address: "localhost:23457", Token: "secret-token"}, + {Storage: "internal-3", Address: "localhost:23458", Token: "secret-token"}, } testCases := []struct { @@ -24,45 +25,105 @@ func TestConfigValidation(t *testing.T) { }{ { desc: "No ListenAddr or SocketPath", - config: Config{ListenAddr: "", Nodes: nodes}, + config: Config{ListenAddr: "", VirtualStorages: []*VirtualStorage{&VirtualStorage{Nodes: nodes}}}, err: errNoListener, }, { desc: "Only a SocketPath", - config: Config{SocketPath: "/tmp/praefect.socket", Nodes: nodes}, + config: Config{SocketPath: "/tmp/praefect.socket", VirtualStorages: []*VirtualStorage{&VirtualStorage{Nodes: nodes}}}, err: nil, }, { desc: "No servers", config: Config{ListenAddr: "localhost:1234"}, - err: errNoGitalyServers, + err: errNoVirtualStorages, }, { - desc: "duplicate storage", - config: Config{ListenAddr: "localhost:1234", Nodes: append(nodes, &models.Node{Storage: nodes[0].Storage, Address: nodes[1].Address})}, - err: errDuplicateStorage, + desc: "duplicate storage", + config: Config{ + ListenAddr: "localhost:1234", + VirtualStorages: []*VirtualStorage{ + &VirtualStorage{Nodes: append(nodes, &models.Node{Storage: nodes[0].Storage, Address: nodes[1].Address})}, + }, + }, + err: errDuplicateStorage, }, { desc: "Valid config", - config: Config{ListenAddr: "localhost:1234", Nodes: nodes}, + config: Config{ListenAddr: "localhost:1234", VirtualStorages: []*VirtualStorage{&VirtualStorage{Nodes: nodes}}}, err: nil, }, { desc: "No designated primaries", - config: Config{ListenAddr: "localhost:1234", Nodes: nodes[1:]}, + config: Config{ListenAddr: "localhost:1234", VirtualStorages: []*VirtualStorage{&VirtualStorage{Nodes: nodes[1:]}}}, err: errNoPrimaries, }, { - desc: "More than 1 primary", - config: Config{ListenAddr: "localhost:1234", Nodes: append(nodes, &models.Node{ID: 3, Storage: "internal-4", Address: "localhost:23459", Token: "secret-token", DefaultPrimary: true})}, - err: errMoreThanOnePrimary, + desc: "More than 1 primary", + config: Config{ + ListenAddr: "localhost:1234", + VirtualStorages: []*VirtualStorage{ + &VirtualStorage{ + Nodes: append(nodes, + &models.Node{ + Storage: "internal-4", + Address: "localhost:23459", + Token: "secret-token", + DefaultPrimary: true, + }), + }, + }, + }, + err: errMoreThanOnePrimary, + }, + { + desc: "Node storage not unique", + config: Config{ + ListenAddr: "localhost:1234", + VirtualStorages: []*VirtualStorage{ + &VirtualStorage{Name: "default", Nodes: nodes}, + &VirtualStorage{ + Name: "backup", + Nodes: []*models.Node{ + &models.Node{ + Storage: nodes[0].Storage, + Address: "some.other.address", + DefaultPrimary: true}, + }, + }, + }, + }, + err: errStorageAddressMismatch, + }, + { + desc: "Node storage not unique", + config: Config{ + ListenAddr: "localhost:1234", + VirtualStorages: []*VirtualStorage{ + &VirtualStorage{Name: "default", Nodes: nodes}, + &VirtualStorage{ + Name: "default", + Nodes: []*models.Node{ + &models.Node{ + Storage: nodes[0].Storage, + Address: "some.other.address", + DefaultPrimary: true}}, + }, + }, + }, + err: errVirtualStoragesNotUnique, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { err := tc.config.Validate() - assert.Equal(t, tc.err, err) + if tc.err == nil { + assert.NoError(t, err) + return + } + + assert.True(t, strings.Contains(err.Error(), tc.err.Error())) }) } } @@ -75,7 +136,6 @@ func TestConfigParsing(t *testing.T) { { filePath: "testdata/config.toml", expected: Config{ - VirtualStorageName: "praefect", Logging: log.Config{ Level: "info", Format: "json", @@ -84,19 +144,59 @@ func TestConfigParsing(t *testing.T) { DSN: "abcd123", Environment: "production", }, - Nodes: []*models.Node{ - &models.Node{ - Address: "tcp://gitaly-internal-1.example.com", - Storage: "praefect-internal-1", - DefaultPrimary: true, - }, - { - Address: "tcp://gitaly-internal-2.example.com", - Storage: "praefect-internal-2", + VirtualStorages: []*VirtualStorage{ + &VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + Address: "tcp://gitaly-internal-1.example.com", + Storage: "praefect-internal-1", + DefaultPrimary: true, + }, + { + Address: "tcp://gitaly-internal-2.example.com", + Storage: "praefect-internal-2", + }, + { + Address: "tcp://gitaly-internal-3.example.com", + Storage: "praefect-internal-3", + }, + }, }, - { - Address: "tcp://gitaly-internal-3.example.com", - Storage: "praefect-internal-3", + }, + }, + }, + //TODO: Remove this test, as well as the fixture in testdata/single-virtual-storage.config.toml + // once omnibus and gdk are updated with support for VirtualStorages + { + filePath: "testdata/single-virtual-storage.config.toml", + expected: Config{ + Logging: log.Config{ + Level: "info", + Format: "json", + }, + Sentry: sentry.Config{ + DSN: "abcd123", + Environment: "production", + }, + VirtualStorages: []*VirtualStorage{ + &VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + Address: "tcp://gitaly-internal-1.example.com", + Storage: "praefect-internal-1", + DefaultPrimary: true, + }, + { + Address: "tcp://gitaly-internal-2.example.com", + Storage: "praefect-internal-2", + }, + { + Address: "tcp://gitaly-internal-3.example.com", + Storage: "praefect-internal-3", + }, + }, }, }, }, diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 1c85c7e47..bd1958975 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -1,4 +1,3 @@ -virtual_storage_name = "praefect" listen_addr = "" socket_path = "" prometheus_listen_addr = "" @@ -11,15 +10,18 @@ prometheus_listen_addr = "" sentry_environment = "production" sentry_dsn = "abcd123" -[[node]] - address = "tcp://gitaly-internal-1.example.com" - storage = "praefect-internal-1" - primary = true +[[virtual_storage]] +name = "praefect" -[[node]] - address = "tcp://gitaly-internal-2.example.com" - storage = "praefect-internal-2" - -[[node]] - address = "tcp://gitaly-internal-3.example.com" - storage = "praefect-internal-3" + [[virtual_storage.node]] + address = "tcp://gitaly-internal-1.example.com" + storage = "praefect-internal-1" + primary = true + + [[virtual_storage.node]] + address = "tcp://gitaly-internal-2.example.com" + storage = "praefect-internal-2" + + [[virtual_storage.node]] + address = "tcp://gitaly-internal-3.example.com" + storage = "praefect-internal-3" diff --git a/internal/praefect/config/testdata/single-virtual-storage.config.toml b/internal/praefect/config/testdata/single-virtual-storage.config.toml new file mode 100644 index 000000000..e98381875 --- /dev/null +++ b/internal/praefect/config/testdata/single-virtual-storage.config.toml @@ -0,0 +1,25 @@ +listen_addr = "" +socket_path = "" +prometheus_listen_addr = "" +virtual_storage_name = "praefect" + +[logging] + format = "json" + level = "info" + +[sentry] + sentry_environment = "production" + sentry_dsn = "abcd123" + +[[node]] + address = "tcp://gitaly-internal-1.example.com" + storage = "praefect-internal-1" + primary = true + +[[node]] + address = "tcp://gitaly-internal-2.example.com" + storage = "praefect-internal-2" + +[[node]] + address = "tcp://gitaly-internal-3.example.com" + storage = "praefect-internal-3" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index fa322da2b..1292ae7ae 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -22,8 +22,6 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func isDestructive(methodName string) bool { @@ -128,15 +126,12 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo return "", nil, err } - if targetRepo.StorageName != c.conf.VirtualStorageName { - return "", nil, status.Errorf(codes.InvalidArgument, "only messages for %s are allowed", c.conf.VirtualStorageName) - } - primary, err := c.selectPrimary(mi, targetRepo) if err != nil { return "", nil, err } + // rewrite storage name targetRepo.StorageName = primary.Storage additionalRepo, ok, err := mi.AdditionalRepo(m) @@ -159,6 +154,7 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo requestFinalizer := noopRequestFinalizer + // TODO: move the logic of creating replication jobs to the streamDirector method if mi.Operation == protoregistry.OpMutator { change := datastore.UpdateRepo if isDestructive(method) { @@ -193,13 +189,13 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git return nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) } - newPrimary, err := c.datastore.PickAPrimary() + newPrimary, err := c.datastore.PickAPrimary(targetRepo.GetStorageName()) if err != nil { return nil, fmt.Errorf("could not choose a primary: %v", err) } // set the primary - if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { + if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.Storage); err != nil { return nil, err } @@ -208,7 +204,7 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git if replica.DefaultPrimary { continue } - if err = c.datastore.AddReplica(targetRepo.GetRelativePath(), replica.ID); err != nil { + if err = c.datastore.AddReplica(targetRepo.GetRelativePath(), replica.Storage); err != nil { return nil, err } } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 76a0a84d3..bd941f97a 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -30,17 +30,21 @@ func TestSecondaryRotation(t *testing.T) { func TestStreamDirector(t *testing.T) { conf := config.Config{ - VirtualStorageName: "praefect", - Nodes: []*models.Node{ - &models.Node{ - Address: "tcp://gitaly-primary.example.com", - Storage: "praefect-internal-1", - DefaultPrimary: true, + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + Address: "tcp://gitaly-primary.example.com", + Storage: "praefect-internal-1", + DefaultPrimary: true, + }, + &models.Node{ + Address: "tcp://gitaly-backup1.example.com", + Storage: "praefect-internal-2", + }}, }, - &models.Node{ - Address: "tcp://gitaly-backup1.example.com", - Storage: "praefect-internal-2", - }}, + }, } ds := datastore.NewInMemory(conf) @@ -87,13 +91,14 @@ func TestStreamDirector(t *testing.T) { require.NoError(t, err) require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name") - jobs, err := ds.GetJobs(datastore.JobStatePending, 1, 10) + jobs, err := ds.GetJobs(datastore.JobStatePending, "praefect-internal-2", 10) require.NoError(t, err) require.Len(t, jobs, 1) - targetNode, err := ds.GetStorageNode(1) + targetNode, err := ds.GetStorageNode("praefect-internal-2") require.NoError(t, err) - sourceNode, err := ds.GetStorageNode(0) + sourceNode, err := ds.GetStorageNode("praefect-internal-1") + require.NoError(t, err) expectedJob := datastore.ReplJob{ @@ -109,7 +114,7 @@ func TestStreamDirector(t *testing.T) { jobUpdateFunc() - jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, 1, 10) + jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, "praefect-internal-2", 10) require.NoError(t, err) require.Len(t, jobs, 1) diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 8bb18cd07..378adc692 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -80,21 +80,21 @@ type Datastore interface { // ReplicasDatastore manages accessing and setting which secondary replicas // backup a repository type ReplicasDatastore interface { - PickAPrimary() (*models.Node, error) + PickAPrimary(virtualStorage string) (*models.Node, error) GetReplicas(relativePath string) ([]models.Node, error) - GetStorageNode(nodeID int) (models.Node, error) + GetStorageNode(nodeStorage string) (models.Node, error) GetStorageNodes() ([]models.Node, error) GetPrimary(relativePath string) (*models.Node, error) - SetPrimary(relativePath string, storageNodeID int) error + SetPrimary(relativePath, nodeStorage string) error - AddReplica(relativePath string, storageNodeID int) error + AddReplica(relativePath string, nodeStorage string) error - RemoveReplica(relativePath string, storageNodeID int) error + RemoveReplica(relativePath, nodeStorage string) error GetRepository(relativePath string) (*models.Repository, error) } @@ -105,7 +105,7 @@ type ReplJobsDatastore interface { // GetJobs fetches a list of chronologically ordered replication // jobs for the given storage replica. The returned list will be at most // count-length. - GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error) + GetJobs(flag JobState, nodeStorage string, count int) ([]ReplJob, error) // CreateReplicaJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job @@ -117,10 +117,10 @@ type ReplJobsDatastore interface { } type jobRecord struct { - change ChangeType - relativePath string // project's relative path - targetNodeID, sourceNodeID int - state JobState + change ChangeType + relativePath string // project's relative path + targetNodeStorage, sourceNodeStorage string + state JobState } // MemoryDatastore is a simple datastore that isn't persisted to disk. It is @@ -134,13 +134,18 @@ type MemoryDatastore struct { storageNodes *struct { sync.RWMutex - m map[int]models.Node + m map[string]models.Node } repositories *struct { sync.RWMutex m map[string]models.Repository } + + virtualStorages *struct { + sync.RWMutex + m map[string][]*models.Node + } } // NewInMemory returns an initialized in-memory datastore @@ -148,9 +153,9 @@ func NewInMemory(cfg config.Config) *MemoryDatastore { m := &MemoryDatastore{ storageNodes: &struct { sync.RWMutex - m map[int]models.Node + m map[string]models.Node }{ - m: map[int]models.Node{}, + m: map[string]models.Node{}, }, jobs: &struct { sync.RWMutex @@ -164,24 +169,36 @@ func NewInMemory(cfg config.Config) *MemoryDatastore { }{ m: map[string]models.Repository{}, }, + virtualStorages: &struct { + sync.RWMutex + m map[string][]*models.Node + }{ + m: map[string][]*models.Node{}, + }, } - for i, storageNode := range cfg.Nodes { - storageNode.ID = i - m.storageNodes.m[i] = *storageNode + for _, virtualStorage := range cfg.VirtualStorages { + m.virtualStorages.m[virtualStorage.Name] = virtualStorage.Nodes + + for _, node := range virtualStorage.Nodes { + if _, ok := m.storageNodes.m[node.Storage]; ok { + continue + } + m.storageNodes.m[node.Storage] = *node + } } return m } // PickAPrimary returns the primary configured in the config file -func (md *MemoryDatastore) PickAPrimary() (*models.Node, error) { +func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (*models.Node, error) { md.storageNodes.RLock() defer md.storageNodes.RUnlock() - for _, node := range md.storageNodes.m { + for _, node := range md.virtualStorages.m[virtualStorage] { if node.DefaultPrimary { - return &node, nil + return node, nil } } @@ -204,11 +221,11 @@ func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.Node, erro } // GetStorageNode gets all storage nodes -func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.Node, error) { +func (md *MemoryDatastore) GetStorageNode(nodeStorage string) (models.Node, error) { md.storageNodes.RLock() defer md.storageNodes.RUnlock() - node, ok := md.storageNodes.m[nodeID] + node, ok := md.storageNodes.m[nodeStorage] if !ok { return models.Node{}, errors.New("node not found") } @@ -239,15 +256,11 @@ func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.Node, error) return nil, ErrPrimaryNotSet } - storageNode, ok := md.storageNodes.m[repository.Primary.ID] - if !ok { - return nil, errors.New("node storage not found") - } - return &storageNode, nil + return &repository.Primary, nil } // SetPrimary sets the primary storagee node for a repository of a repository relative path -func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error { +func (md *MemoryDatastore) SetPrimary(relativePath, nodeStorage string) error { md.repositories.Lock() defer md.repositories.Unlock() @@ -256,7 +269,7 @@ func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) er repository = models.Repository{RelativePath: relativePath} } - storageNode, ok := md.storageNodes.m[storageNodeID] + storageNode, ok := md.storageNodes.m[nodeStorage] if !ok { return errors.New("node storage not found") } @@ -268,7 +281,7 @@ func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) er } // AddReplica adds a secondary to a repository of a repository relative path -func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error { +func (md *MemoryDatastore) AddReplica(relativePath, nodeStorage string) error { md.repositories.Lock() defer md.repositories.Unlock() @@ -277,7 +290,7 @@ func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) er return errors.New("repository not found") } - storageNode, ok := md.storageNodes.m[storageNodeID] + storageNode, ok := md.storageNodes.m[nodeStorage] if !ok { return errors.New("node storage not found") } @@ -289,7 +302,7 @@ func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) er } // RemoveReplica removes a secondary from a repository of a repository relative path -func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error { +func (md *MemoryDatastore) RemoveReplica(relativePath, nodeStorage string) error { md.repositories.Lock() defer md.repositories.Unlock() @@ -300,7 +313,7 @@ func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) var secondaries []models.Node for _, secondary := range repository.Replicas { - if secondary.ID != storageNodeID { + if secondary.Storage != nodeStorage { secondaries = append(secondaries, secondary) } } @@ -328,7 +341,7 @@ func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repositor var ErrReplicasMissing = errors.New("repository missing secondary replicas") // GetJobs is a more general method to retrieve jobs of a certain state from the datastore -func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) { +func (md *MemoryDatastore) GetJobs(state JobState, targetNodeStorage string, count int) ([]ReplJob, error) { md.jobs.RLock() defer md.jobs.RUnlock() @@ -336,7 +349,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) for i, record := range md.jobs.records { // state is a bitmap that is a combination of one or more JobStates - if record.state&state != 0 && record.targetNodeID == targetNodeID { + if record.state&state != 0 && record.targetNodeStorage == targetNodeStorage { job, err := md.replJobFromRecord(i, record) if err != nil { return nil, err @@ -362,11 +375,11 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re return ReplJob{}, err } - sourceNode, err := md.GetStorageNode(record.sourceNodeID) + sourceNode, err := md.GetStorageNode(record.sourceNodeStorage) if err != nil { return ReplJob{}, err } - targetNode, err := md.GetStorageNode(record.targetNodeID) + targetNode, err := md.GetStorageNode(record.targetNodeStorage) if err != nil { return ReplJob{}, err } @@ -409,11 +422,11 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, change Cha nextID := uint64(len(md.jobs.records) + 1) md.jobs.records[nextID] = jobRecord{ - change: change, - targetNodeID: secondary.ID, - state: JobStatePending, - relativePath: relativePath, - sourceNodeID: repository.Primary.ID, + change: change, + targetNodeStorage: secondary.Storage, + state: JobStatePending, + relativePath: relativePath, + sourceNodeStorage: repository.Primary.Storage, } jobIDs = append(jobIDs, nextID) diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go index 1be1e1de0..c5d8b398e 100644 --- a/internal/praefect/datastore/datastore_test.go +++ b/internal/praefect/datastore/datastore_test.go @@ -10,13 +10,11 @@ import ( var ( stor1 = models.Node{ - ID: 0, Address: "tcp://address-1", Storage: "praefect-storage-1", DefaultPrimary: true, } stor2 = models.Node{ - ID: 1, Address: "tcp://address-2", Storage: "praefect-storage-2", } @@ -36,7 +34,7 @@ var operations = []struct { { desc: "query an empty datastore", opFn: func(t *testing.T, ds Datastore) { - jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1) + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.Storage, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, @@ -51,14 +49,14 @@ var operations = []struct { { desc: "set the primary for the repository", opFn: func(t *testing.T, ds Datastore) { - err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID) + err := ds.SetPrimary(repo1Repository.RelativePath, stor1.Storage) require.NoError(t, err) }, }, { desc: "add a secondary replica for the repository", opFn: func(t *testing.T, ds Datastore) { - err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID) + err := ds.AddReplica(repo1Repository.RelativePath, stor2.Storage) require.NoError(t, err) }, }, @@ -73,7 +71,7 @@ var operations = []struct { { desc: "fetch inserted replication jobs after primary mapped", opFn: func(t *testing.T, ds Datastore) { - jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.ID, 10) + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.Storage, 10) require.NoError(t, err) require.Len(t, jobs, 1) @@ -102,7 +100,7 @@ var operations = []struct { { desc: "try fetching completed replication job", opFn: func(t *testing.T, ds Datastore) { - jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1) + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.Storage, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, @@ -113,7 +111,11 @@ var operations = []struct { var flavors = map[string]func() Datastore{ "in-memory-datastore": func() Datastore { return NewInMemory(config.Config{ - Nodes: []*models.Node{&stor1, &stor2}, + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Nodes: []*models.Node{&stor1, &stor2}, + }, + }, }) }, } diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index f1f8778b5..4d11354bb 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -41,15 +41,10 @@ func waitUntil(t *testing.T, ch <-chan struct{}, timeout time.Duration) { // generates a praefect configuration with the specified number of backend // nodes func testConfig(backends int) config.Config { - cfg := config.Config{ - VirtualStorageName: "praefect", - } - var nodes []*models.Node for i := 0; i < backends; i++ { n := &models.Node{ - ID: i, Storage: fmt.Sprintf("praefect-internal-%d", i), Token: fmt.Sprintf("%d", i), } @@ -60,8 +55,14 @@ func testConfig(backends int) config.Config { nodes = append(nodes, n) } - - cfg.Nodes = nodes + cfg := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: nodes, + }, + }, + } return cfg } @@ -75,7 +76,7 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti ) var defaultNode *models.Node - for _, n := range conf.Nodes { + for _, n := range conf.VirtualStorages[0].Nodes { if n.DefaultPrimary { defaultNode = n } @@ -104,27 +105,30 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti // Each mock server is keyed by the corresponding index of the node in the // config.Nodes. There must be a 1-to-1 mapping between backend server and // configured storage node. -func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[int]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) { +// requires there to be only 1 virtual storage +func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[string]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) { clientCC := conn.NewClientConnections() + + require.Len(t, conf.VirtualStorages, 1) + require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes), + "mock server count doesn't match config nodes") + var cleanups []testhelper.Cleanup - for i, node := range conf.Nodes { - backend, ok := backends[i] - require.True(t, ok, "missing backend server for node %d", i) + for i, node := range conf.VirtualStorages[0].Nodes { + backend, ok := backends[node.Storage] + require.True(t, ok, "missing backend server for node %s", node.Storage) backendAddr, cleanup := newMockDownstream(t, node.Token, backend) cleanups = append(cleanups, cleanup) clientCC.RegisterNode(node.Storage, backendAddr, node.Token) node.Address = backendAddr - conf.Nodes[i] = node + conf.VirtualStorages[0].Nodes[i] = node } _, prf := setupServer(t, conf, clientCC, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)}) - require.Equal(t, len(backends), len(conf.Nodes), - "mock server count doesn't match config nodes") - listener, port := listenAvailPort(t) t.Logf("praefect listening on port %d", port) @@ -154,17 +158,19 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[in } // runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes +// requires exactly 1 virtual storage func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) { + require.Len(t, conf.VirtualStorages, 1) clientCC := conn.NewClientConnections() var cleanups []testhelper.Cleanup - for i, node := range conf.Nodes { + for i, node := range conf.VirtualStorages[0].Nodes { _, backendAddr, cleanup := runInternalGitalyServer(t, node.Token) cleanups = append(cleanups, cleanup) clientCC.RegisterNode(node.Storage, backendAddr, node.Token) node.Address = backendAddr - conf.Nodes[i] = node + conf.VirtualStorages[0].Nodes[i] = node } ds := datastore.NewInMemory(conf) diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go index 23918021f..5dfaf3667 100644 --- a/internal/praefect/models/node.go +++ b/internal/praefect/models/node.go @@ -2,7 +2,6 @@ package models // Node describes an address that serves a storage type Node struct { - ID int Storage string `toml:"storage"` Address string `toml:"address"` Token string `toml:"token"` @@ -11,7 +10,6 @@ type Node struct { // Repository describes a repository's relative path and its primary and list of secondaries type Repository struct { - ID int RelativePath string Primary Node Replicas []Node diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index e12aab139..8ff54b259 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -255,14 +255,14 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { } for _, node := range nodes { - jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.ID, 10) + jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.Storage, 10) if err != nil { return err } if len(jobs) == 0 { r.log.WithFields(logrus.Fields{ - "node_id": node.ID, + "node_storage": node.Storage, "recheck_interval": jobFetchInterval, }).Trace("no jobs") diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 39623c174..a21011c48 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -71,32 +71,34 @@ func TestProceessReplicationJob(t *testing.T) { ) config := config.Config{ - Nodes: []*models.Node{ - &models.Node{ - ID: 0, - Storage: "default", - Address: srvSocketPath, - Token: gitaly_config.Config.Auth.Token, - DefaultPrimary: true, - }, - &models.Node{ - ID: 1, - Storage: backupStorageName, - Address: srvSocketPath, - Token: gitaly_config.Config.Auth.Token, + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Nodes: []*models.Node{ + &models.Node{ + Storage: "default", + Address: srvSocketPath, + Token: gitaly_config.Config.Auth.Token, + DefaultPrimary: true, + }, + &models.Node{ + Storage: backupStorageName, + Address: srvSocketPath, + Token: gitaly_config.Config.Auth.Token, + }, + }, }, }, } ds := datastore.NewInMemory(config) - ds.SetPrimary(testRepo.GetRelativePath(), 0) - ds.AddReplica(testRepo.GetRelativePath(), 1) + ds.SetPrimary(testRepo.GetRelativePath(), "default") + ds.AddReplica(testRepo.GetRelativePath(), backupStorageName) _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), datastore.UpdateRepo) require.NoError(t, err) - jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, 1, 1) + jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1) require.NoError(t, err) require.Len(t, jobs, 1) diff --git a/internal/praefect/server.go b/internal/praefect/server.go index ada0d05f6..46ee62cea 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -37,18 +37,22 @@ type Server struct { } func (srv *Server) warnDupeAddrs(c config.Config) { - addrSet := map[string]struct{}{} - fishy := false - for _, n := range c.Nodes { - _, ok := addrSet[n.Address] - if ok { - srv.l.Warnf("more than one backend node is hosted at %s", n.Address) - fishy = true + var fishy bool + + for _, virtualStorage := range c.VirtualStorages { + addrSet := map[string]struct{}{} + for _, n := range virtualStorage.Nodes { + _, ok := addrSet[n.Address] + if ok { + srv.l.Warnf("more than one backend node is hosted at %s", n.Address) + fishy = true + continue + } + addrSet[n.Address] = struct{}{} + } + if fishy { + srv.l.Warnf("your Praefect configuration may not offer actual redundancy") } - addrSet[n.Address] = struct{}{} - } - if fishy { - srv.l.Warnf("your Praefect configuration may not offer actual redundancy") } } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index b75aa3c59..b0ecdb7de 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -2,7 +2,6 @@ package praefect import ( "context" - "fmt" "io/ioutil" "os" "strings" @@ -23,9 +22,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" ) func TestServerRouteServerAccessor(t *testing.T) { @@ -38,8 +35,8 @@ func TestServerRouteServerAccessor(t *testing.T) { // note: a server scoped RPC will be randomly routed // to an available backend server. To simplify our // test, a single backend server is used. - backends = map[int]mock.SimpleServiceServer{ - 0: &mockSvc{ + backends = map[string]mock.SimpleServiceServer{ + conf.VirtualStorages[0].Nodes[0].Storage: &mockSvc{ serverAccessor: func(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) { reqQ <- req return expectResp, nil @@ -75,18 +72,20 @@ func TestServerRouteServerAccessor(t *testing.T) { func TestGitalyServerInfo(t *testing.T) { conf := config.Config{ - Nodes: []*models.Node{ - &models.Node{ - ID: 1, - Storage: "praefect-internal-1", - DefaultPrimary: true, - Token: "abc", + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Nodes: []*models.Node{ + &models.Node{ + Storage: "praefect-internal-1", + DefaultPrimary: true, + Token: "abc", + }, + &models.Node{ + Storage: "praefect-internal-2", + Token: "xyz", + }}, }, - &models.Node{ - ID: 2, - Storage: "praefect-internal-2", - Token: "xyz", - }}, + }, } cc, _, cleanup := runPraefectServerWithGitaly(t, conf) defer cleanup() @@ -98,7 +97,7 @@ func TestGitalyServerInfo(t *testing.T) { metadata, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{}) require.NoError(t, err) - require.Len(t, metadata.GetStorageStatuses(), len(conf.Nodes)) + require.Len(t, metadata.GetStorageStatuses(), len(conf.VirtualStorages[0].Nodes)) require.Equal(t, version.GetVersion(), metadata.GetServerVersion()) gitVersion, err := git.Version() @@ -112,18 +111,20 @@ func TestGitalyServerInfo(t *testing.T) { func TestGitalyDiskStatistics(t *testing.T) { conf := config.Config{ - Nodes: []*models.Node{ - { - ID: 1, - Storage: "praefect-internal-1", - DefaultPrimary: true, - Token: "abc", + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Nodes: []*models.Node{ + { + Storage: "praefect-internal-1", + DefaultPrimary: true, + Token: "abc", + }, + { + Storage: "praefect-internal-2", + Token: "xyz", + }}, }, - { - ID: 2, - Storage: "praefect-internal-2", - Token: "xyz", - }}, + }, } cc, _, cleanup := runPraefectServerWithGitaly(t, conf) defer cleanup() @@ -156,12 +157,16 @@ func TestHealthCheck(t *testing.T) { func TestRejectBadStorage(t *testing.T) { conf := config.Config{ - VirtualStorageName: "praefect", - Nodes: []*models.Node{ - &models.Node{ - DefaultPrimary: true, - Storage: "praefect-internal-0", - Address: "tcp::/this-doesnt-matter", + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + DefaultPrimary: true, + Storage: "praefect-internal-0", + Address: "tcp::/this-doesnt-matter", + }, + }, }, }, } @@ -180,22 +185,39 @@ func TestRejectBadStorage(t *testing.T) { defer cancel() _, err := repoClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{Repository: &badTargetRepo}) - testhelper.RequireGrpcError(t, err, codes.InvalidArgument) - require.Equal(t, fmt.Sprintf("only messages for %s are allowed", conf.VirtualStorageName), status.Convert(err).Message()) + require.Error(t, err) } func TestWarnDuplicateAddrs(t *testing.T) { conf := config.Config{ - VirtualStorageName: "praefect", - Nodes: []*models.Node{ - &models.Node{ - DefaultPrimary: true, - Storage: "praefect-internal-0", - Address: "tcp::/samesies", + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "default", + Nodes: []*models.Node{ + &models.Node{ + DefaultPrimary: true, + Storage: "praefect-internal-0", + Address: "tcp://abc", + }, + &models.Node{ + Storage: "praefect-internal-1", + Address: "tcp://xyz", + }, + }, }, - &models.Node{ - Storage: "praefect-internal-1", - Address: "tcp::/samesies", + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + DefaultPrimary: true, + Storage: "praefect-internal-0", + Address: "tcp://abc", + }, + &models.Node{ + Storage: "praefect-internal-1", + Address: "tcp://xyz", + }, + }, }, }, } @@ -205,31 +227,103 @@ func TestWarnDuplicateAddrs(t *testing.T) { setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning for _, entry := range hook.Entries { + require.NotContains(t, entry.Message, "more than one backend node") + } + + conf = config.Config{ + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + DefaultPrimary: true, + Storage: "praefect-internal-0", + Address: "tcp::/samesies", + }, + &models.Node{ + Storage: "praefect-internal-1", + Address: "tcp::/samesies", + }, + }, + }, + }, + } + + tLogger, hook = test.NewNullLogger() + + setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + + var found bool + for _, entry := range hook.Entries { if strings.Contains(entry.Message, "more than one backend node") { - return // pass! + found = true + break } } - t.Fatal("could not find expected log message") + require.True(t, found, "expected to find error log") + + conf = config.Config{ + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "default", + Nodes: []*models.Node{ + &models.Node{ + DefaultPrimary: true, + Storage: "praefect-internal-0", + Address: "tcp://abc", + }, + &models.Node{ + Storage: "praefect-internal-1", + Address: "tcp://xyz", + }, + }, + }, + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + DefaultPrimary: true, + Storage: "praefect-internal-0", + Address: "tcp://abc", + }, + &models.Node{ + Storage: "praefect-internal-2", + Address: "tcp://xyz", + }, + }, + }, + }, + } + + tLogger, hook = test.NewNullLogger() + + setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + + for _, entry := range hook.Entries { + require.NotContains(t, entry.Message, "more than one backend node") + } } func TestRepoRemoval(t *testing.T) { conf := config.Config{ - VirtualStorageName: "praefect", - Nodes: []*models.Node{ - &models.Node{ - DefaultPrimary: true, - Storage: gconfig.Config.Storages[0].Name, - Address: "tcp::/samesies", - }, - &models.Node{ - ID: 1, - Storage: "praefect-internal-1", - Address: "tcp::/this-doesnt-matter", - }, - &models.Node{ - ID: 2, - Storage: "praefect-internal-2", - Address: "tcp::/this-doesnt-matter", + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*models.Node{ + &models.Node{ + DefaultPrimary: true, + Storage: gconfig.Config.Storages[0].Name, + Address: "tcp::/samesies", + }, + &models.Node{ + Storage: "praefect-internal-1", + Address: "tcp::/this-doesnt-matter", + }, + &models.Node{ + Storage: "praefect-internal-2", + Address: "tcp::/this-doesnt-matter", + }, + }, }, }, } @@ -239,11 +333,11 @@ func TestRepoRemoval(t *testing.T) { testStorages := []gconfig.Storage{ { - Name: conf.Nodes[1].Storage, + Name: conf.VirtualStorages[0].Nodes[1].Storage, Path: tempStoragePath(t), }, { - Name: conf.Nodes[2].Storage, + Name: conf.VirtualStorages[0].Nodes[2].Storage, Path: tempStoragePath(t), }, } @@ -257,9 +351,9 @@ func TestRepoRemoval(t *testing.T) { tRepo, _, tCleanup := testhelper.NewTestRepo(t) defer tCleanup() - _, path1, cleanup1 := cloneRepoAtStorage(t, tRepo, conf.Nodes[1].Storage) + _, path1, cleanup1 := cloneRepoAtStorage(t, tRepo, conf.VirtualStorages[0].Nodes[1].Storage) defer cleanup1() - _, path2, cleanup2 := cloneRepoAtStorage(t, tRepo, conf.Nodes[2].Storage) + _, path2, cleanup2 := cloneRepoAtStorage(t, tRepo, conf.VirtualStorages[0].Nodes[2].Storage) defer cleanup2() // prerequisite: repos should exist at expected paths @@ -273,7 +367,7 @@ func TestRepoRemoval(t *testing.T) { defer cancel() virtualRepo := *tRepo - virtualRepo.StorageName = conf.VirtualStorageName + virtualRepo.StorageName = conf.VirtualStorages[0].Name rClient := gitalypb.NewRepositoryServiceClient(cc) diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go index 76c2fd67b..e83394b04 100644 --- a/internal/praefect/service/server/info.go +++ b/internal/praefect/service/server/info.go @@ -3,24 +3,41 @@ package server import ( "context" "fmt" + "sync" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "golang.org/x/sync/errgroup" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) // ServerInfo sends ServerInfoRequest to all of a praefect server's internal gitaly nodes and aggregates the results into // a response func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) { - storageStatuses := make([][]*gitalypb.ServerInfoResponse_StorageStatus, len(s.conf.Nodes)) + var once sync.Once + nodesChecked := make(map[string]struct{}) + + var nodes []*models.Node + for _, virtualStorage := range s.conf.VirtualStorages { + for _, node := range virtualStorage.Nodes { + if _, ok := nodesChecked[node.Storage]; ok { + continue + } + + nodesChecked[node.Storage] = struct{}{} + nodes = append(nodes, node) + } + } var gitVersion, serverVersion string g, ctx := errgroup.WithContext(ctx) - for i, node := range s.conf.Nodes { - i := i // necessary since it will be used in a goroutine below + storageStatuses := make([][]*gitalypb.ServerInfoResponse_StorageStatus, len(nodes)) + + for i, node := range nodes { + i := i node := node cc, err := s.clientCC.GetConnection(node.Storage) if err != nil { @@ -36,7 +53,9 @@ func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) storageStatuses[i] = resp.GetStorageStatuses() if node.DefaultPrimary { - gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion() + once.Do(func() { + gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion() + }) } return nil |