Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-01-29 21:01:01 +0300
committerJohn Cai <jcai@gitlab.com>2020-02-04 21:09:17 +0300
commitc1f3e16c7fca2a7b776e968a01d79da9a378a167 (patch)
treed504e5533d11ccea19d4f09948e6d647f1b5235d
parent0ce9c84de2dd6c9618e9b66ad6ba68a815eb128a (diff)
Use node managerjc-use-node-manager
wire in the node manager
-rw-r--r--cmd/praefect/main.go27
-rw-r--r--internal/praefect/auth_test.go15
-rw-r--r--internal/praefect/coordinator.go155
-rw-r--r--internal/praefect/coordinator_test.go8
-rw-r--r--internal/praefect/datastore/datastore.go10
-rw-r--r--internal/praefect/datastore/datastore_test.go2
-rw-r--r--internal/praefect/helper_test.go117
-rw-r--r--internal/praefect/nodes/manager.go (renamed from internal/praefect/node_manager.go)30
-rw-r--r--internal/praefect/nodes/manager_test.go (renamed from internal/praefect/node_manager_test.go)10
-rw-r--r--internal/praefect/replicator.go25
-rw-r--r--internal/praefect/replicator_test.go18
-rw-r--r--internal/praefect/server.go26
-rw-r--r--internal/praefect/server_test.go12
-rw-r--r--internal/praefect/service/server/disk_stats.go37
-rw-r--r--internal/praefect/service/server/info.go44
-rw-r--r--internal/praefect/service/server/server.go12
16 files changed, 332 insertions, 216 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index a0868f36d..831f196d6 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -33,6 +33,9 @@ import (
"fmt"
"os"
"strings"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/bootstrap"
@@ -41,7 +44,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -128,20 +130,11 @@ func configure() (config.Config, error) {
}
func run(cfgs []starter.Config, conf config.Config) error {
- clientConnections := conn.NewClientConnections()
-
- for _, virtualStorage := range conf.VirtualStorages {
- for _, node := range virtualStorage.Nodes {
- if _, err := clientConnections.GetConnection(node.Storage); err == nil {
- continue
- }
- if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil {
- return fmt.Errorf("failed to register %s: %s", node.Address, err)
- }
-
- logger.WithField("node_address", node.Address).Info("registered gitaly node")
- }
+ nodeManager, err := nodes.NewManager(logger, conf.VirtualStorages)
+ if err != nil {
+ return err
}
+ nodeManager.Start(1*time.Second, 3*time.Second)
latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus)
if err != nil {
@@ -156,15 +149,15 @@ func run(cfgs []starter.Config, conf config.Config) error {
var (
// top level server dependencies
ds = datastore.NewInMemory(conf)
- coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
+ coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, protoregistry.GitalyProtoFileDescriptors...)
repl = praefect.NewReplMgr(
"default",
logger,
ds,
- clientConnections,
+ nodeManager,
praefect.WithLatencyMetric(latencyMetric),
praefect.WithQueueMetric(queueMetric))
- srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
+ srv = praefect.NewServer(coordinator, repl, nil, logger, nodeManager, conf)
serverErrors = make(chan error, 1)
)
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index f9130bfd1..ce0a21c34 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -12,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/config/auth"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
@@ -168,13 +167,14 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
conf := config.Config{
Auth: auth.Config{Token: token, Transitioning: !required},
VirtualStorages: []*config.VirtualStorage{
- &config.VirtualStorage{
+ {
Name: "praefect",
Nodes: []*models.Node{
- &models.Node{
+ {
Storage: "praefect-internal-0",
DefaultPrimary: true,
Address: backend,
+ Token: backendToken,
},
},
},
@@ -190,14 +190,13 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
logEntry := log.Default()
ds := datastore.NewInMemory(conf)
- clientConnections := conn.NewClientConnections()
- clientConnections.RegisterNode("praefect-internal-0", backend, backendToken)
+ nodeMgr := NewMockNodeManager(conf.VirtualStorages)
- coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd)
+ coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, fd)
- replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections)
+ replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, nodeMgr)
- srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf)
+ srv := NewServer(coordinator, replMgr, nil, logEntry, nodeMgr, conf)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index f1b1d5492..67ab8f7b0 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -3,20 +3,24 @@ package praefect
import (
"context"
"errors"
- "fmt"
"os"
"os/signal"
"sync"
"syscall"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/client"
+
+ "google.golang.org/grpc"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc/codes"
@@ -31,7 +35,7 @@ func isDestructive(methodName string) bool {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- connections *conn.ClientConnections
+ nodeMgr nodes.Manager
log *logrus.Entry
failoverMutex sync.RWMutex
@@ -42,16 +46,16 @@ type Coordinator struct {
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, nodeMgr nodes.Manager, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
return &Coordinator{
- log: l,
- datastore: ds,
- registry: registry,
- connections: clientConnections,
- conf: conf,
+ log: l,
+ datastore: ds,
+ registry: registry,
+ nodeMgr: nodeMgr,
+ conf: conf,
}
}
@@ -80,107 +84,118 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
}
var requestFinalizer func()
- var storage string
- if mi.Scope == protoregistry.ScopeRepository {
- var getRepoErr error
- storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
+ var conn *grpc.ClientConn
- if getRepoErr == protoregistry.ErrTargetRepoMissing {
- return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error())
+ if mi.Scope == protoregistry.ScopeRepository {
+ targetRepo, err := mi.TargetRepo(m)
+ if err != nil {
+ return nil, err
}
- if getRepoErr != nil {
- return nil, getRepoErr
+ shard, err := c.nodeMgr.GetShard(targetRepo.GetStorageName())
+ if err != nil {
+ return nil, err
}
- if storage == "" {
- return nil, status.Error(codes.InvalidArgument, "storage not found")
- }
- } else {
- storage, requestFinalizer, err = c.getAnyStorageNode()
+ primary, err := shard.GetPrimary()
+
if err != nil {
return nil, err
}
+
+ if err = c.rewriteStorageForRepositoryMessage(mi, m, peeker, primary.GetStorage()); err != nil {
+ if err == protoregistry.ErrTargetRepoMissing {
+ return nil, status.Errorf(codes.InvalidArgument, err.Error())
+ }
+
+ return nil, err
+ }
+
+ if mi.Operation == protoregistry.OpMutator {
+ change := datastore.UpdateRepo
+ if isDestructive(fullMethodName) {
+ change = datastore.DeleteRepo
+ }
+
+ secondaries, err := shard.GetSecondaries()
+ if err != nil {
+ return nil, err
+ }
+
+ if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil {
+ return nil, err
+ }
+ }
+
+ return proxy.NewStreamParameters(ctx, primary.GetConnection(), requestFinalizer, nil), nil
}
- // We only need the primary node, as there's only one primary storage
- // location per praefect at this time
- cc, err := c.connections.GetConnection(storage)
+
+ conn, err = c.getAnyStorageNode()
if err != nil {
- return nil, fmt.Errorf("unable to find existing client connection for %s", storage)
+ return nil, err
+ }
+
+ if requestFinalizer == nil {
+ requestFinalizer = noopRequestFinalizer
}
- return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil
+ return proxy.NewStreamParameters(ctx, conn, requestFinalizer, nil), nil
}
var noopRequestFinalizer = func() {}
-func (c *Coordinator) getAnyStorageNode() (string, func(), error) {
+func (c *Coordinator) getAnyStorageNode() (*grpc.ClientConn, error) {
//TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to
// proxy requests that are not repository scoped
- node, err := c.datastore.GetStorageNodes()
+ nodes, err := c.datastore.GetStorageNodes()
if err != nil {
- return "", nil, err
+ return nil, err
}
- if len(node) == 0 {
- return "", nil, errors.New("no node storages found")
+ if len(nodes) == 0 {
+ return nil, errors.New("no node storages found")
}
- return node[0].Storage, noopRequestFinalizer, nil
-}
+ node := nodes[0]
-func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, method string) (string, func(), error) {
- targetRepo, err := mi.TargetRepo(m)
- if err != nil {
- return "", nil, err
- }
+ conn, err := client.Dial(node.Address,
+ []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)),
+ },
+ )
- primary, err := c.datastore.GetPrimary(targetRepo.GetStorageName())
- if err != nil {
- return "", nil, err
- }
+ return conn, nil
+}
- secondaries, err := c.datastore.GetSecondaries(targetRepo.GetStorageName())
+func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, primaryStorage string) error {
+ targetRepo, err := mi.TargetRepo(m)
if err != nil {
- return "", nil, err
+ return err
}
// rewrite storage name
- targetRepo.StorageName = primary.Storage
+ targetRepo.StorageName = primaryStorage
additionalRepo, ok, err := mi.AdditionalRepo(m)
if err != nil {
- return "", nil, err
+ return err
}
if ok {
- additionalRepo.StorageName = primary.Storage
+ additionalRepo.StorageName = primaryStorage
}
b, err := proxy.Codec().Marshal(m)
if err != nil {
- return "", nil, err
+ return err
}
if err = peeker.Modify(b); err != nil {
- return "", nil, err
+ return err
}
- requestFinalizer := noopRequestFinalizer
-
- // TODO: move the logic of creating replication jobs to the streamDirector method
- if mi.Operation == protoregistry.OpMutator {
- change := datastore.UpdateRepo
- if isDestructive(method) {
- change = datastore.DeleteRepo
- }
-
- if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil {
- return "", nil, err
- }
- }
-
- return primary.Storage, requestFinalizer, nil
+ return nil
}
func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModifier) (proto.Message, error) {
@@ -197,8 +212,12 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
return m, nil
}
-func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary models.Node, secondaries []models.Node, change datastore.ChangeType) (func(), error) {
- jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary, secondaries, change)
+func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) {
+ var secondaryStorages []string
+ for _, secondary := range secondaries {
+ secondaryStorages = append(secondaryStorages, secondary.GetStorage())
+ }
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 4283779a1..000c00050 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1,7 +1,6 @@
package praefect
import (
- "fmt"
"io/ioutil"
"testing"
@@ -10,7 +9,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -57,10 +55,10 @@ func TestStreamDirector(t *testing.T) {
defer cancel()
address := "gitaly-primary.example.com"
- clientConnections := conn.NewClientConnections()
- clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token")
- coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf)
+ nodeMgr := NewMockNodeManager(conf.VirtualStorages)
+
+ coordinator := NewCoordinator(log.Default(), ds, nodeMgr, conf)
require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index eb3cb0087..6d4de0be3 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -102,7 +102,7 @@ type ReplJobsDatastore interface {
// CreateReplicaReplJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
@@ -292,7 +292,7 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi
// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
@@ -302,15 +302,15 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary mo
var jobIDs []uint64
- for _, secondary := range secondaries {
+ for _, secondaryStorage := range secondaryStorages {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.records[nextID] = jobRecord{
change: change,
- targetNodeStorage: secondary.Storage,
+ targetNodeStorage: secondaryStorage,
state: JobStatePending,
relativePath: relativePath,
- sourceNodeStorage: primary.Storage,
+ sourceNodeStorage: primaryStorage,
}
jobIDs = append(jobIDs, nextID)
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 6e908ffb1..de17dc690 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -42,7 +42,7 @@ var operations = []struct {
{
desc: "insert replication job",
opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1, []models.Node{stor2}, UpdateRepo)
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo)
require.NoError(t, err)
},
},
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 6e8791afb..ba153a787 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -2,23 +2,28 @@ package praefect
import (
"context"
+ "errors"
"fmt"
"net"
"testing"
"time"
+ "google.golang.org/grpc/health"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
"gitlab.com/gitlab-org/gitaly/client"
internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
@@ -70,10 +75,10 @@ func testConfig(backends int) config.Config {
// setupServer wires all praefect dependencies together via dependency
// injection
-func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) {
+func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) {
var (
ds = datastore.NewInMemory(conf)
- coordinator = NewCoordinator(l, ds, clientCC, conf, fds...)
+ coordinator = NewCoordinator(l, ds, nodeMgr, conf, fds...)
)
var defaultNode *models.Node
@@ -88,14 +93,14 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti
defaultNode.Storage,
l,
ds,
- clientCC,
+ nodeMgr,
)
server := NewServer(
coordinator,
replmgr,
nil,
l,
- clientCC,
+ nodeMgr,
conf,
)
@@ -108,7 +113,6 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti
// configured storage node.
// requires there to be only 1 virtual storage
func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[string]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) {
- clientCC := conn.NewClientConnections()
require.Len(t, conf.VirtualStorages, 1)
require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes),
@@ -123,12 +127,15 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
backendAddr, cleanup := newMockDownstream(t, node.Token, backend)
cleanups = append(cleanups, cleanup)
- clientCC.RegisterNode(node.Storage, backendAddr, node.Token)
node.Address = backendAddr
conf.VirtualStorages[0].Nodes[i] = node
}
- _, prf := setupServer(t, conf, clientCC, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)})
+ nodeMgr, err := nodes.NewManager(log.Default(), conf.VirtualStorages)
+ require.NoError(t, err)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+
+ _, prf := setupServer(t, conf, nodeMgr, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)})
listener, port := listenAvailPort(t)
t.Logf("praefect listening on port %d", port)
@@ -163,14 +170,12 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
// requires exactly 1 virtual storage
func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
require.Len(t, conf.VirtualStorages, 1)
- clientCC := conn.NewClientConnections()
var cleanups []testhelper.Cleanup
for i, node := range conf.VirtualStorages[0].Nodes {
_, backendAddr, cleanup := runInternalGitalyServer(t, node.Token)
cleanups = append(cleanups, cleanup)
- clientCC.RegisterNode(node.Storage, backendAddr, node.Token)
node.Address = backendAddr
conf.VirtualStorages[0].Nodes[i] = node
}
@@ -178,13 +183,17 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
ds := datastore.NewInMemory(conf)
logEntry := log.Default()
- coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...)
+ nodeMgr, err := nodes.NewManager(log.Default(), conf.VirtualStorages)
+ require.NoError(t, err)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+
+ coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, protoregistry.GitalyProtoFileDescriptors...)
replmgr := NewReplMgr(
"",
logEntry,
ds,
- clientCC,
+ nodeMgr,
WithQueueMetric(&promtest.MockGauge{}),
WithLatencyMetric(&promtest.MockHistogram{}),
)
@@ -193,7 +202,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
replmgr,
nil,
logEntry,
- clientCC,
+ nodeMgr,
conf,
)
@@ -243,6 +252,7 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string,
gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer())
gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer))
+ healthpb.RegisterHealthServer(server, health.NewServer())
errQ := make(chan error)
@@ -319,3 +329,84 @@ func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer)
return fmt.Sprintf("tcp://localhost:%d", port), cleanup
}
+
+type mockNodeMgr struct {
+ shards map[string]mockShard
+}
+
+type mockShard struct {
+ primary mockNode
+ secondaries []mockNode
+}
+
+func (m *mockShard) GetPrimary() (nodes.Node, error) {
+ return &m.primary, nil
+}
+
+func (m *mockShard) GetSecondaries() ([]nodes.Node, error) {
+ var nodes []nodes.Node
+ for _, n := range m.secondaries {
+ nodes = append(nodes, &n)
+ }
+
+ return nodes, nil
+}
+
+type mockNode struct {
+ storage, address, token string
+ conn *grpc.ClientConn
+}
+
+func (m *mockNode) GetStorage() string {
+ return m.storage
+}
+
+func (m *mockNode) GetAddress() string {
+ return m.address
+}
+
+func (m *mockNode) GetToken() string {
+ return m.token
+}
+
+func (m *mockNode) GetConnection() *grpc.ClientConn {
+ return m.conn
+}
+
+func (m *mockNodeMgr) GetShard(virtualStorageName string) (nodes.Shard, error) {
+ s, ok := m.shards[virtualStorageName]
+ if !ok {
+ return nil, errors.New("virtual storage not found")
+ }
+
+ return &s, nil
+}
+
+// NewMockNodeManager creates a nodes.Manager that doesn't check the health of its nodes
+func NewMockNodeManager(virtualStorages []*config.VirtualStorage) *mockNodeMgr {
+ m := &mockNodeMgr{shards: make(map[string]mockShard)}
+
+ for _, virtualStorage := range virtualStorages {
+ var s mockShard
+
+ for _, node := range virtualStorage.Nodes {
+ conn, _ := client.Dial(node.Address,
+ []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)),
+ },
+ )
+ n := mockNode{storage: node.Storage, address: node.Address, token: node.Token, conn: conn}
+ if node.DefaultPrimary {
+ s.primary = n
+ continue
+ }
+ s.secondaries = append(s.secondaries, n)
+ }
+
+ m.shards[virtualStorage.Name] = s
+
+ }
+
+ return m
+}
diff --git a/internal/praefect/node_manager.go b/internal/praefect/nodes/manager.go
index 1a3d00b7d..00bb2a45e 100644
--- a/internal/praefect/node_manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -1,4 +1,4 @@
-package praefect
+package nodes
import (
"bytes"
@@ -23,8 +23,8 @@ type Shard interface {
GetSecondaries() ([]Node, error)
}
-// NodeManager is responsible for returning shards for virtual storages
-type NodeManager interface {
+// Manager is responsible for returning shards for virtual storages
+type Manager interface {
GetShard(virtualStorageName string) (Shard, error)
}
@@ -63,8 +63,8 @@ func (s *shard) GetSecondaries() ([]Node, error) {
return secondaries, nil
}
-// NodeMgr is a concrete type that adheres to the NodeManager interface
-type NodeMgr struct {
+// Mgr is a concrete type that adheres to the Manager interface
+type Mgr struct {
shards map[string]*shard
log *logrus.Entry
}
@@ -73,8 +73,8 @@ type NodeMgr struct {
// should not be used for a new request
var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
-// NewNodeManager creates a new NodeMgr based on virtual storage configs
-func NewNodeManager(log *logrus.Entry, virtualStorages []config.VirtualStorage) (*NodeMgr, error) {
+// NewManager creates a new Mgr based on virtual storage configs
+func NewManager(log *logrus.Entry, virtualStorages []*config.VirtualStorage) (*Mgr, error) {
shards := make(map[string]*shard)
for _, virtualStorage := range virtualStorages {
@@ -105,7 +105,7 @@ func NewNodeManager(log *logrus.Entry, virtualStorages []config.VirtualStorage)
}
}
- return &NodeMgr{
+ return &Mgr{
shards: shards,
log: log,
}, nil
@@ -115,7 +115,7 @@ func NewNodeManager(log *logrus.Entry, virtualStorages []config.VirtualStorage)
// for deeming a node "healthy"
const healthcheckThreshold = 3
-func (n *NodeMgr) bootstrap(d time.Duration) error {
+func (n *Mgr) bootstrap(d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()
@@ -130,7 +130,7 @@ func (n *NodeMgr) bootstrap(d time.Duration) error {
return nil
}
-func (n *NodeMgr) monitor(d time.Duration) {
+func (n *Mgr) monitor(d time.Duration) {
ticker := time.NewTicker(d)
defer ticker.Stop()
@@ -143,14 +143,14 @@ func (n *NodeMgr) monitor(d time.Duration) {
}
// Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
-// the monitoring process. Start must be called before NodeMgr can be used.
-func (n *NodeMgr) Start(bootstrapInterval, monitorInterval time.Duration) {
+// the monitoring process. Start must be called before Mgr can be used.
+func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
n.bootstrap(bootstrapInterval)
go n.monitor(monitorInterval)
}
// GetShard retrieves a shard for a virtual storage name
-func (n *NodeMgr) GetShard(virtualStorageName string) (Shard, error) {
+func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
shard, ok := n.shards[virtualStorageName]
if !ok {
return nil, errors.New("virtual storage does not exist")
@@ -175,7 +175,7 @@ func (e errCollection) Error() string {
return sb.String()
}
-func (n *NodeMgr) checkShards() error {
+func (n *Mgr) checkShards() error {
var errs errCollection
for _, shard := range n.shards {
if err := shard.primary.check(); err != nil {
@@ -271,7 +271,7 @@ func (n *nodeStatus) check() error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
- resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: "TestService"})
+ resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
if err != nil {
resp = &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_UNKNOWN,
diff --git a/internal/praefect/node_manager_test.go b/internal/praefect/nodes/manager_test.go
index 76553327c..f9fa27a20 100644
--- a/internal/praefect/node_manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -1,4 +1,4 @@
-package praefect
+package nodes
import (
"net"
@@ -28,7 +28,7 @@ func TestNodeStatus(t *testing.T) {
}
require.True(t, cs.isHealthy())
- healthSvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
require.NoError(t, cs.check())
require.False(t, cs.isHealthy())
@@ -38,7 +38,7 @@ func TestNodeManager(t *testing.T) {
internalSocket0 := testhelper.GetTemporaryGitalySocketFileName()
internalSocket1 := testhelper.GetTemporaryGitalySocketFileName()
- virtualStorages := []config.VirtualStorage{
+ virtualStorages := []*config.VirtualStorage{
{
Name: "virtual-storage-0",
Nodes: []*models.Node{
@@ -61,7 +61,7 @@ func TestNodeManager(t *testing.T) {
_, _, cancel1 := newHealthServer(t, internalSocket1)
defer cancel1()
- nm, err := NewNodeManager(log.Default(), virtualStorages)
+ nm, err := NewManager(log.Default(), virtualStorages)
require.NoError(t, err)
_, err = nm.GetShard("virtual-storage-0")
@@ -82,7 +82,7 @@ func TestNodeManager(t *testing.T) {
require.Equal(t, virtualStorages[0].Nodes[1].Storage, secondaries[0].GetStorage())
require.Equal(t, virtualStorages[0].Nodes[1].Address, secondaries[0].GetAddress())
- srv0.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+ srv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
nm.checkShards()
// since the primary is unhealthy, we expect checkShards to demote primary to secondary, and promote the healthy
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 5e5f0f559..2facb1579 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -5,10 +5,15 @@ import (
"fmt"
"time"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -148,7 +153,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
type ReplMgr struct {
log *logrus.Entry
datastore datastore.Datastore
- clientConnections *conn.ClientConnections
+ nodeManager nodes.Manager
targetNode string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
replQueueMetric metrics.Gauge
@@ -177,14 +182,14 @@ func WithLatencyMetric(h metrics.Histogram) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
datastore: datastore,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
targetNode: targetNode,
- clientConnections: c,
+ nodeManager: nodeMgr,
replLatencyMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}),
replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}),
}
@@ -278,6 +283,14 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
reset()
}
}
+func dialNode(addr, token string) (*grpc.ClientConn, error) {
+ return client.Dial(addr,
+ []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token)),
+ },
+ )
+}
// TODO: errors that occur during replication should be handled better. Logging
// is a crutch in this situation. Ideally, we need to update state somewhere
@@ -294,13 +307,13 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) {
return
}
- targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
+ targetCC, err := dialNode(job.TargetNode.Address, job.TargetNode.Token)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for secondary node in replication job")
return
}
- sourceCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
+ sourceCC, err := dialNode(job.SourceNode.Address, job.SourceNode.Token)
if err != nil {
l.WithError(err).Error("unable to obtain client connection for primary node in replication job")
return
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 444579923..efa4cf494 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -9,13 +9,14 @@ import (
"testing"
"time"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
@@ -115,7 +116,11 @@ func TestProcessReplicationJob(t *testing.T) {
secondaries, err := ds.GetSecondaries(config.VirtualStorages[0].Name)
require.NoError(t, err)
- _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, secondaries, datastore.UpdateRepo)
+ var secondaryStorages []string
+ for _, secondary := range secondaries {
+ secondaryStorages = append(secondaryStorages, secondary.Storage)
+ }
+ _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo)
require.NoError(t, err)
jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1)
@@ -129,14 +134,14 @@ func TestProcessReplicationJob(t *testing.T) {
var replicator defaultReplicator
replicator.log = gitaly_log.Default()
- clientCC := conn.NewClientConnections()
- require.NoError(t, clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token))
- require.NoError(t, clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token))
+ nodeMgr, err := nodes.NewManager(gitaly_log.Default(), config.VirtualStorages)
+ require.NoError(t, err)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
var mockReplicationGauge promtest.MockGauge
var mockReplicationHistogram promtest.MockHistogram
- replMgr := NewReplMgr("", gitaly_log.Default(), ds, clientCC, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge))
+ replMgr := NewReplMgr("", gitaly_log.Default(), ds, nodeMgr, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge))
replMgr.replicator = replicator
replMgr.processReplJob(ctx, jobs[0])
@@ -212,6 +217,7 @@ func TestBackoff(t *testing.T) {
func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
server := serverPkg.NewInsecure(RubyServer)
+
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
listener, err := net.Listen("unix", serverSocketPath)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index ceb1b8a76..863c30a3b 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -17,8 +17,8 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/server"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -31,11 +31,11 @@ import (
// Server is a praefect server
type Server struct {
- clientConnections *conn.ClientConnections
- repl ReplMgr
- s *grpc.Server
- conf config.Config
- l *logrus.Entry
+ nodeManager nodes.Manager
+ repl ReplMgr
+ s *grpc.Server
+ conf config.Config
+ l *logrus.Entry
}
func (srv *Server) warnDupeAddrs(c config.Config) {
@@ -60,7 +60,7 @@ func (srv *Server) warnDupeAddrs(c config.Config) {
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
-func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server {
+func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, nodeManager nodes.Manager, conf config.Config) *Server {
ctxTagOpts := []grpc_ctxtags.Option{
grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
}
@@ -98,11 +98,11 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
}...)
s := &Server{
- s: grpc.NewServer(grpcOpts...),
- repl: repl,
- clientConnections: clientConnections,
- conf: conf,
- l: l,
+ s: grpc.NewServer(grpcOpts...),
+ repl: repl,
+ nodeManager: nodeManager,
+ conf: conf,
+ l: l,
}
s.warnDupeAddrs(conf)
@@ -125,7 +125,7 @@ func (srv *Server) Serve(l net.Listener, secure bool) error {
// RegisterServices will register any services praefect needs to handle rpcs on its own
func (srv *Server) RegisterServices() {
// ServerServiceServer is necessary for the ServerInfo RPC
- gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections))
+ gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.nodeManager))
healthpb.RegisterHealthServer(srv.s, health.NewServer())
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index e201bcbcb..b35948f90 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -18,7 +18,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -130,9 +129,9 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
},
}
- clientCC := conn.NewClientConnections()
- clientCC.RegisterNode(conf.VirtualStorages[0].Nodes[0].Storage, conf.VirtualStorages[0].Nodes[0].Address, conf.VirtualStorages[0].Nodes[0].Token)
- _, srv := setupServer(t, conf, clientCC, log.Default(), protoregistry.GitalyProtoFileDescriptors)
+ nodeMgr := NewMockNodeManager(conf.VirtualStorages)
+
+ _, srv := setupServer(t, conf, nodeMgr, log.Default(), protoregistry.GitalyProtoFileDescriptors)
listener, port := listenAvailPort(t)
go func() {
@@ -154,7 +153,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
func TestGitalyDiskStatistics(t *testing.T) {
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
- &config.VirtualStorage{
+ {
Nodes: []*models.Node{
{
Storage: "praefect-internal-1",
@@ -168,6 +167,7 @@ func TestGitalyDiskStatistics(t *testing.T) {
},
},
}
+
cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
defer cleanup()
@@ -178,7 +178,7 @@ func TestGitalyDiskStatistics(t *testing.T) {
metadata, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{})
require.NoError(t, err)
- require.Len(t, metadata.GetStorageStatuses(), len(conf.Nodes))
+ require.Len(t, metadata.GetStorageStatuses(), len(conf.VirtualStorages[0].Nodes))
for _, storageStatus := range metadata.GetStorageStatuses() {
require.NotNil(t, storageStatus, "none of the storage statuses should be nil")
diff --git a/internal/praefect/service/server/disk_stats.go b/internal/praefect/service/server/disk_stats.go
index 745817826..700bdfb23 100644
--- a/internal/praefect/service/server/disk_stats.go
+++ b/internal/praefect/service/server/disk_stats.go
@@ -4,41 +4,38 @@ import (
"context"
"fmt"
- "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "golang.org/x/sync/errgroup"
)
// DiskStatistics sends DiskStatisticsRequest to all of a praefect server's internal gitaly nodes and aggregates the
// results into a response
func (s *Server) DiskStatistics(ctx context.Context, _ *gitalypb.DiskStatisticsRequest) (*gitalypb.DiskStatisticsResponse, error) {
- storageStatuses := make([][]*gitalypb.DiskStatisticsResponse_StorageStatus, len(s.conf.Nodes))
+ var storageStatuses [][]*gitalypb.DiskStatisticsResponse_StorageStatus
- g, ctx := errgroup.WithContext(ctx)
+ for _, virtualStorage := range s.conf.VirtualStorages {
+ shard, err := s.nodeMgr.GetShard(virtualStorage.Name)
+ if err != nil {
+ return nil, err
+ }
- for i, node := range s.conf.Nodes {
- i := i // necessary since it will be used in a goroutine below
- node := node
- cc, err := s.clientCC.GetConnection(node.Storage)
+ primary, err := shard.GetPrimary()
+ if err != nil {
+ return nil, err
+ }
+ secondaries, err := shard.GetSecondaries()
if err != nil {
- return nil, helper.ErrInternalf("error getting client connection for %s: %v", node.Storage, err)
+ return nil, err
}
- g.Go(func() error {
- client := gitalypb.NewServerServiceClient(cc)
+ for _, node := range append(secondaries, primary) {
+ client := gitalypb.NewServerServiceClient(node.GetConnection())
resp, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{})
if err != nil {
- return fmt.Errorf("error when requesting disk statistics from internal storage %v", node.Storage)
+ return nil, fmt.Errorf("error when requesting disk statistics from internal storage %v", node.GetStorage())
}
- storageStatuses[i] = resp.GetStorageStatuses()
-
- return nil
- })
- }
-
- if err := g.Wait(); err != nil {
- return nil, helper.ErrInternal(err)
+ storageStatuses = append(storageStatuses, resp.GetStorageStatuses())
+ }
}
var response gitalypb.DiskStatisticsResponse
diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go
index f960a9ea5..64360f171 100644
--- a/internal/praefect/service/server/info.go
+++ b/internal/praefect/service/server/info.go
@@ -6,7 +6,7 @@ import (
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"golang.org/x/sync/errgroup"
)
@@ -15,20 +15,26 @@ import (
// a response
func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) {
var once sync.Once
- nodesChecked := make(map[string]struct{})
- var nodes []*models.Node
+ var nodes []nodes.Node
for _, virtualStorage := range s.conf.VirtualStorages {
- for _, node := range virtualStorage.Nodes {
- if _, ok := nodesChecked[node.Storage]; ok {
- continue
- }
+ shard, err := s.nodeMgr.GetShard(virtualStorage.Name)
+ if err != nil {
+ return nil, err
+ }
- nodesChecked[node.Storage] = struct{}{}
- nodes = append(nodes, node)
+ primary, err := shard.GetPrimary()
+ if err != nil {
+ return nil, err
}
- }
+ secondaries, err := shard.GetSecondaries()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes = append(append(nodes, primary), secondaries...)
+ }
var gitVersion, serverVersion string
g, ctx := errgroup.WithContext(ctx)
@@ -38,26 +44,20 @@ func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest)
for i, node := range nodes {
i := i
node := node
- cc, err := s.clientCC.GetConnection(node.Storage)
- if err != nil {
- grpc_logrus.Extract(ctx).WithField("storage", node.Storage).WithError(err).Error("error getting client connection")
- continue
- }
+
g.Go(func() error {
- client := gitalypb.NewServerServiceClient(cc)
+ client := gitalypb.NewServerServiceClient(node.GetConnection())
resp, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{})
if err != nil {
- grpc_logrus.Extract(ctx).WithField("storage", node.Storage).WithError(err).Error("error getting sever info")
+ grpc_logrus.Extract(ctx).WithField("storage", node.GetStorage()).WithError(err).Error("error getting sever info")
return nil
}
storageStatuses[i] = resp.GetStorageStatuses()
- if node.DefaultPrimary {
- once.Do(func() {
- gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion()
- })
- }
+ once.Do(func() {
+ gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion()
+ })
return nil
})
diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go
index 30e04f529..1788307ed 100644
--- a/internal/praefect/service/server/server.go
+++ b/internal/praefect/service/server/server.go
@@ -2,21 +2,21 @@ package server
import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
// Server is a ServerService server
type Server struct {
- clientCC *conn.ClientConnections
- conf config.Config
+ nodeMgr nodes.Manager
+ conf config.Config
}
// NewServer creates a new instance of a grpc ServerServiceServer
-func NewServer(conf config.Config, clientConnections *conn.ClientConnections) gitalypb.ServerServiceServer {
+func NewServer(conf config.Config, nodeMgr nodes.Manager) gitalypb.ServerServiceServer {
s := &Server{
- clientCC: clientConnections,
- conf: conf,
+ nodeMgr: nodeMgr,
+ conf: conf,
}
return s