Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-11-26 22:34:43 +0300
committerJohn Cai <jcai@gitlab.com>2019-11-26 22:34:43 +0300
commit82d31745e0901dc91c2fe02f44ff8ff659a22c09 (patch)
tree905df77c5b25e392ff5d0300223668e6e0bedede
parent39bfe1dfaa4fcd493d53b21b7a172d0cad2ef8ac (diff)
parent59f4dd3c9a4f7a562849e3bd3d27dce5e0b4551c (diff)
Merge branch 'jc-praefect-multiple-virtual-storage' into 'master'
Praefect multiple virtual storage See merge request gitlab-org/gitaly!1606
-rw-r--r--changelogs/unreleased/jc-praefect-multiple-virtual-storage.yml5
-rw-r--r--cmd/praefect/main.go15
-rw-r--r--internal/praefect/auth_test.go18
-rw-r--r--internal/praefect/config/config.go125
-rw-r--r--internal/praefect/config/config_test.go156
-rw-r--r--internal/praefect/config/testdata/config.toml26
-rw-r--r--internal/praefect/config/testdata/single-virtual-storage.config.toml25
-rw-r--r--internal/praefect/coordinator.go14
-rw-r--r--internal/praefect/coordinator_test.go33
-rw-r--r--internal/praefect/datastore/datastore.go95
-rw-r--r--internal/praefect/datastore/datastore_test.go18
-rw-r--r--internal/praefect/helper_test.go42
-rw-r--r--internal/praefect/models/node.go2
-rw-r--r--internal/praefect/replicator.go4
-rw-r--r--internal/praefect/replicator_test.go34
-rw-r--r--internal/praefect/server.go26
-rw-r--r--internal/praefect/server_test.go230
-rw-r--r--internal/praefect/service/server/info.go27
18 files changed, 611 insertions, 284 deletions
diff --git a/changelogs/unreleased/jc-praefect-multiple-virtual-storage.yml b/changelogs/unreleased/jc-praefect-multiple-virtual-storage.yml
new file mode 100644
index 000000000..56bd79024
--- /dev/null
+++ b/changelogs/unreleased/jc-praefect-multiple-virtual-storage.yml
@@ -0,0 +1,5 @@
+---
+title: Praefect multiple virtual storage
+merge_request: 1606
+author:
+type: changed
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 607e650ae..271041e8c 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -97,12 +97,17 @@ func configure() (config.Config, error) {
func run(listeners []net.Listener, conf config.Config) error {
clientConnections := conn.NewClientConnections()
- for _, node := range conf.Nodes {
- if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil {
- return fmt.Errorf("failed to register %s: %s", node.Address, err)
- }
+ for _, virtualStorage := range conf.VirtualStorages {
+ for _, node := range virtualStorage.Nodes {
+ if _, err := clientConnections.GetConnection(node.Storage); err == nil {
+ continue
+ }
+ if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil {
+ return fmt.Errorf("failed to register %s: %s", node.Address, err)
+ }
- logger.WithField("node_address", node.Address).Info("registered gitaly node")
+ logger.WithField("node_address", node.Address).Info("registered gitaly node")
+ }
}
var (
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index b426910f8..ff510e460 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -166,13 +166,17 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
backend, cleanup := newMockDownstream(t, backendToken, mockServer)
conf := config.Config{
- VirtualStorageName: "praefect",
- Auth: auth.Config{Token: token, Transitioning: !required},
- Nodes: []*models.Node{
- &models.Node{
- Storage: "praefect-internal-0",
- DefaultPrimary: true,
- Address: backend,
+ Auth: auth.Config{Token: token, Transitioning: !required},
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ Storage: "praefect-internal-0",
+ DefaultPrimary: true,
+ Address: backend,
+ },
+ },
},
},
}
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index e400350fd..b074c1f01 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -2,6 +2,7 @@ package config
import (
"errors"
+ "fmt"
"os"
"github.com/BurntSushi/toml"
@@ -14,16 +15,23 @@ import (
// Config is a container for everything found in the TOML config file
type Config struct {
- VirtualStorageName string `toml:"virtual_storage_name"`
- ListenAddr string `toml:"listen_addr"`
- SocketPath string `toml:"socket_path"`
+ ListenAddr string `toml:"listen_addr"`
+ SocketPath string `toml:"socket_path"`
+ VirtualStorages []*VirtualStorage `toml:"virtual_storage"`
+ //TODO: Remove VirtualStorageName and Nodes once omnibus and gdk are updated with support for
+ // VirtualStorages
+ VirtualStorageName string `toml:"virtual_storage_name"`
+ Nodes []*models.Node `toml:"node"`
+ Logging log.Config `toml:"logging"`
+ Sentry sentry.Config `toml:"sentry"`
+ PrometheusListenAddr string `toml:"prometheus_listen_addr"`
+ Auth auth.Config `toml:"auth"`
+}
+// VirtualStorage represents a set of nodes for a storage
+type VirtualStorage struct {
+ Name string `toml:"name"`
Nodes []*models.Node `toml:"node"`
-
- Logging log.Config `toml:"logging"`
- Sentry sentry.Config `toml:"sentry"`
- PrometheusListenAddr string `toml:"prometheus_listen_addr"`
- Auth auth.Config `toml:"auth"`
}
// FromFile loads the config for the passed file path
@@ -36,17 +44,34 @@ func FromFile(filePath string) (Config, error) {
defer cfgFile.Close()
_, err = toml.DecodeReader(cfgFile, config)
+
+ // TODO: Remove this after the virtual storages change is merged in omnibus
+ // and gdk. This is for backwards compatibility purposes only
+ if len(config.VirtualStorages) == 0 && config.VirtualStorageName != "" && len(config.Nodes) > 0 {
+ config.VirtualStorages = []*VirtualStorage{
+ &VirtualStorage{
+ Name: config.VirtualStorageName,
+ Nodes: config.Nodes,
+ },
+ }
+ config.VirtualStorageName = ""
+ config.Nodes = nil
+ }
+
return *config, err
}
var (
- errNoListener = errors.New("no listen address or socket path configured")
- errNoGitalyServers = errors.New("no primary gitaly backends configured")
- errDuplicateStorage = errors.New("internal gitaly storages are not unique")
- errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address")
- errGitalyWithoutStorage = errors.New("all gitaly nodes must have a storage")
- errMoreThanOnePrimary = errors.New("only 1 node can be designated as a primary")
- errNoPrimaries = errors.New("no primaries designated")
+ errDuplicateStorage = errors.New("internal gitaly storages are not unique")
+ errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address")
+ errGitalyWithoutStorage = errors.New("all gitaly nodes must have a storage")
+ errMoreThanOnePrimary = errors.New("only 1 node can be designated as a primary")
+ errNoGitalyServers = errors.New("no primary gitaly backends configured")
+ errNoListener = errors.New("no listen address or socket path configured")
+ errNoPrimaries = errors.New("no primaries designated")
+ errNoVirtualStorages = errors.New("no virtual storages configured")
+ errStorageAddressMismatch = errors.New("storages with the same name must have the same address")
+ errVirtualStoragesNotUnique = errors.New("virtual storages must have unique names")
)
// Validate establishes if the config is valid
@@ -55,38 +80,60 @@ func (c Config) Validate() error {
return errNoListener
}
- storages := make(map[string]struct{})
+ if len(c.VirtualStorages) == 0 {
+ return errNoVirtualStorages
+ }
- var primaries int
- for _, node := range c.Nodes {
- if node.DefaultPrimary {
- primaries++
- }
+ allStorages := make(map[string]string)
+ virtualStorages := make(map[string]struct{})
- if primaries > 1 {
- return errMoreThanOnePrimary
- }
- if node.Storage == "" {
- return errGitalyWithoutStorage
+ for _, virtualStorage := range c.VirtualStorages {
+ if _, ok := virtualStorages[virtualStorage.Name]; ok {
+ return errVirtualStoragesNotUnique
}
- if node.Address == "" {
- return errGitalyWithoutAddr
- }
+ virtualStorages[virtualStorage.Name] = struct{}{}
- if _, found := storages[node.Storage]; found {
- return errDuplicateStorage
- }
+ storages := make(map[string]struct{})
+ var primaries int
+ for _, node := range virtualStorage.Nodes {
+ if node.DefaultPrimary {
+ primaries++
+ }
- storages[node.Storage] = struct{}{}
- }
+ if primaries > 1 {
+ return fmt.Errorf("virtual storage %s: %v", virtualStorage.Name, errMoreThanOnePrimary)
+ }
- if len(storages) == 0 {
- return errNoGitalyServers
- }
+ if node.Storage == "" {
+ return errGitalyWithoutStorage
+ }
+
+ if node.Address == "" {
+ return errGitalyWithoutAddr
+ }
+
+ if _, found := storages[node.Storage]; found {
+ return errDuplicateStorage
+ }
- if primaries == 0 {
- return errNoPrimaries
+ if address, found := allStorages[node.Storage]; found {
+ if address != node.Address {
+ return errStorageAddressMismatch
+ }
+ } else {
+ allStorages[node.Storage] = node.Address
+ }
+
+ storages[node.Storage] = struct{}{}
+ }
+
+ if primaries == 0 {
+ return fmt.Errorf("virtual storage %s: %v", virtualStorage.Name, errNoPrimaries)
+ }
+ if len(storages) == 0 {
+ return fmt.Errorf("virtual storage %s: %v", virtualStorage.Name, errNoGitalyServers)
+ }
}
return nil
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index bf707d5d7..ee659c349 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -1,6 +1,7 @@
package config
import (
+ "strings"
"testing"
"github.com/stretchr/testify/assert"
@@ -12,9 +13,9 @@ import (
func TestConfigValidation(t *testing.T) {
nodes := []*models.Node{
- {ID: 1, Storage: "internal-1", Address: "localhost:23456", Token: "secret-token", DefaultPrimary: true},
- {ID: 2, Storage: "internal-2", Address: "localhost:23457", Token: "secret-token"},
- {ID: 3, Storage: "internal-3", Address: "localhost:23458", Token: "secret-token"},
+ {Storage: "internal-1", Address: "localhost:23456", Token: "secret-token", DefaultPrimary: true},
+ {Storage: "internal-2", Address: "localhost:23457", Token: "secret-token"},
+ {Storage: "internal-3", Address: "localhost:23458", Token: "secret-token"},
}
testCases := []struct {
@@ -24,45 +25,105 @@ func TestConfigValidation(t *testing.T) {
}{
{
desc: "No ListenAddr or SocketPath",
- config: Config{ListenAddr: "", Nodes: nodes},
+ config: Config{ListenAddr: "", VirtualStorages: []*VirtualStorage{&VirtualStorage{Nodes: nodes}}},
err: errNoListener,
},
{
desc: "Only a SocketPath",
- config: Config{SocketPath: "/tmp/praefect.socket", Nodes: nodes},
+ config: Config{SocketPath: "/tmp/praefect.socket", VirtualStorages: []*VirtualStorage{&VirtualStorage{Nodes: nodes}}},
err: nil,
},
{
desc: "No servers",
config: Config{ListenAddr: "localhost:1234"},
- err: errNoGitalyServers,
+ err: errNoVirtualStorages,
},
{
- desc: "duplicate storage",
- config: Config{ListenAddr: "localhost:1234", Nodes: append(nodes, &models.Node{Storage: nodes[0].Storage, Address: nodes[1].Address})},
- err: errDuplicateStorage,
+ desc: "duplicate storage",
+ config: Config{
+ ListenAddr: "localhost:1234",
+ VirtualStorages: []*VirtualStorage{
+ &VirtualStorage{Nodes: append(nodes, &models.Node{Storage: nodes[0].Storage, Address: nodes[1].Address})},
+ },
+ },
+ err: errDuplicateStorage,
},
{
desc: "Valid config",
- config: Config{ListenAddr: "localhost:1234", Nodes: nodes},
+ config: Config{ListenAddr: "localhost:1234", VirtualStorages: []*VirtualStorage{&VirtualStorage{Nodes: nodes}}},
err: nil,
},
{
desc: "No designated primaries",
- config: Config{ListenAddr: "localhost:1234", Nodes: nodes[1:]},
+ config: Config{ListenAddr: "localhost:1234", VirtualStorages: []*VirtualStorage{&VirtualStorage{Nodes: nodes[1:]}}},
err: errNoPrimaries,
},
{
- desc: "More than 1 primary",
- config: Config{ListenAddr: "localhost:1234", Nodes: append(nodes, &models.Node{ID: 3, Storage: "internal-4", Address: "localhost:23459", Token: "secret-token", DefaultPrimary: true})},
- err: errMoreThanOnePrimary,
+ desc: "More than 1 primary",
+ config: Config{
+ ListenAddr: "localhost:1234",
+ VirtualStorages: []*VirtualStorage{
+ &VirtualStorage{
+ Nodes: append(nodes,
+ &models.Node{
+ Storage: "internal-4",
+ Address: "localhost:23459",
+ Token: "secret-token",
+ DefaultPrimary: true,
+ }),
+ },
+ },
+ },
+ err: errMoreThanOnePrimary,
+ },
+ {
+ desc: "Node storage not unique",
+ config: Config{
+ ListenAddr: "localhost:1234",
+ VirtualStorages: []*VirtualStorage{
+ &VirtualStorage{Name: "default", Nodes: nodes},
+ &VirtualStorage{
+ Name: "backup",
+ Nodes: []*models.Node{
+ &models.Node{
+ Storage: nodes[0].Storage,
+ Address: "some.other.address",
+ DefaultPrimary: true},
+ },
+ },
+ },
+ },
+ err: errStorageAddressMismatch,
+ },
+ {
+ desc: "Node storage not unique",
+ config: Config{
+ ListenAddr: "localhost:1234",
+ VirtualStorages: []*VirtualStorage{
+ &VirtualStorage{Name: "default", Nodes: nodes},
+ &VirtualStorage{
+ Name: "default",
+ Nodes: []*models.Node{
+ &models.Node{
+ Storage: nodes[0].Storage,
+ Address: "some.other.address",
+ DefaultPrimary: true}},
+ },
+ },
+ },
+ err: errVirtualStoragesNotUnique,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
err := tc.config.Validate()
- assert.Equal(t, tc.err, err)
+ if tc.err == nil {
+ assert.NoError(t, err)
+ return
+ }
+
+ assert.True(t, strings.Contains(err.Error(), tc.err.Error()))
})
}
}
@@ -75,7 +136,6 @@ func TestConfigParsing(t *testing.T) {
{
filePath: "testdata/config.toml",
expected: Config{
- VirtualStorageName: "praefect",
Logging: log.Config{
Level: "info",
Format: "json",
@@ -84,19 +144,59 @@ func TestConfigParsing(t *testing.T) {
DSN: "abcd123",
Environment: "production",
},
- Nodes: []*models.Node{
- &models.Node{
- Address: "tcp://gitaly-internal-1.example.com",
- Storage: "praefect-internal-1",
- DefaultPrimary: true,
- },
- {
- Address: "tcp://gitaly-internal-2.example.com",
- Storage: "praefect-internal-2",
+ VirtualStorages: []*VirtualStorage{
+ &VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ Address: "tcp://gitaly-internal-1.example.com",
+ Storage: "praefect-internal-1",
+ DefaultPrimary: true,
+ },
+ {
+ Address: "tcp://gitaly-internal-2.example.com",
+ Storage: "praefect-internal-2",
+ },
+ {
+ Address: "tcp://gitaly-internal-3.example.com",
+ Storage: "praefect-internal-3",
+ },
+ },
},
- {
- Address: "tcp://gitaly-internal-3.example.com",
- Storage: "praefect-internal-3",
+ },
+ },
+ },
+ //TODO: Remove this test, as well as the fixture in testdata/single-virtual-storage.config.toml
+ // once omnibus and gdk are updated with support for VirtualStorages
+ {
+ filePath: "testdata/single-virtual-storage.config.toml",
+ expected: Config{
+ Logging: log.Config{
+ Level: "info",
+ Format: "json",
+ },
+ Sentry: sentry.Config{
+ DSN: "abcd123",
+ Environment: "production",
+ },
+ VirtualStorages: []*VirtualStorage{
+ &VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ Address: "tcp://gitaly-internal-1.example.com",
+ Storage: "praefect-internal-1",
+ DefaultPrimary: true,
+ },
+ {
+ Address: "tcp://gitaly-internal-2.example.com",
+ Storage: "praefect-internal-2",
+ },
+ {
+ Address: "tcp://gitaly-internal-3.example.com",
+ Storage: "praefect-internal-3",
+ },
+ },
},
},
},
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 1c85c7e47..bd1958975 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -1,4 +1,3 @@
-virtual_storage_name = "praefect"
listen_addr = ""
socket_path = ""
prometheus_listen_addr = ""
@@ -11,15 +10,18 @@ prometheus_listen_addr = ""
sentry_environment = "production"
sentry_dsn = "abcd123"
-[[node]]
- address = "tcp://gitaly-internal-1.example.com"
- storage = "praefect-internal-1"
- primary = true
+[[virtual_storage]]
+name = "praefect"
-[[node]]
- address = "tcp://gitaly-internal-2.example.com"
- storage = "praefect-internal-2"
-
-[[node]]
- address = "tcp://gitaly-internal-3.example.com"
- storage = "praefect-internal-3"
+ [[virtual_storage.node]]
+ address = "tcp://gitaly-internal-1.example.com"
+ storage = "praefect-internal-1"
+ primary = true
+
+ [[virtual_storage.node]]
+ address = "tcp://gitaly-internal-2.example.com"
+ storage = "praefect-internal-2"
+
+ [[virtual_storage.node]]
+ address = "tcp://gitaly-internal-3.example.com"
+ storage = "praefect-internal-3"
diff --git a/internal/praefect/config/testdata/single-virtual-storage.config.toml b/internal/praefect/config/testdata/single-virtual-storage.config.toml
new file mode 100644
index 000000000..e98381875
--- /dev/null
+++ b/internal/praefect/config/testdata/single-virtual-storage.config.toml
@@ -0,0 +1,25 @@
+listen_addr = ""
+socket_path = ""
+prometheus_listen_addr = ""
+virtual_storage_name = "praefect"
+
+[logging]
+ format = "json"
+ level = "info"
+
+[sentry]
+ sentry_environment = "production"
+ sentry_dsn = "abcd123"
+
+[[node]]
+ address = "tcp://gitaly-internal-1.example.com"
+ storage = "praefect-internal-1"
+ primary = true
+
+[[node]]
+ address = "tcp://gitaly-internal-2.example.com"
+ storage = "praefect-internal-2"
+
+[[node]]
+ address = "tcp://gitaly-internal-3.example.com"
+ storage = "praefect-internal-3"
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index fa322da2b..1292ae7ae 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -22,8 +22,6 @@ import (
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
func isDestructive(methodName string) bool {
@@ -128,15 +126,12 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo
return "", nil, err
}
- if targetRepo.StorageName != c.conf.VirtualStorageName {
- return "", nil, status.Errorf(codes.InvalidArgument, "only messages for %s are allowed", c.conf.VirtualStorageName)
- }
-
primary, err := c.selectPrimary(mi, targetRepo)
if err != nil {
return "", nil, err
}
+ // rewrite storage name
targetRepo.StorageName = primary.Storage
additionalRepo, ok, err := mi.AdditionalRepo(m)
@@ -159,6 +154,7 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo
requestFinalizer := noopRequestFinalizer
+ // TODO: move the logic of creating replication jobs to the streamDirector method
if mi.Operation == protoregistry.OpMutator {
change := datastore.UpdateRepo
if isDestructive(method) {
@@ -193,13 +189,13 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git
return nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
}
- newPrimary, err := c.datastore.PickAPrimary()
+ newPrimary, err := c.datastore.PickAPrimary(targetRepo.GetStorageName())
if err != nil {
return nil, fmt.Errorf("could not choose a primary: %v", err)
}
// set the primary
- if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
+ if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.Storage); err != nil {
return nil, err
}
@@ -208,7 +204,7 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git
if replica.DefaultPrimary {
continue
}
- if err = c.datastore.AddReplica(targetRepo.GetRelativePath(), replica.ID); err != nil {
+ if err = c.datastore.AddReplica(targetRepo.GetRelativePath(), replica.Storage); err != nil {
return nil, err
}
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 76a0a84d3..bd941f97a 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -30,17 +30,21 @@ func TestSecondaryRotation(t *testing.T) {
func TestStreamDirector(t *testing.T) {
conf := config.Config{
- VirtualStorageName: "praefect",
- Nodes: []*models.Node{
- &models.Node{
- Address: "tcp://gitaly-primary.example.com",
- Storage: "praefect-internal-1",
- DefaultPrimary: true,
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ Address: "tcp://gitaly-primary.example.com",
+ Storage: "praefect-internal-1",
+ DefaultPrimary: true,
+ },
+ &models.Node{
+ Address: "tcp://gitaly-backup1.example.com",
+ Storage: "praefect-internal-2",
+ }},
},
- &models.Node{
- Address: "tcp://gitaly-backup1.example.com",
- Storage: "praefect-internal-2",
- }},
+ },
}
ds := datastore.NewInMemory(conf)
@@ -87,13 +91,14 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name")
- jobs, err := ds.GetJobs(datastore.JobStatePending, 1, 10)
+ jobs, err := ds.GetJobs(datastore.JobStatePending, "praefect-internal-2", 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
- targetNode, err := ds.GetStorageNode(1)
+ targetNode, err := ds.GetStorageNode("praefect-internal-2")
require.NoError(t, err)
- sourceNode, err := ds.GetStorageNode(0)
+ sourceNode, err := ds.GetStorageNode("praefect-internal-1")
+
require.NoError(t, err)
expectedJob := datastore.ReplJob{
@@ -109,7 +114,7 @@ func TestStreamDirector(t *testing.T) {
jobUpdateFunc()
- jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, 1, 10)
+ jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, "praefect-internal-2", 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 8bb18cd07..378adc692 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -80,21 +80,21 @@ type Datastore interface {
// ReplicasDatastore manages accessing and setting which secondary replicas
// backup a repository
type ReplicasDatastore interface {
- PickAPrimary() (*models.Node, error)
+ PickAPrimary(virtualStorage string) (*models.Node, error)
GetReplicas(relativePath string) ([]models.Node, error)
- GetStorageNode(nodeID int) (models.Node, error)
+ GetStorageNode(nodeStorage string) (models.Node, error)
GetStorageNodes() ([]models.Node, error)
GetPrimary(relativePath string) (*models.Node, error)
- SetPrimary(relativePath string, storageNodeID int) error
+ SetPrimary(relativePath, nodeStorage string) error
- AddReplica(relativePath string, storageNodeID int) error
+ AddReplica(relativePath string, nodeStorage string) error
- RemoveReplica(relativePath string, storageNodeID int) error
+ RemoveReplica(relativePath, nodeStorage string) error
GetRepository(relativePath string) (*models.Repository, error)
}
@@ -105,7 +105,7 @@ type ReplJobsDatastore interface {
// GetJobs fetches a list of chronologically ordered replication
// jobs for the given storage replica. The returned list will be at most
// count-length.
- GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error)
+ GetJobs(flag JobState, nodeStorage string, count int) ([]ReplJob, error)
// CreateReplicaJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
@@ -117,10 +117,10 @@ type ReplJobsDatastore interface {
}
type jobRecord struct {
- change ChangeType
- relativePath string // project's relative path
- targetNodeID, sourceNodeID int
- state JobState
+ change ChangeType
+ relativePath string // project's relative path
+ targetNodeStorage, sourceNodeStorage string
+ state JobState
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
@@ -134,13 +134,18 @@ type MemoryDatastore struct {
storageNodes *struct {
sync.RWMutex
- m map[int]models.Node
+ m map[string]models.Node
}
repositories *struct {
sync.RWMutex
m map[string]models.Repository
}
+
+ virtualStorages *struct {
+ sync.RWMutex
+ m map[string][]*models.Node
+ }
}
// NewInMemory returns an initialized in-memory datastore
@@ -148,9 +153,9 @@ func NewInMemory(cfg config.Config) *MemoryDatastore {
m := &MemoryDatastore{
storageNodes: &struct {
sync.RWMutex
- m map[int]models.Node
+ m map[string]models.Node
}{
- m: map[int]models.Node{},
+ m: map[string]models.Node{},
},
jobs: &struct {
sync.RWMutex
@@ -164,24 +169,36 @@ func NewInMemory(cfg config.Config) *MemoryDatastore {
}{
m: map[string]models.Repository{},
},
+ virtualStorages: &struct {
+ sync.RWMutex
+ m map[string][]*models.Node
+ }{
+ m: map[string][]*models.Node{},
+ },
}
- for i, storageNode := range cfg.Nodes {
- storageNode.ID = i
- m.storageNodes.m[i] = *storageNode
+ for _, virtualStorage := range cfg.VirtualStorages {
+ m.virtualStorages.m[virtualStorage.Name] = virtualStorage.Nodes
+
+ for _, node := range virtualStorage.Nodes {
+ if _, ok := m.storageNodes.m[node.Storage]; ok {
+ continue
+ }
+ m.storageNodes.m[node.Storage] = *node
+ }
}
return m
}
// PickAPrimary returns the primary configured in the config file
-func (md *MemoryDatastore) PickAPrimary() (*models.Node, error) {
+func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (*models.Node, error) {
md.storageNodes.RLock()
defer md.storageNodes.RUnlock()
- for _, node := range md.storageNodes.m {
+ for _, node := range md.virtualStorages.m[virtualStorage] {
if node.DefaultPrimary {
- return &node, nil
+ return node, nil
}
}
@@ -204,11 +221,11 @@ func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.Node, erro
}
// GetStorageNode gets all storage nodes
-func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.Node, error) {
+func (md *MemoryDatastore) GetStorageNode(nodeStorage string) (models.Node, error) {
md.storageNodes.RLock()
defer md.storageNodes.RUnlock()
- node, ok := md.storageNodes.m[nodeID]
+ node, ok := md.storageNodes.m[nodeStorage]
if !ok {
return models.Node{}, errors.New("node not found")
}
@@ -239,15 +256,11 @@ func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.Node, error)
return nil, ErrPrimaryNotSet
}
- storageNode, ok := md.storageNodes.m[repository.Primary.ID]
- if !ok {
- return nil, errors.New("node storage not found")
- }
- return &storageNode, nil
+ return &repository.Primary, nil
}
// SetPrimary sets the primary storagee node for a repository of a repository relative path
-func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error {
+func (md *MemoryDatastore) SetPrimary(relativePath, nodeStorage string) error {
md.repositories.Lock()
defer md.repositories.Unlock()
@@ -256,7 +269,7 @@ func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) er
repository = models.Repository{RelativePath: relativePath}
}
- storageNode, ok := md.storageNodes.m[storageNodeID]
+ storageNode, ok := md.storageNodes.m[nodeStorage]
if !ok {
return errors.New("node storage not found")
}
@@ -268,7 +281,7 @@ func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) er
}
// AddReplica adds a secondary to a repository of a repository relative path
-func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error {
+func (md *MemoryDatastore) AddReplica(relativePath, nodeStorage string) error {
md.repositories.Lock()
defer md.repositories.Unlock()
@@ -277,7 +290,7 @@ func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) er
return errors.New("repository not found")
}
- storageNode, ok := md.storageNodes.m[storageNodeID]
+ storageNode, ok := md.storageNodes.m[nodeStorage]
if !ok {
return errors.New("node storage not found")
}
@@ -289,7 +302,7 @@ func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) er
}
// RemoveReplica removes a secondary from a repository of a repository relative path
-func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error {
+func (md *MemoryDatastore) RemoveReplica(relativePath, nodeStorage string) error {
md.repositories.Lock()
defer md.repositories.Unlock()
@@ -300,7 +313,7 @@ func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int)
var secondaries []models.Node
for _, secondary := range repository.Replicas {
- if secondary.ID != storageNodeID {
+ if secondary.Storage != nodeStorage {
secondaries = append(secondaries, secondary)
}
}
@@ -328,7 +341,7 @@ func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repositor
var ErrReplicasMissing = errors.New("repository missing secondary replicas")
// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
-func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) {
+func (md *MemoryDatastore) GetJobs(state JobState, targetNodeStorage string, count int) ([]ReplJob, error) {
md.jobs.RLock()
defer md.jobs.RUnlock()
@@ -336,7 +349,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int)
for i, record := range md.jobs.records {
// state is a bitmap that is a combination of one or more JobStates
- if record.state&state != 0 && record.targetNodeID == targetNodeID {
+ if record.state&state != 0 && record.targetNodeStorage == targetNodeStorage {
job, err := md.replJobFromRecord(i, record)
if err != nil {
return nil, err
@@ -362,11 +375,11 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
return ReplJob{}, err
}
- sourceNode, err := md.GetStorageNode(record.sourceNodeID)
+ sourceNode, err := md.GetStorageNode(record.sourceNodeStorage)
if err != nil {
return ReplJob{}, err
}
- targetNode, err := md.GetStorageNode(record.targetNodeID)
+ targetNode, err := md.GetStorageNode(record.targetNodeStorage)
if err != nil {
return ReplJob{}, err
}
@@ -409,11 +422,11 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, change Cha
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.records[nextID] = jobRecord{
- change: change,
- targetNodeID: secondary.ID,
- state: JobStatePending,
- relativePath: relativePath,
- sourceNodeID: repository.Primary.ID,
+ change: change,
+ targetNodeStorage: secondary.Storage,
+ state: JobStatePending,
+ relativePath: relativePath,
+ sourceNodeStorage: repository.Primary.Storage,
}
jobIDs = append(jobIDs, nextID)
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 1be1e1de0..c5d8b398e 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -10,13 +10,11 @@ import (
var (
stor1 = models.Node{
- ID: 0,
Address: "tcp://address-1",
Storage: "praefect-storage-1",
DefaultPrimary: true,
}
stor2 = models.Node{
- ID: 1,
Address: "tcp://address-2",
Storage: "praefect-storage-2",
}
@@ -36,7 +34,7 @@ var operations = []struct {
{
desc: "query an empty datastore",
opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1)
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.Storage, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
@@ -51,14 +49,14 @@ var operations = []struct {
{
desc: "set the primary for the repository",
opFn: func(t *testing.T, ds Datastore) {
- err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID)
+ err := ds.SetPrimary(repo1Repository.RelativePath, stor1.Storage)
require.NoError(t, err)
},
},
{
desc: "add a secondary replica for the repository",
opFn: func(t *testing.T, ds Datastore) {
- err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID)
+ err := ds.AddReplica(repo1Repository.RelativePath, stor2.Storage)
require.NoError(t, err)
},
},
@@ -73,7 +71,7 @@ var operations = []struct {
{
desc: "fetch inserted replication jobs after primary mapped",
opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.ID, 10)
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.Storage, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
@@ -102,7 +100,7 @@ var operations = []struct {
{
desc: "try fetching completed replication job",
opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1)
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.Storage, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
@@ -113,7 +111,11 @@ var operations = []struct {
var flavors = map[string]func() Datastore{
"in-memory-datastore": func() Datastore {
return NewInMemory(config.Config{
- Nodes: []*models.Node{&stor1, &stor2},
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Nodes: []*models.Node{&stor1, &stor2},
+ },
+ },
})
},
}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index f1f8778b5..4d11354bb 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -41,15 +41,10 @@ func waitUntil(t *testing.T, ch <-chan struct{}, timeout time.Duration) {
// generates a praefect configuration with the specified number of backend
// nodes
func testConfig(backends int) config.Config {
- cfg := config.Config{
- VirtualStorageName: "praefect",
- }
-
var nodes []*models.Node
for i := 0; i < backends; i++ {
n := &models.Node{
- ID: i,
Storage: fmt.Sprintf("praefect-internal-%d", i),
Token: fmt.Sprintf("%d", i),
}
@@ -60,8 +55,14 @@ func testConfig(backends int) config.Config {
nodes = append(nodes, n)
}
-
- cfg.Nodes = nodes
+ cfg := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: nodes,
+ },
+ },
+ }
return cfg
}
@@ -75,7 +76,7 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti
)
var defaultNode *models.Node
- for _, n := range conf.Nodes {
+ for _, n := range conf.VirtualStorages[0].Nodes {
if n.DefaultPrimary {
defaultNode = n
}
@@ -104,27 +105,30 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti
// Each mock server is keyed by the corresponding index of the node in the
// config.Nodes. There must be a 1-to-1 mapping between backend server and
// configured storage node.
-func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[int]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) {
+// requires there to be only 1 virtual storage
+func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[string]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) {
clientCC := conn.NewClientConnections()
+
+ require.Len(t, conf.VirtualStorages, 1)
+ require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes),
+ "mock server count doesn't match config nodes")
+
var cleanups []testhelper.Cleanup
- for i, node := range conf.Nodes {
- backend, ok := backends[i]
- require.True(t, ok, "missing backend server for node %d", i)
+ for i, node := range conf.VirtualStorages[0].Nodes {
+ backend, ok := backends[node.Storage]
+ require.True(t, ok, "missing backend server for node %s", node.Storage)
backendAddr, cleanup := newMockDownstream(t, node.Token, backend)
cleanups = append(cleanups, cleanup)
clientCC.RegisterNode(node.Storage, backendAddr, node.Token)
node.Address = backendAddr
- conf.Nodes[i] = node
+ conf.VirtualStorages[0].Nodes[i] = node
}
_, prf := setupServer(t, conf, clientCC, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)})
- require.Equal(t, len(backends), len(conf.Nodes),
- "mock server count doesn't match config nodes")
-
listener, port := listenAvailPort(t)
t.Logf("praefect listening on port %d", port)
@@ -154,17 +158,19 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[in
}
// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
+// requires exactly 1 virtual storage
func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+ require.Len(t, conf.VirtualStorages, 1)
clientCC := conn.NewClientConnections()
var cleanups []testhelper.Cleanup
- for i, node := range conf.Nodes {
+ for i, node := range conf.VirtualStorages[0].Nodes {
_, backendAddr, cleanup := runInternalGitalyServer(t, node.Token)
cleanups = append(cleanups, cleanup)
clientCC.RegisterNode(node.Storage, backendAddr, node.Token)
node.Address = backendAddr
- conf.Nodes[i] = node
+ conf.VirtualStorages[0].Nodes[i] = node
}
ds := datastore.NewInMemory(conf)
diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go
index 23918021f..5dfaf3667 100644
--- a/internal/praefect/models/node.go
+++ b/internal/praefect/models/node.go
@@ -2,7 +2,6 @@ package models
// Node describes an address that serves a storage
type Node struct {
- ID int
Storage string `toml:"storage"`
Address string `toml:"address"`
Token string `toml:"token"`
@@ -11,7 +10,6 @@ type Node struct {
// Repository describes a repository's relative path and its primary and list of secondaries
type Repository struct {
- ID int
RelativePath string
Primary Node
Replicas []Node
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index e12aab139..8ff54b259 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -255,14 +255,14 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
}
for _, node := range nodes {
- jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.ID, 10)
+ jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.Storage, 10)
if err != nil {
return err
}
if len(jobs) == 0 {
r.log.WithFields(logrus.Fields{
- "node_id": node.ID,
+ "node_storage": node.Storage,
"recheck_interval": jobFetchInterval,
}).Trace("no jobs")
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 39623c174..a21011c48 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -71,32 +71,34 @@ func TestProceessReplicationJob(t *testing.T) {
)
config := config.Config{
- Nodes: []*models.Node{
- &models.Node{
- ID: 0,
- Storage: "default",
- Address: srvSocketPath,
- Token: gitaly_config.Config.Auth.Token,
- DefaultPrimary: true,
- },
- &models.Node{
- ID: 1,
- Storage: backupStorageName,
- Address: srvSocketPath,
- Token: gitaly_config.Config.Auth.Token,
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Nodes: []*models.Node{
+ &models.Node{
+ Storage: "default",
+ Address: srvSocketPath,
+ Token: gitaly_config.Config.Auth.Token,
+ DefaultPrimary: true,
+ },
+ &models.Node{
+ Storage: backupStorageName,
+ Address: srvSocketPath,
+ Token: gitaly_config.Config.Auth.Token,
+ },
+ },
},
},
}
ds := datastore.NewInMemory(config)
- ds.SetPrimary(testRepo.GetRelativePath(), 0)
- ds.AddReplica(testRepo.GetRelativePath(), 1)
+ ds.SetPrimary(testRepo.GetRelativePath(), "default")
+ ds.AddReplica(testRepo.GetRelativePath(), backupStorageName)
_, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), datastore.UpdateRepo)
require.NoError(t, err)
- jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, 1, 1)
+ jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1)
require.NoError(t, err)
require.Len(t, jobs, 1)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index ada0d05f6..46ee62cea 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -37,18 +37,22 @@ type Server struct {
}
func (srv *Server) warnDupeAddrs(c config.Config) {
- addrSet := map[string]struct{}{}
- fishy := false
- for _, n := range c.Nodes {
- _, ok := addrSet[n.Address]
- if ok {
- srv.l.Warnf("more than one backend node is hosted at %s", n.Address)
- fishy = true
+ var fishy bool
+
+ for _, virtualStorage := range c.VirtualStorages {
+ addrSet := map[string]struct{}{}
+ for _, n := range virtualStorage.Nodes {
+ _, ok := addrSet[n.Address]
+ if ok {
+ srv.l.Warnf("more than one backend node is hosted at %s", n.Address)
+ fishy = true
+ continue
+ }
+ addrSet[n.Address] = struct{}{}
+ }
+ if fishy {
+ srv.l.Warnf("your Praefect configuration may not offer actual redundancy")
}
- addrSet[n.Address] = struct{}{}
- }
- if fishy {
- srv.l.Warnf("your Praefect configuration may not offer actual redundancy")
}
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index b75aa3c59..b0ecdb7de 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -2,7 +2,6 @@ package praefect
import (
"context"
- "fmt"
"io/ioutil"
"os"
"strings"
@@ -23,9 +22,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
- "google.golang.org/grpc/status"
)
func TestServerRouteServerAccessor(t *testing.T) {
@@ -38,8 +35,8 @@ func TestServerRouteServerAccessor(t *testing.T) {
// note: a server scoped RPC will be randomly routed
// to an available backend server. To simplify our
// test, a single backend server is used.
- backends = map[int]mock.SimpleServiceServer{
- 0: &mockSvc{
+ backends = map[string]mock.SimpleServiceServer{
+ conf.VirtualStorages[0].Nodes[0].Storage: &mockSvc{
serverAccessor: func(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) {
reqQ <- req
return expectResp, nil
@@ -75,18 +72,20 @@ func TestServerRouteServerAccessor(t *testing.T) {
func TestGitalyServerInfo(t *testing.T) {
conf := config.Config{
- Nodes: []*models.Node{
- &models.Node{
- ID: 1,
- Storage: "praefect-internal-1",
- DefaultPrimary: true,
- Token: "abc",
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Nodes: []*models.Node{
+ &models.Node{
+ Storage: "praefect-internal-1",
+ DefaultPrimary: true,
+ Token: "abc",
+ },
+ &models.Node{
+ Storage: "praefect-internal-2",
+ Token: "xyz",
+ }},
},
- &models.Node{
- ID: 2,
- Storage: "praefect-internal-2",
- Token: "xyz",
- }},
+ },
}
cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
defer cleanup()
@@ -98,7 +97,7 @@ func TestGitalyServerInfo(t *testing.T) {
metadata, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{})
require.NoError(t, err)
- require.Len(t, metadata.GetStorageStatuses(), len(conf.Nodes))
+ require.Len(t, metadata.GetStorageStatuses(), len(conf.VirtualStorages[0].Nodes))
require.Equal(t, version.GetVersion(), metadata.GetServerVersion())
gitVersion, err := git.Version()
@@ -112,18 +111,20 @@ func TestGitalyServerInfo(t *testing.T) {
func TestGitalyDiskStatistics(t *testing.T) {
conf := config.Config{
- Nodes: []*models.Node{
- {
- ID: 1,
- Storage: "praefect-internal-1",
- DefaultPrimary: true,
- Token: "abc",
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Nodes: []*models.Node{
+ {
+ Storage: "praefect-internal-1",
+ DefaultPrimary: true,
+ Token: "abc",
+ },
+ {
+ Storage: "praefect-internal-2",
+ Token: "xyz",
+ }},
},
- {
- ID: 2,
- Storage: "praefect-internal-2",
- Token: "xyz",
- }},
+ },
}
cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
defer cleanup()
@@ -156,12 +157,16 @@ func TestHealthCheck(t *testing.T) {
func TestRejectBadStorage(t *testing.T) {
conf := config.Config{
- VirtualStorageName: "praefect",
- Nodes: []*models.Node{
- &models.Node{
- DefaultPrimary: true,
- Storage: "praefect-internal-0",
- Address: "tcp::/this-doesnt-matter",
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: "praefect-internal-0",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ },
},
},
}
@@ -180,22 +185,39 @@ func TestRejectBadStorage(t *testing.T) {
defer cancel()
_, err := repoClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{Repository: &badTargetRepo})
- testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
- require.Equal(t, fmt.Sprintf("only messages for %s are allowed", conf.VirtualStorageName), status.Convert(err).Message())
+ require.Error(t, err)
}
func TestWarnDuplicateAddrs(t *testing.T) {
conf := config.Config{
- VirtualStorageName: "praefect",
- Nodes: []*models.Node{
- &models.Node{
- DefaultPrimary: true,
- Storage: "praefect-internal-0",
- Address: "tcp::/samesies",
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "default",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: "praefect-internal-0",
+ Address: "tcp://abc",
+ },
+ &models.Node{
+ Storage: "praefect-internal-1",
+ Address: "tcp://xyz",
+ },
+ },
},
- &models.Node{
- Storage: "praefect-internal-1",
- Address: "tcp::/samesies",
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: "praefect-internal-0",
+ Address: "tcp://abc",
+ },
+ &models.Node{
+ Storage: "praefect-internal-1",
+ Address: "tcp://xyz",
+ },
+ },
},
},
}
@@ -205,31 +227,103 @@ func TestWarnDuplicateAddrs(t *testing.T) {
setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
for _, entry := range hook.Entries {
+ require.NotContains(t, entry.Message, "more than one backend node")
+ }
+
+ conf = config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: "praefect-internal-0",
+ Address: "tcp::/samesies",
+ },
+ &models.Node{
+ Storage: "praefect-internal-1",
+ Address: "tcp::/samesies",
+ },
+ },
+ },
+ },
+ }
+
+ tLogger, hook = test.NewNullLogger()
+
+ setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
+
+ var found bool
+ for _, entry := range hook.Entries {
if strings.Contains(entry.Message, "more than one backend node") {
- return // pass!
+ found = true
+ break
}
}
- t.Fatal("could not find expected log message")
+ require.True(t, found, "expected to find error log")
+
+ conf = config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "default",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: "praefect-internal-0",
+ Address: "tcp://abc",
+ },
+ &models.Node{
+ Storage: "praefect-internal-1",
+ Address: "tcp://xyz",
+ },
+ },
+ },
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: "praefect-internal-0",
+ Address: "tcp://abc",
+ },
+ &models.Node{
+ Storage: "praefect-internal-2",
+ Address: "tcp://xyz",
+ },
+ },
+ },
+ },
+ }
+
+ tLogger, hook = test.NewNullLogger()
+
+ setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
+
+ for _, entry := range hook.Entries {
+ require.NotContains(t, entry.Message, "more than one backend node")
+ }
}
func TestRepoRemoval(t *testing.T) {
conf := config.Config{
- VirtualStorageName: "praefect",
- Nodes: []*models.Node{
- &models.Node{
- DefaultPrimary: true,
- Storage: gconfig.Config.Storages[0].Name,
- Address: "tcp::/samesies",
- },
- &models.Node{
- ID: 1,
- Storage: "praefect-internal-1",
- Address: "tcp::/this-doesnt-matter",
- },
- &models.Node{
- ID: 2,
- Storage: "praefect-internal-2",
- Address: "tcp::/this-doesnt-matter",
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: gconfig.Config.Storages[0].Name,
+ Address: "tcp::/samesies",
+ },
+ &models.Node{
+ Storage: "praefect-internal-1",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ &models.Node{
+ Storage: "praefect-internal-2",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ },
},
},
}
@@ -239,11 +333,11 @@ func TestRepoRemoval(t *testing.T) {
testStorages := []gconfig.Storage{
{
- Name: conf.Nodes[1].Storage,
+ Name: conf.VirtualStorages[0].Nodes[1].Storage,
Path: tempStoragePath(t),
},
{
- Name: conf.Nodes[2].Storage,
+ Name: conf.VirtualStorages[0].Nodes[2].Storage,
Path: tempStoragePath(t),
},
}
@@ -257,9 +351,9 @@ func TestRepoRemoval(t *testing.T) {
tRepo, _, tCleanup := testhelper.NewTestRepo(t)
defer tCleanup()
- _, path1, cleanup1 := cloneRepoAtStorage(t, tRepo, conf.Nodes[1].Storage)
+ _, path1, cleanup1 := cloneRepoAtStorage(t, tRepo, conf.VirtualStorages[0].Nodes[1].Storage)
defer cleanup1()
- _, path2, cleanup2 := cloneRepoAtStorage(t, tRepo, conf.Nodes[2].Storage)
+ _, path2, cleanup2 := cloneRepoAtStorage(t, tRepo, conf.VirtualStorages[0].Nodes[2].Storage)
defer cleanup2()
// prerequisite: repos should exist at expected paths
@@ -273,7 +367,7 @@ func TestRepoRemoval(t *testing.T) {
defer cancel()
virtualRepo := *tRepo
- virtualRepo.StorageName = conf.VirtualStorageName
+ virtualRepo.StorageName = conf.VirtualStorages[0].Name
rClient := gitalypb.NewRepositoryServiceClient(cc)
diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go
index 76c2fd67b..e83394b04 100644
--- a/internal/praefect/service/server/info.go
+++ b/internal/praefect/service/server/info.go
@@ -3,24 +3,41 @@ package server
import (
"context"
"fmt"
+ "sync"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"golang.org/x/sync/errgroup"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
// ServerInfo sends ServerInfoRequest to all of a praefect server's internal gitaly nodes and aggregates the results into
// a response
func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) {
- storageStatuses := make([][]*gitalypb.ServerInfoResponse_StorageStatus, len(s.conf.Nodes))
+ var once sync.Once
+ nodesChecked := make(map[string]struct{})
+
+ var nodes []*models.Node
+ for _, virtualStorage := range s.conf.VirtualStorages {
+ for _, node := range virtualStorage.Nodes {
+ if _, ok := nodesChecked[node.Storage]; ok {
+ continue
+ }
+
+ nodesChecked[node.Storage] = struct{}{}
+ nodes = append(nodes, node)
+ }
+ }
var gitVersion, serverVersion string
g, ctx := errgroup.WithContext(ctx)
- for i, node := range s.conf.Nodes {
- i := i // necessary since it will be used in a goroutine below
+ storageStatuses := make([][]*gitalypb.ServerInfoResponse_StorageStatus, len(nodes))
+
+ for i, node := range nodes {
+ i := i
node := node
cc, err := s.clientCC.GetConnection(node.Storage)
if err != nil {
@@ -36,7 +53,9 @@ func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest)
storageStatuses[i] = resp.GetStorageStatuses()
if node.DefaultPrimary {
- gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion()
+ once.Do(func() {
+ gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion()
+ })
}
return nil