Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2019-10-07 10:01:34 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2019-10-07 10:01:34 +0300
commita5ca2426be195b74bf9cb8d9cccd2ca7681931cd (patch)
tree24f250d76b3e1402b3e1491ae986f10cda2e03da
parent3a26cd4aa9c043fe33c8c86b309d20baae1a3155 (diff)
parentceadcd87d65493ca32f7adbe858ecb31627ff1a5 (diff)
Merge branch 'jc-add-server-service-praefect' into 'master'
Allow praefect to handle ServerInfoRequest Closes #1809 See merge request gitlab-org/gitaly!1527
-rw-r--r--changelogs/unreleased/jc-add-server-service-praefect.yml5
-rw-r--r--cmd/praefect/main.go25
-rw-r--r--internal/praefect/conn/client_connections.go71
-rw-r--r--internal/praefect/conn/client_connections_test.go26
-rw-r--r--internal/praefect/coordinator.go56
-rw-r--r--internal/praefect/coordinator_test.go16
-rw-r--r--internal/praefect/replicator.go29
-rw-r--r--internal/praefect/replicator_test.go15
-rw-r--r--internal/praefect/server.go28
-rw-r--r--internal/praefect/server_test.go118
-rw-r--r--internal/praefect/service/server/info.go51
-rw-r--r--internal/praefect/service/server/server.go23
12 files changed, 363 insertions, 100 deletions
diff --git a/changelogs/unreleased/jc-add-server-service-praefect.yml b/changelogs/unreleased/jc-add-server-service-praefect.yml
new file mode 100644
index 000000000..b2a989c6d
--- /dev/null
+++ b/changelogs/unreleased/jc-add-server-service-praefect.yml
@@ -0,0 +1,5 @@
+---
+title: Allow praefect to handle ServerInfoRequest
+merge_request: 1527
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index d6f4466a3..a7c9beddf 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -19,6 +19,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/labkit/tracing"
)
@@ -92,12 +93,22 @@ func configure() (config.Config, error) {
func run(listeners []net.Listener, conf config.Config) error {
+ clientConnections := conn.NewClientConnections()
+
+ for _, node := range conf.Nodes {
+ if err := clientConnections.RegisterNode(node.Storage, node.Address); err != nil {
+ return fmt.Errorf("failed to register %s: %s", node.Address, err)
+ }
+
+ logger.WithField("node_address", node.Address).Info("registered gitaly node")
+ }
+
var (
// top level server dependencies
datastore = praefect.NewMemoryDatastore(conf)
- coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...)
- repl = praefect.NewReplMgr("default", logger, datastore, coordinator)
- srv = praefect.NewServer(coordinator, repl, nil, logger)
+ coordinator = praefect.NewCoordinator(logger, datastore, clientConnections, protoregistry.GitalyProtoFileDescriptors...)
+ repl = praefect.NewReplMgr("default", logger, datastore, clientConnections)
+ srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
// signal related
signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
termCh = make(chan os.Signal, len(signals))
@@ -113,14 +124,6 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- for _, node := range conf.Nodes {
- if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil {
- return fmt.Errorf("failed to register %s: %s", node.Address, err)
- }
-
- logger.WithField("node_address", node.Address).Info("registered gitaly node")
- }
-
go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
go coordinator.FailoverRotation()
diff --git a/internal/praefect/conn/client_connections.go b/internal/praefect/conn/client_connections.go
new file mode 100644
index 000000000..8d4f3a033
--- /dev/null
+++ b/internal/praefect/conn/client_connections.go
@@ -0,0 +1,71 @@
+package conn
+
+import (
+ "errors"
+ "sync"
+
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/client"
+ gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "google.golang.org/grpc"
+)
+
+// ErrConnectionNotFound indicates the connection for a given storage has not yet been registered
+var ErrConnectionNotFound = errors.New("client connection not found")
+
+// ErrAlreadyRegistered indicates the client connection for a given storage has already been registered
+var ErrAlreadyRegistered = errors.New("client connection already registered")
+
+// ClientConnections contains ready to use grpc client connections
+type ClientConnections struct {
+ connMutex sync.RWMutex
+ nodes map[string]*grpc.ClientConn
+}
+
+// NewClientConnections creates a new ClientConnections struct
+func NewClientConnections() *ClientConnections {
+ return &ClientConnections{
+ nodes: make(map[string]*grpc.ClientConn),
+ }
+}
+
+// RegisterNode will direct traffic to the supplied downstream connection when the storage location
+// is encountered.
+func (c *ClientConnections) RegisterNode(storageName, listenAddr string) error {
+ conn, err := client.Dial(listenAddr,
+ []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)),
+ },
+ )
+ if err != nil {
+ return err
+ }
+
+ return c.setConn(storageName, conn)
+}
+
+func (c *ClientConnections) setConn(storageName string, conn *grpc.ClientConn) error {
+ c.connMutex.Lock()
+ if _, ok := c.nodes[storageName]; ok {
+ return ErrAlreadyRegistered
+ }
+ c.nodes[storageName] = conn
+ c.connMutex.Unlock()
+
+ return nil
+}
+
+// GetConnection gets the grpc client connection based on an address
+func (c *ClientConnections) GetConnection(storageName string) (*grpc.ClientConn, error) {
+ c.connMutex.RLock()
+ cc, ok := c.nodes[storageName]
+ c.connMutex.RUnlock()
+ if !ok {
+ return nil, ErrConnectionNotFound
+ }
+
+ return cc, nil
+
+}
diff --git a/internal/praefect/conn/client_connections_test.go b/internal/praefect/conn/client_connections_test.go
new file mode 100644
index 000000000..9e2f832d1
--- /dev/null
+++ b/internal/praefect/conn/client_connections_test.go
@@ -0,0 +1,26 @@
+package conn
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestRegisterNode(t *testing.T) {
+ storageName := "default"
+ tcpAddress := "address1"
+ clientConn := NewClientConnections()
+
+ _, err := clientConn.GetConnection(storageName)
+ require.Equal(t, ErrConnectionNotFound, err)
+
+ require.NoError(t, clientConn.RegisterNode(storageName, fmt.Sprintf("tcp://%s", tcpAddress)))
+
+ conn, err := clientConn.GetConnection(storageName)
+ require.NoError(t, err)
+ require.Equal(t, tcpAddress, conn.Target())
+
+ err = clientConn.RegisterNode(storageName, "tcp://some-other-address")
+ require.Equal(t, ErrAlreadyRegistered, err)
+}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 1c9ba2e1a..49a1d7990 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -9,9 +9,8 @@ import (
"sync"
"syscall"
- gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
- gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -19,7 +18,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"google.golang.org/grpc"
)
@@ -28,26 +26,25 @@ import (
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
+ connections *conn.ClientConnections
log *logrus.Entry
failoverMutex sync.RWMutex
- connMutex sync.RWMutex
datastore Datastore
- nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *conn.ClientConnections, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
return &Coordinator{
- log: l,
- datastore: datastore,
- nodes: make(map[string]*grpc.ClientConn),
- registry: registry,
+ log: l,
+ datastore: datastore,
+ registry: registry,
+ connections: clientConnections,
}
}
@@ -91,7 +88,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
}
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, err := c.GetConnection(storage)
+ cc, err := c.connections.GetConnection(storage)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage)
}
@@ -223,43 +220,6 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func()
}, nil
}
-// RegisterNode will direct traffic to the supplied downstream connection when the storage location
-// is encountered.
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
- conn, err := client.Dial(listenAddr,
- []grpc.DialOption{
- grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)),
- },
- )
- if err != nil {
- return err
- }
-
- c.setConn(storageName, conn)
-
- return nil
-}
-
-func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
- c.connMutex.Lock()
- c.nodes[storageName] = conn
- c.connMutex.Unlock()
-}
-
-// GetConnection gets the grpc client connection based on an address
-func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) {
- c.connMutex.RLock()
- cc, ok := c.nodes[storageName]
- c.connMutex.RUnlock()
- if !ok {
- return nil, errors.New("client connection not found")
- }
-
- return cc, nil
-
-}
-
// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
func (c *Coordinator) FailoverRotation() {
c.handleSignalAndRotate()
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index d6e3c519a..267ce9fbb 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1,6 +1,7 @@
package praefect
import (
+ "fmt"
"io/ioutil"
"testing"
@@ -9,11 +10,11 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
)
var testLogger = logrus.New()
@@ -48,7 +49,11 @@ func TestStreamDirector(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- coordinator := NewCoordinator(log.Default(), datastore)
+ address := "gitaly-primary.example.com"
+ clientConnections := conn.NewClientConnections()
+ clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address))
+
+ coordinator := NewCoordinator(log.Default(), datastore, clientConnections)
require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))
frame, err := proto.Marshal(&gitalypb.GarbageCollectRequest{
@@ -56,14 +61,9 @@ func TestStreamDirector(t *testing.T) {
})
require.NoError(t, err)
- cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure())
- require.NoError(t, err)
-
- coordinator.setConn("praefect-internal-1", cc)
-
_, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame})
require.NoError(t, err)
- require.Equal(t, cc, conn, "stream director should choose the primary as the client connection to use")
+ require.Equal(t, address, conn.Target())
jobs, err := datastore.GetJobs(JobStatePending, 1, 10)
require.NoError(t, err)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 61cfdceed..b4c18083f 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -148,11 +149,11 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
- log *logrus.Entry
- datastore Datastore
- coordinator *Coordinator
- targetNode string // which replica is this replicator responsible for?
- replicator Replicator // does the actual replication logic
+ log *logrus.Entry
+ datastore Datastore
+ clientConnections *conn.ClientConnections
+ targetNode string // which replica is this replicator responsible for?
+ replicator Replicator // does the actual replication logic
// whitelist contains the project names of the repos we wish to replicate
whitelist map[string]struct{}
@@ -163,14 +164,14 @@ type ReplMgrOpt func(*ReplMgr)
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Entry, datastore Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Entry, datastore Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
- log: log,
- datastore: datastore,
- whitelist: map[string]struct{}{},
- replicator: defaultReplicator{log},
- targetNode: targetNode,
- coordinator: c,
+ log: log,
+ datastore: datastore,
+ whitelist: map[string]struct{}{},
+ replicator: defaultReplicator{log},
+ targetNode: targetNode,
+ clientConnections: c,
}
for _, opt := range opts {
@@ -275,12 +276,12 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error {
return err
}
- targetCC, err := r.coordinator.GetConnection(job.TargetNode.Storage)
+ targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
if err != nil {
return err
}
- sourceCC, err := r.coordinator.GetConnection(job.Repository.Primary.Storage)
+ sourceCC, err := r.clientConnections.GetConnection(job.Repository.Primary.Storage)
if err != nil {
return err
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index b50db5194..22b7ed024 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -15,6 +15,7 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
@@ -96,15 +97,15 @@ func TestProceessReplicationJob(t *testing.T) {
var replicator defaultReplicator
replicator.log = gitaly_log.Default()
- coordinator := &Coordinator{nodes: make(map[string]*grpc.ClientConn)}
- coordinator.RegisterNode("default", srvSocketPath)
- coordinator.RegisterNode("backup", srvSocketPath)
+ clientCC := conn.NewClientConnections()
+ clientCC.RegisterNode("default", srvSocketPath)
+ clientCC.RegisterNode("backup", srvSocketPath)
replMgr := &ReplMgr{
- log: gitaly_log.Default(),
- datastore: m,
- coordinator: coordinator,
- replicator: replicator,
+ log: gitaly_log.Default(),
+ datastore: m,
+ clientConnections: clientCC,
+ replicator: replicator,
}
require.NoError(t, replMgr.processReplJob(ctx, replJob))
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 3a7c0fabe..7e2307e62 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -12,7 +12,11 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service/server"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
"google.golang.org/grpc"
@@ -20,14 +24,15 @@ import (
// Server is a praefect server
type Server struct {
- coordinator *Coordinator
- repl ReplMgr
- s *grpc.Server
+ clientConnections *conn.ClientConnections
+ repl ReplMgr
+ s *grpc.Server
+ conf config.Config
}
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
-func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry) *Server {
+func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server {
grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
@@ -52,9 +57,10 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
}...)
return &Server{
- s: grpc.NewServer(grpcOpts...),
- coordinator: c,
- repl: repl,
+ s: grpc.NewServer(grpcOpts...),
+ repl: repl,
+ clientConnections: clientConnections,
+ conf: conf,
}
}
@@ -69,9 +75,17 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
// listener. Function will block until the server is stopped or an
// unrecoverable error occurs.
func (srv *Server) Start(lis net.Listener) error {
+ srv.registerServices()
+
return srv.s.Serve(lis)
}
+// registerServices will register any services praefect needs to handle rpcs on its own
+func (srv *Server) registerServices() {
+ // ServerServiceServer is necessary for the ServerInfo RPC
+ gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections))
+}
+
// Shutdown will attempt a graceful shutdown of the grpc server. If unable
// to gracefully shutdown within the context deadline, it will then
// forcefully shutdown the server and return a context cancellation error.
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index cbc8e2482..6545f3cdf 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -12,10 +12,15 @@ import (
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/server/auth"
+ gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
)
@@ -57,7 +62,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
storagePrimary = "default"
)
- datastore := NewMemoryDatastore(config.Config{
+ conf := config.Config{
Nodes: []*models.Node{
&models.Node{
ID: 1,
@@ -68,31 +73,38 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
ID: 2,
Storage: "praefect-internal-2",
}},
- })
+ }
+
+ datastore := NewMemoryDatastore(conf)
logEntry := log.Default()
- coordinator := NewCoordinator(logEntry, datastore, fd)
+
+ clientCC := conn.NewClientConnections()
for id, nodeStorage := range datastore.storageNodes.m {
backend, cleanup := newMockDownstream(t, tt.callback)
defer cleanup() // clean up mock downstream server resources
- coordinator.RegisterNode(nodeStorage.Storage, backend)
+ clientCC.RegisterNode(nodeStorage.Storage, backend)
nodeStorage.Address = backend
datastore.storageNodes.m[id] = nodeStorage
}
+ coordinator := NewCoordinator(logEntry, datastore, clientCC, fd)
+
replmgr := NewReplMgr(
storagePrimary,
logEntry,
datastore,
- coordinator,
+ clientCC,
)
prf := NewServer(
coordinator,
replmgr,
nil,
logEntry,
+ clientCC,
+ conf,
)
listener, port := listenAvailPort(t)
@@ -126,6 +138,102 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
}
}
+func TestGitalyServerInfo(t *testing.T) {
+ conf := config.Config{
+ Nodes: []*models.Node{
+ &models.Node{
+ ID: 1,
+ Storage: "praefect-internal-1",
+ DefaultPrimary: true,
+ },
+ &models.Node{
+ ID: 2,
+ Storage: "praefect-internal-2",
+ }},
+ }
+ cc, srv := runFullPraefectServer(t, conf)
+ defer srv.s.Stop()
+
+ client := gitalypb.NewServerServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ metadata, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{})
+ require.NoError(t, err)
+ require.Len(t, metadata.GetStorageStatuses(), len(conf.Nodes))
+
+ for _, storageStatus := range metadata.GetStorageStatuses() {
+ require.NotNil(t, storageStatus, "none of the storage statuses should be nil")
+ }
+}
+
+func runFullPraefectServer(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server) {
+ datastore := NewMemoryDatastore(conf)
+
+ logEntry := log.Default()
+
+ clientCC := conn.NewClientConnections()
+ for id, nodeStorage := range datastore.storageNodes.m {
+ _, backend := runInternalGitalyServer(t)
+
+ clientCC.RegisterNode(nodeStorage.Storage, backend)
+ nodeStorage.Address = backend
+ datastore.storageNodes.m[id] = nodeStorage
+ }
+
+ coordinator := NewCoordinator(logEntry, datastore, clientCC, protoregistry.GitalyProtoFileDescriptors...)
+
+ replmgr := NewReplMgr(
+ "",
+ logEntry,
+ datastore,
+ clientCC,
+ )
+
+ prf := NewServer(
+ coordinator,
+ replmgr,
+ nil,
+ logEntry,
+ clientCC,
+ conf,
+ )
+
+ listener, port := listenAvailPort(t)
+ t.Logf("proxy listening on port %d", port)
+
+ errQ := make(chan error)
+
+ go func() {
+ errQ <- prf.Start(listener)
+ }()
+
+ // dial client to praefect
+ cc := dialLocalPort(t, port, false)
+
+ return cc, prf
+}
+
+func runInternalGitalyServer(t *testing.T) (*grpc.Server, string) {
+ streamInt := []grpc.StreamServerInterceptor{auth.StreamServerInterceptor()}
+ unaryInt := []grpc.UnaryServerInterceptor{auth.UnaryServerInterceptor()}
+
+ server := testhelper.NewTestGrpcServer(t, streamInt, unaryInt)
+ serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
+
+ listener, err := net.Listen("unix", serverSocketPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer())
+
+ go server.Serve(listener)
+
+ return server, "unix://" + serverSocketPath
+}
+
func callbackIncrement(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) {
return &mock.SimpleResponse{
Value: req.Value + 1,
diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go
new file mode 100644
index 000000000..db01acf4f
--- /dev/null
+++ b/internal/praefect/service/server/info.go
@@ -0,0 +1,51 @@
+package server
+
+import (
+ "context"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "golang.org/x/sync/errgroup"
+
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+)
+
+// ServerInfo sends ServerInfoRequest to all of a praefect server's internal gitaly nodes and aggregates the results into
+// a response
+func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) {
+
+ storageStatuses := make([][]*gitalypb.ServerInfoResponse_StorageStatus, len(s.conf.Nodes))
+
+ g, ctx := errgroup.WithContext(ctx)
+
+ for i, node := range s.conf.Nodes {
+ i := i // necessary since it will be used in a goroutine below
+ cc, err := s.clientCC.GetConnection(node.Storage)
+ if err != nil {
+ return nil, helper.ErrInternalf("error getting client connection for %s: %v", node.Storage, err)
+ }
+ g.Go(func() error {
+ client := gitalypb.NewServerServiceClient(cc)
+ resp, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{})
+ if err != nil {
+ return fmt.Errorf("error when requesting server info from internal storage %v", node.Storage)
+ }
+
+ storageStatuses[i] = resp.GetStorageStatuses()
+
+ return nil
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+
+ var response gitalypb.ServerInfoResponse
+
+ for _, storageStatus := range storageStatuses {
+ response.StorageStatuses = append(response.StorageStatuses, storageStatus...)
+ }
+
+ return &response, nil
+}
diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go
new file mode 100644
index 000000000..30e04f529
--- /dev/null
+++ b/internal/praefect/service/server/server.go
@@ -0,0 +1,23 @@
+package server
+
+import (
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+// Server is a ServerService server
+type Server struct {
+ clientCC *conn.ClientConnections
+ conf config.Config
+}
+
+// NewServer creates a new instance of a grpc ServerServiceServer
+func NewServer(conf config.Config, clientConnections *conn.ClientConnections) gitalypb.ServerServiceServer {
+ s := &Server{
+ clientCC: clientConnections,
+ conf: conf,
+ }
+
+ return s
+}