diff options
author | John Cai <jcai@gitlab.com> | 2020-01-29 21:01:01 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-02-04 21:09:17 +0300 |
commit | c1f3e16c7fca2a7b776e968a01d79da9a378a167 (patch) | |
tree | d504e5533d11ccea19d4f09948e6d647f1b5235d | |
parent | 0ce9c84de2dd6c9618e9b66ad6ba68a815eb128a (diff) |
Use node managerjc-use-node-manager
wire in the node manager
-rw-r--r-- | cmd/praefect/main.go | 27 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 15 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 155 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 10 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 117 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go (renamed from internal/praefect/node_manager.go) | 30 | ||||
-rw-r--r-- | internal/praefect/nodes/manager_test.go (renamed from internal/praefect/node_manager_test.go) | 10 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 25 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/server.go | 26 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 12 | ||||
-rw-r--r-- | internal/praefect/service/server/disk_stats.go | 37 | ||||
-rw-r--r-- | internal/praefect/service/server/info.go | 44 | ||||
-rw-r--r-- | internal/praefect/service/server/server.go | 12 |
16 files changed, 332 insertions, 216 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index a0868f36d..831f196d6 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -33,6 +33,9 @@ import ( "fmt" "os" "strings" + "time" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/bootstrap" @@ -41,7 +44,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -128,20 +130,11 @@ func configure() (config.Config, error) { } func run(cfgs []starter.Config, conf config.Config) error { - clientConnections := conn.NewClientConnections() - - for _, virtualStorage := range conf.VirtualStorages { - for _, node := range virtualStorage.Nodes { - if _, err := clientConnections.GetConnection(node.Storage); err == nil { - continue - } - if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil { - return fmt.Errorf("failed to register %s: %s", node.Address, err) - } - - logger.WithField("node_address", node.Address).Info("registered gitaly node") - } + nodeManager, err := nodes.NewManager(logger, conf.VirtualStorages) + if err != nil { + return err } + nodeManager.Start(1*time.Second, 3*time.Second) latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus) if err != nil { @@ -156,15 +149,15 @@ func run(cfgs []starter.Config, conf config.Config) error { var ( // top level server dependencies ds = datastore.NewInMemory(conf) - coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) + coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, protoregistry.GitalyProtoFileDescriptors...) repl = praefect.NewReplMgr( "default", logger, ds, - clientConnections, + nodeManager, praefect.WithLatencyMetric(latencyMetric), praefect.WithQueueMetric(queueMetric)) - srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) + srv = praefect.NewServer(coordinator, repl, nil, logger, nodeManager, conf) serverErrors = make(chan error, 1) ) diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index f9130bfd1..ce0a21c34 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -12,7 +12,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" @@ -168,13 +167,14 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func conf := config.Config{ Auth: auth.Config{Token: token, Transitioning: !required}, VirtualStorages: []*config.VirtualStorage{ - &config.VirtualStorage{ + { Name: "praefect", Nodes: []*models.Node{ - &models.Node{ + { Storage: "praefect-internal-0", DefaultPrimary: true, Address: backend, + Token: backendToken, }, }, }, @@ -190,14 +190,13 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func logEntry := log.Default() ds := datastore.NewInMemory(conf) - clientConnections := conn.NewClientConnections() - clientConnections.RegisterNode("praefect-internal-0", backend, backendToken) + nodeMgr := NewMockNodeManager(conf.VirtualStorages) - coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd) + coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, fd) - replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections) + replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, nodeMgr) - srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf) + srv := NewServer(coordinator, replMgr, nil, logEntry, nodeMgr, conf) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index f1b1d5492..67ab8f7b0 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -3,20 +3,24 @@ package praefect import ( "context" "errors" - "fmt" "os" "os/signal" "sync" "syscall" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + "gitlab.com/gitlab-org/gitaly/client" + + "google.golang.org/grpc" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/codes" @@ -31,7 +35,7 @@ func isDestructive(methodName string) bool { // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { - connections *conn.ClientConnections + nodeMgr nodes.Manager log *logrus.Entry failoverMutex sync.RWMutex @@ -42,16 +46,16 @@ type Coordinator struct { } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, nodeMgr nodes.Manager, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) return &Coordinator{ - log: l, - datastore: ds, - registry: registry, - connections: clientConnections, - conf: conf, + log: l, + datastore: ds, + registry: registry, + nodeMgr: nodeMgr, + conf: conf, } } @@ -80,107 +84,118 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, } var requestFinalizer func() - var storage string - if mi.Scope == protoregistry.ScopeRepository { - var getRepoErr error - storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) + var conn *grpc.ClientConn - if getRepoErr == protoregistry.ErrTargetRepoMissing { - return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) + if mi.Scope == protoregistry.ScopeRepository { + targetRepo, err := mi.TargetRepo(m) + if err != nil { + return nil, err } - if getRepoErr != nil { - return nil, getRepoErr + shard, err := c.nodeMgr.GetShard(targetRepo.GetStorageName()) + if err != nil { + return nil, err } - if storage == "" { - return nil, status.Error(codes.InvalidArgument, "storage not found") - } - } else { - storage, requestFinalizer, err = c.getAnyStorageNode() + primary, err := shard.GetPrimary() + if err != nil { return nil, err } + + if err = c.rewriteStorageForRepositoryMessage(mi, m, peeker, primary.GetStorage()); err != nil { + if err == protoregistry.ErrTargetRepoMissing { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + + return nil, err + } + + if mi.Operation == protoregistry.OpMutator { + change := datastore.UpdateRepo + if isDestructive(fullMethodName) { + change = datastore.DeleteRepo + } + + secondaries, err := shard.GetSecondaries() + if err != nil { + return nil, err + } + + if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil { + return nil, err + } + } + + return proxy.NewStreamParameters(ctx, primary.GetConnection(), requestFinalizer, nil), nil } - // We only need the primary node, as there's only one primary storage - // location per praefect at this time - cc, err := c.connections.GetConnection(storage) + + conn, err = c.getAnyStorageNode() if err != nil { - return nil, fmt.Errorf("unable to find existing client connection for %s", storage) + return nil, err + } + + if requestFinalizer == nil { + requestFinalizer = noopRequestFinalizer } - return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil + return proxy.NewStreamParameters(ctx, conn, requestFinalizer, nil), nil } var noopRequestFinalizer = func() {} -func (c *Coordinator) getAnyStorageNode() (string, func(), error) { +func (c *Coordinator) getAnyStorageNode() (*grpc.ClientConn, error) { //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to // proxy requests that are not repository scoped - node, err := c.datastore.GetStorageNodes() + nodes, err := c.datastore.GetStorageNodes() if err != nil { - return "", nil, err + return nil, err } - if len(node) == 0 { - return "", nil, errors.New("no node storages found") + if len(nodes) == 0 { + return nil, errors.New("no node storages found") } - return node[0].Storage, noopRequestFinalizer, nil -} + node := nodes[0] -func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, method string) (string, func(), error) { - targetRepo, err := mi.TargetRepo(m) - if err != nil { - return "", nil, err - } + conn, err := client.Dial(node.Address, + []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)), + }, + ) - primary, err := c.datastore.GetPrimary(targetRepo.GetStorageName()) - if err != nil { - return "", nil, err - } + return conn, nil +} - secondaries, err := c.datastore.GetSecondaries(targetRepo.GetStorageName()) +func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, primaryStorage string) error { + targetRepo, err := mi.TargetRepo(m) if err != nil { - return "", nil, err + return err } // rewrite storage name - targetRepo.StorageName = primary.Storage + targetRepo.StorageName = primaryStorage additionalRepo, ok, err := mi.AdditionalRepo(m) if err != nil { - return "", nil, err + return err } if ok { - additionalRepo.StorageName = primary.Storage + additionalRepo.StorageName = primaryStorage } b, err := proxy.Codec().Marshal(m) if err != nil { - return "", nil, err + return err } if err = peeker.Modify(b); err != nil { - return "", nil, err + return err } - requestFinalizer := noopRequestFinalizer - - // TODO: move the logic of creating replication jobs to the streamDirector method - if mi.Operation == protoregistry.OpMutator { - change := datastore.UpdateRepo - if isDestructive(method) { - change = datastore.DeleteRepo - } - - if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil { - return "", nil, err - } - } - - return primary.Storage, requestFinalizer, nil + return nil } func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModifier) (proto.Message, error) { @@ -197,8 +212,12 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi return m, nil } -func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary models.Node, secondaries []models.Node, change datastore.ChangeType) (func(), error) { - jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary, secondaries, change) +func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) { + var secondaryStorages []string + for _, secondary := range secondaries { + secondaryStorages = append(secondaryStorages, secondary.GetStorage()) + } + jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change) if err != nil { return nil, err } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4283779a1..000c00050 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1,7 +1,6 @@ package praefect import ( - "fmt" "io/ioutil" "testing" @@ -10,7 +9,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -57,10 +55,10 @@ func TestStreamDirector(t *testing.T) { defer cancel() address := "gitaly-primary.example.com" - clientConnections := conn.NewClientConnections() - clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token") - coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf) + nodeMgr := NewMockNodeManager(conf.VirtualStorages) + + coordinator := NewCoordinator(log.Default(), ds, nodeMgr, conf) require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...)) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index eb3cb0087..6d4de0be3 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -102,7 +102,7 @@ type ReplJobsDatastore interface { // CreateReplicaReplJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error) + CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) // UpdateReplJob updates the state of an existing replication job UpdateReplJob(jobID uint64, newState JobState) error @@ -292,7 +292,7 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi // CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. -func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() @@ -302,15 +302,15 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary mo var jobIDs []uint64 - for _, secondary := range secondaries { + for _, secondaryStorage := range secondaryStorages { nextID := uint64(len(md.jobs.records) + 1) md.jobs.records[nextID] = jobRecord{ change: change, - targetNodeStorage: secondary.Storage, + targetNodeStorage: secondaryStorage, state: JobStatePending, relativePath: relativePath, - sourceNodeStorage: primary.Storage, + sourceNodeStorage: primaryStorage, } jobIDs = append(jobIDs, nextID) diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go index 6e908ffb1..de17dc690 100644 --- a/internal/praefect/datastore/datastore_test.go +++ b/internal/praefect/datastore/datastore_test.go @@ -42,7 +42,7 @@ var operations = []struct { { desc: "insert replication job", opFn: func(t *testing.T, ds Datastore) { - _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1, []models.Node{stor2}, UpdateRepo) + _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo) require.NoError(t, err) }, }, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 6e8791afb..ba153a787 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -2,23 +2,28 @@ package praefect import ( "context" + "errors" "fmt" "net" "testing" "time" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/client" internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/server/auth" @@ -70,10 +75,10 @@ func testConfig(backends int) config.Config { // setupServer wires all praefect dependencies together via dependency // injection -func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) { +func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) { var ( ds = datastore.NewInMemory(conf) - coordinator = NewCoordinator(l, ds, clientCC, conf, fds...) + coordinator = NewCoordinator(l, ds, nodeMgr, conf, fds...) ) var defaultNode *models.Node @@ -88,14 +93,14 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti defaultNode.Storage, l, ds, - clientCC, + nodeMgr, ) server := NewServer( coordinator, replmgr, nil, l, - clientCC, + nodeMgr, conf, ) @@ -108,7 +113,6 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti // configured storage node. // requires there to be only 1 virtual storage func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[string]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) { - clientCC := conn.NewClientConnections() require.Len(t, conf.VirtualStorages, 1) require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes), @@ -123,12 +127,15 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st backendAddr, cleanup := newMockDownstream(t, node.Token, backend) cleanups = append(cleanups, cleanup) - clientCC.RegisterNode(node.Storage, backendAddr, node.Token) node.Address = backendAddr conf.VirtualStorages[0].Nodes[i] = node } - _, prf := setupServer(t, conf, clientCC, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)}) + nodeMgr, err := nodes.NewManager(log.Default(), conf.VirtualStorages) + require.NoError(t, err) + nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) + + _, prf := setupServer(t, conf, nodeMgr, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)}) listener, port := listenAvailPort(t) t.Logf("praefect listening on port %d", port) @@ -163,14 +170,12 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st // requires exactly 1 virtual storage func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) { require.Len(t, conf.VirtualStorages, 1) - clientCC := conn.NewClientConnections() var cleanups []testhelper.Cleanup for i, node := range conf.VirtualStorages[0].Nodes { _, backendAddr, cleanup := runInternalGitalyServer(t, node.Token) cleanups = append(cleanups, cleanup) - clientCC.RegisterNode(node.Storage, backendAddr, node.Token) node.Address = backendAddr conf.VirtualStorages[0].Nodes[i] = node } @@ -178,13 +183,17 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client ds := datastore.NewInMemory(conf) logEntry := log.Default() - coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) + nodeMgr, err := nodes.NewManager(log.Default(), conf.VirtualStorages) + require.NoError(t, err) + nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) + + coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, protoregistry.GitalyProtoFileDescriptors...) replmgr := NewReplMgr( "", logEntry, ds, - clientCC, + nodeMgr, WithQueueMetric(&promtest.MockGauge{}), WithLatencyMetric(&promtest.MockHistogram{}), ) @@ -193,7 +202,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client replmgr, nil, logEntry, - clientCC, + nodeMgr, conf, ) @@ -243,6 +252,7 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string, gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer()) gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer)) + healthpb.RegisterHealthServer(server, health.NewServer()) errQ := make(chan error) @@ -319,3 +329,84 @@ func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) return fmt.Sprintf("tcp://localhost:%d", port), cleanup } + +type mockNodeMgr struct { + shards map[string]mockShard +} + +type mockShard struct { + primary mockNode + secondaries []mockNode +} + +func (m *mockShard) GetPrimary() (nodes.Node, error) { + return &m.primary, nil +} + +func (m *mockShard) GetSecondaries() ([]nodes.Node, error) { + var nodes []nodes.Node + for _, n := range m.secondaries { + nodes = append(nodes, &n) + } + + return nodes, nil +} + +type mockNode struct { + storage, address, token string + conn *grpc.ClientConn +} + +func (m *mockNode) GetStorage() string { + return m.storage +} + +func (m *mockNode) GetAddress() string { + return m.address +} + +func (m *mockNode) GetToken() string { + return m.token +} + +func (m *mockNode) GetConnection() *grpc.ClientConn { + return m.conn +} + +func (m *mockNodeMgr) GetShard(virtualStorageName string) (nodes.Shard, error) { + s, ok := m.shards[virtualStorageName] + if !ok { + return nil, errors.New("virtual storage not found") + } + + return &s, nil +} + +// NewMockNodeManager creates a nodes.Manager that doesn't check the health of its nodes +func NewMockNodeManager(virtualStorages []*config.VirtualStorage) *mockNodeMgr { + m := &mockNodeMgr{shards: make(map[string]mockShard)} + + for _, virtualStorage := range virtualStorages { + var s mockShard + + for _, node := range virtualStorage.Nodes { + conn, _ := client.Dial(node.Address, + []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)), + }, + ) + n := mockNode{storage: node.Storage, address: node.Address, token: node.Token, conn: conn} + if node.DefaultPrimary { + s.primary = n + continue + } + s.secondaries = append(s.secondaries, n) + } + + m.shards[virtualStorage.Name] = s + + } + + return m +} diff --git a/internal/praefect/node_manager.go b/internal/praefect/nodes/manager.go index 1a3d00b7d..00bb2a45e 100644 --- a/internal/praefect/node_manager.go +++ b/internal/praefect/nodes/manager.go @@ -1,4 +1,4 @@ -package praefect +package nodes import ( "bytes" @@ -23,8 +23,8 @@ type Shard interface { GetSecondaries() ([]Node, error) } -// NodeManager is responsible for returning shards for virtual storages -type NodeManager interface { +// Manager is responsible for returning shards for virtual storages +type Manager interface { GetShard(virtualStorageName string) (Shard, error) } @@ -63,8 +63,8 @@ func (s *shard) GetSecondaries() ([]Node, error) { return secondaries, nil } -// NodeMgr is a concrete type that adheres to the NodeManager interface -type NodeMgr struct { +// Mgr is a concrete type that adheres to the Manager interface +type Mgr struct { shards map[string]*shard log *logrus.Entry } @@ -73,8 +73,8 @@ type NodeMgr struct { // should not be used for a new request var ErrPrimaryNotHealthy = errors.New("primary is not healthy") -// NewNodeManager creates a new NodeMgr based on virtual storage configs -func NewNodeManager(log *logrus.Entry, virtualStorages []config.VirtualStorage) (*NodeMgr, error) { +// NewManager creates a new Mgr based on virtual storage configs +func NewManager(log *logrus.Entry, virtualStorages []*config.VirtualStorage) (*Mgr, error) { shards := make(map[string]*shard) for _, virtualStorage := range virtualStorages { @@ -105,7 +105,7 @@ func NewNodeManager(log *logrus.Entry, virtualStorages []config.VirtualStorage) } } - return &NodeMgr{ + return &Mgr{ shards: shards, log: log, }, nil @@ -115,7 +115,7 @@ func NewNodeManager(log *logrus.Entry, virtualStorages []config.VirtualStorage) // for deeming a node "healthy" const healthcheckThreshold = 3 -func (n *NodeMgr) bootstrap(d time.Duration) error { +func (n *Mgr) bootstrap(d time.Duration) error { timer := time.NewTimer(d) defer timer.Stop() @@ -130,7 +130,7 @@ func (n *NodeMgr) bootstrap(d time.Duration) error { return nil } -func (n *NodeMgr) monitor(d time.Duration) { +func (n *Mgr) monitor(d time.Duration) { ticker := time.NewTicker(d) defer ticker.Stop() @@ -143,14 +143,14 @@ func (n *NodeMgr) monitor(d time.Duration) { } // Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off -// the monitoring process. Start must be called before NodeMgr can be used. -func (n *NodeMgr) Start(bootstrapInterval, monitorInterval time.Duration) { +// the monitoring process. Start must be called before Mgr can be used. +func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) { n.bootstrap(bootstrapInterval) go n.monitor(monitorInterval) } // GetShard retrieves a shard for a virtual storage name -func (n *NodeMgr) GetShard(virtualStorageName string) (Shard, error) { +func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) { shard, ok := n.shards[virtualStorageName] if !ok { return nil, errors.New("virtual storage does not exist") @@ -175,7 +175,7 @@ func (e errCollection) Error() string { return sb.String() } -func (n *NodeMgr) checkShards() error { +func (n *Mgr) checkShards() error { var errs errCollection for _, shard := range n.shards { if err := shard.primary.check(); err != nil { @@ -271,7 +271,7 @@ func (n *nodeStatus) check() error { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: "TestService"}) + resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) if err != nil { resp = &healthpb.HealthCheckResponse{ Status: healthpb.HealthCheckResponse_UNKNOWN, diff --git a/internal/praefect/node_manager_test.go b/internal/praefect/nodes/manager_test.go index 76553327c..f9fa27a20 100644 --- a/internal/praefect/node_manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -1,4 +1,4 @@ -package praefect +package nodes import ( "net" @@ -28,7 +28,7 @@ func TestNodeStatus(t *testing.T) { } require.True(t, cs.isHealthy()) - healthSvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) require.NoError(t, cs.check()) require.False(t, cs.isHealthy()) @@ -38,7 +38,7 @@ func TestNodeManager(t *testing.T) { internalSocket0 := testhelper.GetTemporaryGitalySocketFileName() internalSocket1 := testhelper.GetTemporaryGitalySocketFileName() - virtualStorages := []config.VirtualStorage{ + virtualStorages := []*config.VirtualStorage{ { Name: "virtual-storage-0", Nodes: []*models.Node{ @@ -61,7 +61,7 @@ func TestNodeManager(t *testing.T) { _, _, cancel1 := newHealthServer(t, internalSocket1) defer cancel1() - nm, err := NewNodeManager(log.Default(), virtualStorages) + nm, err := NewManager(log.Default(), virtualStorages) require.NoError(t, err) _, err = nm.GetShard("virtual-storage-0") @@ -82,7 +82,7 @@ func TestNodeManager(t *testing.T) { require.Equal(t, virtualStorages[0].Nodes[1].Storage, secondaries[0].GetStorage()) require.Equal(t, virtualStorages[0].Nodes[1].Address, secondaries[0].GetAddress()) - srv0.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_UNKNOWN) + srv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) nm.checkShards() // since the primary is unhealthy, we expect checkShards to demote primary to secondary, and promote the healthy diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 5e5f0f559..2facb1579 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -5,10 +5,15 @@ import ( "fmt" "time" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -148,7 +153,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, type ReplMgr struct { log *logrus.Entry datastore datastore.Datastore - clientConnections *conn.ClientConnections + nodeManager nodes.Manager targetNode string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic replQueueMetric metrics.Gauge @@ -177,14 +182,14 @@ func WithLatencyMetric(h metrics.Histogram) func(*ReplMgr) { // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, datastore: datastore, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, targetNode: targetNode, - clientConnections: c, + nodeManager: nodeMgr, replLatencyMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}), replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}), } @@ -278,6 +283,14 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { reset() } } +func dialNode(addr, token string) (*grpc.ClientConn, error) { + return client.Dial(addr, + []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token)), + }, + ) +} // TODO: errors that occur during replication should be handled better. Logging // is a crutch in this situation. Ideally, we need to update state somewhere @@ -294,13 +307,13 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) { return } - targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage) + targetCC, err := dialNode(job.TargetNode.Address, job.TargetNode.Token) if err != nil { l.WithError(err).Error("unable to obtain client connection for secondary node in replication job") return } - sourceCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage) + sourceCC, err := dialNode(job.SourceNode.Address, job.SourceNode.Token) if err != nil { l.WithError(err).Error("unable to obtain client connection for primary node in replication job") return diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 444579923..efa4cf494 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -9,13 +9,14 @@ import ( "testing" "time" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/git/objectpool" gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -115,7 +116,11 @@ func TestProcessReplicationJob(t *testing.T) { secondaries, err := ds.GetSecondaries(config.VirtualStorages[0].Name) require.NoError(t, err) - _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, secondaries, datastore.UpdateRepo) + var secondaryStorages []string + for _, secondary := range secondaries { + secondaryStorages = append(secondaryStorages, secondary.Storage) + } + _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo) require.NoError(t, err) jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1) @@ -129,14 +134,14 @@ func TestProcessReplicationJob(t *testing.T) { var replicator defaultReplicator replicator.log = gitaly_log.Default() - clientCC := conn.NewClientConnections() - require.NoError(t, clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token)) - require.NoError(t, clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token)) + nodeMgr, err := nodes.NewManager(gitaly_log.Default(), config.VirtualStorages) + require.NoError(t, err) + nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) var mockReplicationGauge promtest.MockGauge var mockReplicationHistogram promtest.MockHistogram - replMgr := NewReplMgr("", gitaly_log.Default(), ds, clientCC, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge)) + replMgr := NewReplMgr("", gitaly_log.Default(), ds, nodeMgr, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge)) replMgr.replicator = replicator replMgr.processReplJob(ctx, jobs[0]) @@ -212,6 +217,7 @@ func TestBackoff(t *testing.T) { func runFullGitalyServer(t *testing.T) (*grpc.Server, string) { server := serverPkg.NewInsecure(RubyServer) + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() listener, err := net.Listen("unix", serverSocketPath) diff --git a/internal/praefect/server.go b/internal/praefect/server.go index ceb1b8a76..863c30a3b 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -17,8 +17,8 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/server" "gitlab.com/gitlab-org/gitaly/internal/server/auth" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -31,11 +31,11 @@ import ( // Server is a praefect server type Server struct { - clientConnections *conn.ClientConnections - repl ReplMgr - s *grpc.Server - conf config.Config - l *logrus.Entry + nodeManager nodes.Manager + repl ReplMgr + s *grpc.Server + conf config.Config + l *logrus.Entry } func (srv *Server) warnDupeAddrs(c config.Config) { @@ -60,7 +60,7 @@ func (srv *Server) warnDupeAddrs(c config.Config) { // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options -func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server { +func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, nodeManager nodes.Manager, conf config.Config) *Server { ctxTagOpts := []grpc_ctxtags.Option{ grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), } @@ -98,11 +98,11 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo }...) s := &Server{ - s: grpc.NewServer(grpcOpts...), - repl: repl, - clientConnections: clientConnections, - conf: conf, - l: l, + s: grpc.NewServer(grpcOpts...), + repl: repl, + nodeManager: nodeManager, + conf: conf, + l: l, } s.warnDupeAddrs(conf) @@ -125,7 +125,7 @@ func (srv *Server) Serve(l net.Listener, secure bool) error { // RegisterServices will register any services praefect needs to handle rpcs on its own func (srv *Server) RegisterServices() { // ServerServiceServer is necessary for the ServerInfo RPC - gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections)) + gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.nodeManager)) healthpb.RegisterHealthServer(srv.s, health.NewServer()) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index e201bcbcb..b35948f90 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -18,7 +18,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -130,9 +129,9 @@ func TestGitalyServerInfoBadNode(t *testing.T) { }, } - clientCC := conn.NewClientConnections() - clientCC.RegisterNode(conf.VirtualStorages[0].Nodes[0].Storage, conf.VirtualStorages[0].Nodes[0].Address, conf.VirtualStorages[0].Nodes[0].Token) - _, srv := setupServer(t, conf, clientCC, log.Default(), protoregistry.GitalyProtoFileDescriptors) + nodeMgr := NewMockNodeManager(conf.VirtualStorages) + + _, srv := setupServer(t, conf, nodeMgr, log.Default(), protoregistry.GitalyProtoFileDescriptors) listener, port := listenAvailPort(t) go func() { @@ -154,7 +153,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) { func TestGitalyDiskStatistics(t *testing.T) { conf := config.Config{ VirtualStorages: []*config.VirtualStorage{ - &config.VirtualStorage{ + { Nodes: []*models.Node{ { Storage: "praefect-internal-1", @@ -168,6 +167,7 @@ func TestGitalyDiskStatistics(t *testing.T) { }, }, } + cc, _, cleanup := runPraefectServerWithGitaly(t, conf) defer cleanup() @@ -178,7 +178,7 @@ func TestGitalyDiskStatistics(t *testing.T) { metadata, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{}) require.NoError(t, err) - require.Len(t, metadata.GetStorageStatuses(), len(conf.Nodes)) + require.Len(t, metadata.GetStorageStatuses(), len(conf.VirtualStorages[0].Nodes)) for _, storageStatus := range metadata.GetStorageStatuses() { require.NotNil(t, storageStatus, "none of the storage statuses should be nil") diff --git a/internal/praefect/service/server/disk_stats.go b/internal/praefect/service/server/disk_stats.go index 745817826..700bdfb23 100644 --- a/internal/praefect/service/server/disk_stats.go +++ b/internal/praefect/service/server/disk_stats.go @@ -4,41 +4,38 @@ import ( "context" "fmt" - "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "golang.org/x/sync/errgroup" ) // DiskStatistics sends DiskStatisticsRequest to all of a praefect server's internal gitaly nodes and aggregates the // results into a response func (s *Server) DiskStatistics(ctx context.Context, _ *gitalypb.DiskStatisticsRequest) (*gitalypb.DiskStatisticsResponse, error) { - storageStatuses := make([][]*gitalypb.DiskStatisticsResponse_StorageStatus, len(s.conf.Nodes)) + var storageStatuses [][]*gitalypb.DiskStatisticsResponse_StorageStatus - g, ctx := errgroup.WithContext(ctx) + for _, virtualStorage := range s.conf.VirtualStorages { + shard, err := s.nodeMgr.GetShard(virtualStorage.Name) + if err != nil { + return nil, err + } - for i, node := range s.conf.Nodes { - i := i // necessary since it will be used in a goroutine below - node := node - cc, err := s.clientCC.GetConnection(node.Storage) + primary, err := shard.GetPrimary() + if err != nil { + return nil, err + } + secondaries, err := shard.GetSecondaries() if err != nil { - return nil, helper.ErrInternalf("error getting client connection for %s: %v", node.Storage, err) + return nil, err } - g.Go(func() error { - client := gitalypb.NewServerServiceClient(cc) + for _, node := range append(secondaries, primary) { + client := gitalypb.NewServerServiceClient(node.GetConnection()) resp, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{}) if err != nil { - return fmt.Errorf("error when requesting disk statistics from internal storage %v", node.Storage) + return nil, fmt.Errorf("error when requesting disk statistics from internal storage %v", node.GetStorage()) } - storageStatuses[i] = resp.GetStorageStatuses() - - return nil - }) - } - - if err := g.Wait(); err != nil { - return nil, helper.ErrInternal(err) + storageStatuses = append(storageStatuses, resp.GetStorageStatuses()) + } } var response gitalypb.DiskStatisticsResponse diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go index f960a9ea5..64360f171 100644 --- a/internal/praefect/service/server/info.go +++ b/internal/praefect/service/server/info.go @@ -6,7 +6,7 @@ import ( grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "golang.org/x/sync/errgroup" ) @@ -15,20 +15,26 @@ import ( // a response func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) { var once sync.Once - nodesChecked := make(map[string]struct{}) - var nodes []*models.Node + var nodes []nodes.Node for _, virtualStorage := range s.conf.VirtualStorages { - for _, node := range virtualStorage.Nodes { - if _, ok := nodesChecked[node.Storage]; ok { - continue - } + shard, err := s.nodeMgr.GetShard(virtualStorage.Name) + if err != nil { + return nil, err + } - nodesChecked[node.Storage] = struct{}{} - nodes = append(nodes, node) + primary, err := shard.GetPrimary() + if err != nil { + return nil, err } - } + secondaries, err := shard.GetSecondaries() + if err != nil { + return nil, err + } + + nodes = append(append(nodes, primary), secondaries...) + } var gitVersion, serverVersion string g, ctx := errgroup.WithContext(ctx) @@ -38,26 +44,20 @@ func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) for i, node := range nodes { i := i node := node - cc, err := s.clientCC.GetConnection(node.Storage) - if err != nil { - grpc_logrus.Extract(ctx).WithField("storage", node.Storage).WithError(err).Error("error getting client connection") - continue - } + g.Go(func() error { - client := gitalypb.NewServerServiceClient(cc) + client := gitalypb.NewServerServiceClient(node.GetConnection()) resp, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{}) if err != nil { - grpc_logrus.Extract(ctx).WithField("storage", node.Storage).WithError(err).Error("error getting sever info") + grpc_logrus.Extract(ctx).WithField("storage", node.GetStorage()).WithError(err).Error("error getting sever info") return nil } storageStatuses[i] = resp.GetStorageStatuses() - if node.DefaultPrimary { - once.Do(func() { - gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion() - }) - } + once.Do(func() { + gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion() + }) return nil }) diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go index 30e04f529..1788307ed 100644 --- a/internal/praefect/service/server/server.go +++ b/internal/praefect/service/server/server.go @@ -2,21 +2,21 @@ package server import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) // Server is a ServerService server type Server struct { - clientCC *conn.ClientConnections - conf config.Config + nodeMgr nodes.Manager + conf config.Config } // NewServer creates a new instance of a grpc ServerServiceServer -func NewServer(conf config.Config, clientConnections *conn.ClientConnections) gitalypb.ServerServiceServer { +func NewServer(conf config.Config, nodeMgr nodes.Manager) gitalypb.ServerServiceServer { s := &Server{ - clientCC: clientConnections, - conf: conf, + nodeMgr: nodeMgr, + conf: conf, } return s |