diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2019-10-07 10:01:34 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2019-10-07 10:01:34 +0300 |
commit | a5ca2426be195b74bf9cb8d9cccd2ca7681931cd (patch) | |
tree | 24f250d76b3e1402b3e1491ae986f10cda2e03da | |
parent | 3a26cd4aa9c043fe33c8c86b309d20baae1a3155 (diff) | |
parent | ceadcd87d65493ca32f7adbe858ecb31627ff1a5 (diff) |
Merge branch 'jc-add-server-service-praefect' into 'master'
Allow praefect to handle ServerInfoRequest
Closes #1809
See merge request gitlab-org/gitaly!1527
-rw-r--r-- | changelogs/unreleased/jc-add-server-service-praefect.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 25 | ||||
-rw-r--r-- | internal/praefect/conn/client_connections.go | 71 | ||||
-rw-r--r-- | internal/praefect/conn/client_connections_test.go | 26 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 56 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 16 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 29 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 15 | ||||
-rw-r--r-- | internal/praefect/server.go | 28 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 118 | ||||
-rw-r--r-- | internal/praefect/service/server/info.go | 51 | ||||
-rw-r--r-- | internal/praefect/service/server/server.go | 23 |
12 files changed, 363 insertions, 100 deletions
diff --git a/changelogs/unreleased/jc-add-server-service-praefect.yml b/changelogs/unreleased/jc-add-server-service-praefect.yml new file mode 100644 index 000000000..b2a989c6d --- /dev/null +++ b/changelogs/unreleased/jc-add-server-service-praefect.yml @@ -0,0 +1,5 @@ +--- +title: Allow praefect to handle ServerInfoRequest +merge_request: 1527 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index d6f4466a3..a7c9beddf 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/labkit/tracing" ) @@ -92,12 +93,22 @@ func configure() (config.Config, error) { func run(listeners []net.Listener, conf config.Config) error { + clientConnections := conn.NewClientConnections() + + for _, node := range conf.Nodes { + if err := clientConnections.RegisterNode(node.Storage, node.Address); err != nil { + return fmt.Errorf("failed to register %s: %s", node.Address, err) + } + + logger.WithField("node_address", node.Address).Info("registered gitaly node") + } + var ( // top level server dependencies datastore = praefect.NewMemoryDatastore(conf) - coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...) - repl = praefect.NewReplMgr("default", logger, datastore, coordinator) - srv = praefect.NewServer(coordinator, repl, nil, logger) + coordinator = praefect.NewCoordinator(logger, datastore, clientConnections, protoregistry.GitalyProtoFileDescriptors...) + repl = praefect.NewReplMgr("default", logger, datastore, clientConnections) + srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) // signal related signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} termCh = make(chan os.Signal, len(signals)) @@ -113,14 +124,6 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - for _, node := range conf.Nodes { - if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil { - return fmt.Errorf("failed to register %s: %s", node.Address, err) - } - - logger.WithField("node_address", node.Address).Info("registered gitaly node") - } - go func() { serverErrors <- repl.ProcessBacklog(ctx) }() go coordinator.FailoverRotation() diff --git a/internal/praefect/conn/client_connections.go b/internal/praefect/conn/client_connections.go new file mode 100644 index 000000000..8d4f3a033 --- /dev/null +++ b/internal/praefect/conn/client_connections.go @@ -0,0 +1,71 @@ +package conn + +import ( + "errors" + "sync" + + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + "gitlab.com/gitlab-org/gitaly/client" + gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "google.golang.org/grpc" +) + +// ErrConnectionNotFound indicates the connection for a given storage has not yet been registered +var ErrConnectionNotFound = errors.New("client connection not found") + +// ErrAlreadyRegistered indicates the client connection for a given storage has already been registered +var ErrAlreadyRegistered = errors.New("client connection already registered") + +// ClientConnections contains ready to use grpc client connections +type ClientConnections struct { + connMutex sync.RWMutex + nodes map[string]*grpc.ClientConn +} + +// NewClientConnections creates a new ClientConnections struct +func NewClientConnections() *ClientConnections { + return &ClientConnections{ + nodes: make(map[string]*grpc.ClientConn), + } +} + +// RegisterNode will direct traffic to the supplied downstream connection when the storage location +// is encountered. +func (c *ClientConnections) RegisterNode(storageName, listenAddr string) error { + conn, err := client.Dial(listenAddr, + []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)), + }, + ) + if err != nil { + return err + } + + return c.setConn(storageName, conn) +} + +func (c *ClientConnections) setConn(storageName string, conn *grpc.ClientConn) error { + c.connMutex.Lock() + if _, ok := c.nodes[storageName]; ok { + return ErrAlreadyRegistered + } + c.nodes[storageName] = conn + c.connMutex.Unlock() + + return nil +} + +// GetConnection gets the grpc client connection based on an address +func (c *ClientConnections) GetConnection(storageName string) (*grpc.ClientConn, error) { + c.connMutex.RLock() + cc, ok := c.nodes[storageName] + c.connMutex.RUnlock() + if !ok { + return nil, ErrConnectionNotFound + } + + return cc, nil + +} diff --git a/internal/praefect/conn/client_connections_test.go b/internal/praefect/conn/client_connections_test.go new file mode 100644 index 000000000..9e2f832d1 --- /dev/null +++ b/internal/praefect/conn/client_connections_test.go @@ -0,0 +1,26 @@ +package conn + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRegisterNode(t *testing.T) { + storageName := "default" + tcpAddress := "address1" + clientConn := NewClientConnections() + + _, err := clientConn.GetConnection(storageName) + require.Equal(t, ErrConnectionNotFound, err) + + require.NoError(t, clientConn.RegisterNode(storageName, fmt.Sprintf("tcp://%s", tcpAddress))) + + conn, err := clientConn.GetConnection(storageName) + require.NoError(t, err) + require.Equal(t, tcpAddress, conn.Target()) + + err = clientConn.RegisterNode(storageName, "tcp://some-other-address") + require.Equal(t, ErrAlreadyRegistered, err) +} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 1c9ba2e1a..49a1d7990 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -9,9 +9,8 @@ import ( "sync" "syscall" - gitalyauth "gitlab.com/gitlab-org/gitaly/auth" - gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -19,7 +18,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "google.golang.org/grpc" ) @@ -28,26 +26,25 @@ import ( // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { + connections *conn.ClientConnections log *logrus.Entry failoverMutex sync.RWMutex - connMutex sync.RWMutex datastore Datastore - nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *conn.ClientConnections, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) return &Coordinator{ - log: l, - datastore: datastore, - nodes: make(map[string]*grpc.ClientConn), - registry: registry, + log: l, + datastore: datastore, + registry: registry, + connections: clientConnections, } } @@ -91,7 +88,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, } // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, err := c.GetConnection(storage) + cc, err := c.connections.GetConnection(storage) if err != nil { return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage) } @@ -223,43 +220,6 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func() }, nil } -// RegisterNode will direct traffic to the supplied downstream connection when the storage location -// is encountered. -func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { - conn, err := client.Dial(listenAddr, - []grpc.DialOption{ - grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)), - }, - ) - if err != nil { - return err - } - - c.setConn(storageName, conn) - - return nil -} - -func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) { - c.connMutex.Lock() - c.nodes[storageName] = conn - c.connMutex.Unlock() -} - -// GetConnection gets the grpc client connection based on an address -func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) { - c.connMutex.RLock() - cc, ok := c.nodes[storageName] - c.connMutex.RUnlock() - if !ok { - return nil, errors.New("client connection not found") - } - - return cc, nil - -} - // FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary func (c *Coordinator) FailoverRotation() { c.handleSignalAndRotate() diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index d6e3c519a..267ce9fbb 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1,6 +1,7 @@ package praefect import ( + "fmt" "io/ioutil" "testing" @@ -9,11 +10,11 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" ) var testLogger = logrus.New() @@ -48,7 +49,11 @@ func TestStreamDirector(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - coordinator := NewCoordinator(log.Default(), datastore) + address := "gitaly-primary.example.com" + clientConnections := conn.NewClientConnections() + clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address)) + + coordinator := NewCoordinator(log.Default(), datastore, clientConnections) require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...)) frame, err := proto.Marshal(&gitalypb.GarbageCollectRequest{ @@ -56,14 +61,9 @@ func TestStreamDirector(t *testing.T) { }) require.NoError(t, err) - cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure()) - require.NoError(t, err) - - coordinator.setConn("praefect-internal-1", cc) - _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame}) require.NoError(t, err) - require.Equal(t, cc, conn, "stream director should choose the primary as the client connection to use") + require.Equal(t, address, conn.Target()) jobs, err := datastore.GetJobs(JobStatePending, 1, 10) require.NoError(t, err) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 61cfdceed..b4c18083f 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -148,11 +149,11 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { - log *logrus.Entry - datastore Datastore - coordinator *Coordinator - targetNode string // which replica is this replicator responsible for? - replicator Replicator // does the actual replication logic + log *logrus.Entry + datastore Datastore + clientConnections *conn.ClientConnections + targetNode string // which replica is this replicator responsible for? + replicator Replicator // does the actual replication logic // whitelist contains the project names of the repos we wish to replicate whitelist map[string]struct{} @@ -163,14 +164,14 @@ type ReplMgrOpt func(*ReplMgr) // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Entry, datastore Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Entry, datastore Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ - log: log, - datastore: datastore, - whitelist: map[string]struct{}{}, - replicator: defaultReplicator{log}, - targetNode: targetNode, - coordinator: c, + log: log, + datastore: datastore, + whitelist: map[string]struct{}{}, + replicator: defaultReplicator{log}, + targetNode: targetNode, + clientConnections: c, } for _, opt := range opts { @@ -275,12 +276,12 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error { return err } - targetCC, err := r.coordinator.GetConnection(job.TargetNode.Storage) + targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage) if err != nil { return err } - sourceCC, err := r.coordinator.GetConnection(job.Repository.Primary.Storage) + sourceCC, err := r.clientConnections.GetConnection(job.Repository.Primary.Storage) if err != nil { return err } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b50db5194..22b7ed024 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -15,6 +15,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" @@ -96,15 +97,15 @@ func TestProceessReplicationJob(t *testing.T) { var replicator defaultReplicator replicator.log = gitaly_log.Default() - coordinator := &Coordinator{nodes: make(map[string]*grpc.ClientConn)} - coordinator.RegisterNode("default", srvSocketPath) - coordinator.RegisterNode("backup", srvSocketPath) + clientCC := conn.NewClientConnections() + clientCC.RegisterNode("default", srvSocketPath) + clientCC.RegisterNode("backup", srvSocketPath) replMgr := &ReplMgr{ - log: gitaly_log.Default(), - datastore: m, - coordinator: coordinator, - replicator: replicator, + log: gitaly_log.Default(), + datastore: m, + clientConnections: clientCC, + replicator: replicator, } require.NoError(t, replMgr.processReplJob(ctx, replJob)) diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 3a7c0fabe..7e2307e62 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -12,7 +12,11 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service/server" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" "google.golang.org/grpc" @@ -20,14 +24,15 @@ import ( // Server is a praefect server type Server struct { - coordinator *Coordinator - repl ReplMgr - s *grpc.Server + clientConnections *conn.ClientConnections + repl ReplMgr + s *grpc.Server + conf config.Config } // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options -func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry) *Server { +func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server { grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( @@ -52,9 +57,10 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo }...) return &Server{ - s: grpc.NewServer(grpcOpts...), - coordinator: c, - repl: repl, + s: grpc.NewServer(grpcOpts...), + repl: repl, + clientConnections: clientConnections, + conf: conf, } } @@ -69,9 +75,17 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { // listener. Function will block until the server is stopped or an // unrecoverable error occurs. func (srv *Server) Start(lis net.Listener) error { + srv.registerServices() + return srv.s.Serve(lis) } +// registerServices will register any services praefect needs to handle rpcs on its own +func (srv *Server) registerServices() { + // ServerServiceServer is necessary for the ServerInfo RPC + gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections)) +} + // Shutdown will attempt a graceful shutdown of the grpc server. If unable // to gracefully shutdown within the context deadline, it will then // forcefully shutdown the server and return a context cancellation error. diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index cbc8e2482..6545f3cdf 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -12,10 +12,15 @@ import ( "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/server/auth" + gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -57,7 +62,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) { storagePrimary = "default" ) - datastore := NewMemoryDatastore(config.Config{ + conf := config.Config{ Nodes: []*models.Node{ &models.Node{ ID: 1, @@ -68,31 +73,38 @@ func TestServerSimpleUnaryUnary(t *testing.T) { ID: 2, Storage: "praefect-internal-2", }}, - }) + } + + datastore := NewMemoryDatastore(conf) logEntry := log.Default() - coordinator := NewCoordinator(logEntry, datastore, fd) + + clientCC := conn.NewClientConnections() for id, nodeStorage := range datastore.storageNodes.m { backend, cleanup := newMockDownstream(t, tt.callback) defer cleanup() // clean up mock downstream server resources - coordinator.RegisterNode(nodeStorage.Storage, backend) + clientCC.RegisterNode(nodeStorage.Storage, backend) nodeStorage.Address = backend datastore.storageNodes.m[id] = nodeStorage } + coordinator := NewCoordinator(logEntry, datastore, clientCC, fd) + replmgr := NewReplMgr( storagePrimary, logEntry, datastore, - coordinator, + clientCC, ) prf := NewServer( coordinator, replmgr, nil, logEntry, + clientCC, + conf, ) listener, port := listenAvailPort(t) @@ -126,6 +138,102 @@ func TestServerSimpleUnaryUnary(t *testing.T) { } } +func TestGitalyServerInfo(t *testing.T) { + conf := config.Config{ + Nodes: []*models.Node{ + &models.Node{ + ID: 1, + Storage: "praefect-internal-1", + DefaultPrimary: true, + }, + &models.Node{ + ID: 2, + Storage: "praefect-internal-2", + }}, + } + cc, srv := runFullPraefectServer(t, conf) + defer srv.s.Stop() + + client := gitalypb.NewServerServiceClient(cc) + + ctx, cancel := testhelper.Context() + defer cancel() + + metadata, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{}) + require.NoError(t, err) + require.Len(t, metadata.GetStorageStatuses(), len(conf.Nodes)) + + for _, storageStatus := range metadata.GetStorageStatuses() { + require.NotNil(t, storageStatus, "none of the storage statuses should be nil") + } +} + +func runFullPraefectServer(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server) { + datastore := NewMemoryDatastore(conf) + + logEntry := log.Default() + + clientCC := conn.NewClientConnections() + for id, nodeStorage := range datastore.storageNodes.m { + _, backend := runInternalGitalyServer(t) + + clientCC.RegisterNode(nodeStorage.Storage, backend) + nodeStorage.Address = backend + datastore.storageNodes.m[id] = nodeStorage + } + + coordinator := NewCoordinator(logEntry, datastore, clientCC, protoregistry.GitalyProtoFileDescriptors...) + + replmgr := NewReplMgr( + "", + logEntry, + datastore, + clientCC, + ) + + prf := NewServer( + coordinator, + replmgr, + nil, + logEntry, + clientCC, + conf, + ) + + listener, port := listenAvailPort(t) + t.Logf("proxy listening on port %d", port) + + errQ := make(chan error) + + go func() { + errQ <- prf.Start(listener) + }() + + // dial client to praefect + cc := dialLocalPort(t, port, false) + + return cc, prf +} + +func runInternalGitalyServer(t *testing.T) (*grpc.Server, string) { + streamInt := []grpc.StreamServerInterceptor{auth.StreamServerInterceptor()} + unaryInt := []grpc.UnaryServerInterceptor{auth.UnaryServerInterceptor()} + + server := testhelper.NewTestGrpcServer(t, streamInt, unaryInt) + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", serverSocketPath) + if err != nil { + t.Fatal(err) + } + + gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer()) + + go server.Serve(listener) + + return server, "unix://" + serverSocketPath +} + func callbackIncrement(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) { return &mock.SimpleResponse{ Value: req.Value + 1, diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go new file mode 100644 index 000000000..db01acf4f --- /dev/null +++ b/internal/praefect/service/server/info.go @@ -0,0 +1,51 @@ +package server + +import ( + "context" + "fmt" + + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "golang.org/x/sync/errgroup" + + "gitlab.com/gitlab-org/gitaly/internal/helper" +) + +// ServerInfo sends ServerInfoRequest to all of a praefect server's internal gitaly nodes and aggregates the results into +// a response +func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) { + + storageStatuses := make([][]*gitalypb.ServerInfoResponse_StorageStatus, len(s.conf.Nodes)) + + g, ctx := errgroup.WithContext(ctx) + + for i, node := range s.conf.Nodes { + i := i // necessary since it will be used in a goroutine below + cc, err := s.clientCC.GetConnection(node.Storage) + if err != nil { + return nil, helper.ErrInternalf("error getting client connection for %s: %v", node.Storage, err) + } + g.Go(func() error { + client := gitalypb.NewServerServiceClient(cc) + resp, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{}) + if err != nil { + return fmt.Errorf("error when requesting server info from internal storage %v", node.Storage) + } + + storageStatuses[i] = resp.GetStorageStatuses() + + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, helper.ErrInternal(err) + } + + var response gitalypb.ServerInfoResponse + + for _, storageStatus := range storageStatuses { + response.StorageStatuses = append(response.StorageStatuses, storageStatus...) + } + + return &response, nil +} diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go new file mode 100644 index 000000000..30e04f529 --- /dev/null +++ b/internal/praefect/service/server/server.go @@ -0,0 +1,23 @@ +package server + +import ( + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +// Server is a ServerService server +type Server struct { + clientCC *conn.ClientConnections + conf config.Config +} + +// NewServer creates a new instance of a grpc ServerServiceServer +func NewServer(conf config.Config, clientConnections *conn.ClientConnections) gitalypb.ServerServiceServer { + s := &Server{ + clientCC: clientConnections, + conf: conf, + } + + return s +} |