author     John Cai <jcai@gitlab.com>  2020-01-29 18:56:46 +0300
committer  John Cai <jcai@gitlab.com>  2020-01-29 18:56:46 +0300
commit     be6439f5fad2baaa59cf7c17645b022bc35458e4
tree       d72ebf851dae36b81a772db81f46fb7216f2d010
parent     2559193a878dc50ce2485dc03e321518936254a5
Add Node manager
Node manager is a new component that will simplify praefect routing. It
pings the nodes with healthcheck requests and, based on the resulting
service health, promotes a secondary to primary and demotes an
unhealthy primary to secondary.
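As a sketch of how a caller might wire the new component up (the logger, config value, intervals, and "default" storage name here are illustrative, not part of this change):

	nm, err := NewNodeManager(logrus.NewEntry(logrus.New()), conf.VirtualStorages)
	if err != nil {
		return err // dialing a configured node failed
	}

	// Start blocks until every node has been checked healthcheckThreshold
	// times, then keeps checking in a background goroutine.
	nm.Start(1*time.Second, 3*time.Second)

	shard, err := nm.GetShard("default")
	if err != nil {
		return err // unknown virtual storage, or ErrPrimaryNotHealthy
	}
	primary, err := shard.GetPrimary()
	if err != nil {
		return err
	}
	conn := primary.GetConnection() // gRPC connection to the current primary
	_ = conn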
-rw-r--r-- | changelogs/unreleased/jc-node-manager.yml |   5
-rw-r--r-- | internal/praefect/node_manager.go         | 287
-rw-r--r-- | internal/praefect/node_manager_test.go    | 134
3 files changed, 426 insertions, 0 deletions
diff --git a/changelogs/unreleased/jc-node-manager.yml b/changelogs/unreleased/jc-node-manager.yml
new file mode 100644
index 000000000..c4ae604dc
--- /dev/null
+++ b/changelogs/unreleased/jc-node-manager.yml
@@ -0,0 +1,5 @@
+---
+title: Add Node manager
+merge_request: 1779
+author:
+type: added
diff --git a/internal/praefect/node_manager.go b/internal/praefect/node_manager.go
new file mode 100644
index 000000000..1a3d00b7d
--- /dev/null
+++ b/internal/praefect/node_manager.go
@@ -0,0 +1,287 @@
+package praefect
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+	"gitlab.com/gitlab-org/gitaly/client"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+	"google.golang.org/grpc"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+// Shard is a primary with a set of secondaries
+type Shard interface {
+	GetPrimary() (Node, error)
+	GetSecondaries() ([]Node, error)
+}
+
+// NodeManager is responsible for returning shards for virtual storages
+type NodeManager interface {
+	GetShard(virtualStorageName string) (Shard, error)
+}
+
+// Node represents some metadata of a node as well as a connection
+type Node interface {
+	GetStorage() string
+	GetAddress() string
+	GetToken() string
+	GetConnection() *grpc.ClientConn
+}
+
+type shard struct {
+	m           sync.RWMutex
+	primary     *nodeStatus
+	secondaries []*nodeStatus
+}
+
+// GetPrimary gets the primary of a shard
+func (s *shard) GetPrimary() (Node, error) {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	return s.primary, nil
+}
+
+// GetSecondaries gets the secondaries of a shard
+func (s *shard) GetSecondaries() ([]Node, error) {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	var secondaries []Node
+	for _, secondary := range s.secondaries {
+		secondaries = append(secondaries, secondary)
+	}
+
+	return secondaries, nil
+}
+
+// NodeMgr is a concrete type that adheres to the NodeManager interface
+type NodeMgr struct {
+	shards map[string]*shard
+	log    *logrus.Entry
+}
+
+// ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence
+// should not be used for a new request
+var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
+
+// NewNodeManager creates a new NodeMgr based on virtual storage configs
+func NewNodeManager(log *logrus.Entry, virtualStorages []config.VirtualStorage) (*NodeMgr, error) {
+	shards := make(map[string]*shard)
+
+	for _, virtualStorage := range virtualStorages {
+		var secondaries []*nodeStatus
+		var primary *nodeStatus
+		for _, node := range virtualStorage.Nodes {
+			conn, err := client.Dial(node.Address,
+				[]grpc.DialOption{
+					grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+					grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)),
+				},
+			)
+			if err != nil {
+				return nil, err
+			}
+			ns := newConnectionStatus(*node, conn)
+
+			if node.DefaultPrimary {
+				primary = ns
+				continue
+			}
+
+			secondaries = append(secondaries, ns)
+		}
+		shards[virtualStorage.Name] = &shard{
+			primary:     primary,
+			secondaries: secondaries,
+		}
+	}
+
+	return &NodeMgr{
+		shards: shards,
+		log:    log,
+	}, nil
+}
+
+// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
+// for deeming a node "healthy"
+const healthcheckThreshold = 3
+
+func (n *NodeMgr) bootstrap(d time.Duration) error {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+
+	for i := 0; i < healthcheckThreshold; i++ {
+		<-timer.C
+		if err := n.checkShards(); err != nil {
+			return err
+		}
+		timer.Reset(d)
+	}
+
+	return nil
+}
+
+func (n *NodeMgr) monitor(d time.Duration) {
+	ticker := time.NewTicker(d)
+	defer ticker.Stop()
+
+	for {
+		<-ticker.C
+		if err := n.checkShards(); err != nil {
+			n.log.WithError(err).Error("error when checking shards")
+		}
+	}
+}
+
+// Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
+// the monitoring process. Start must be called before NodeMgr can be used.
+func (n *NodeMgr) Start(bootstrapInterval, monitorInterval time.Duration) {
+	n.bootstrap(bootstrapInterval)
+	go n.monitor(monitorInterval)
+}
+
+// GetShard retrieves a shard for a virtual storage name
+func (n *NodeMgr) GetShard(virtualStorageName string) (Shard, error) {
+	shard, ok := n.shards[virtualStorageName]
+	if !ok {
+		return nil, errors.New("virtual storage does not exist")
+	}
+
+	if !shard.primary.isHealthy() {
+		return nil, ErrPrimaryNotHealthy
+	}
+
+	return shard, nil
+}
+
+type errCollection []error
+
+func (e errCollection) Error() string {
+	sb := bytes.NewBufferString("")
+	for _, err := range e {
+		sb.WriteString(err.Error())
+		sb.WriteString("\n")
+	}
+
+	return sb.String()
+}
+
+func (n *NodeMgr) checkShards() error {
+	var errs errCollection
+	for _, shard := range n.shards {
+		if err := shard.primary.check(); err != nil {
+			errs = append(errs, err)
+		}
+		for _, secondary := range shard.secondaries {
+			if err := secondary.check(); err != nil {
+				errs = append(errs, err)
+			}
+		}
+
+		if shard.primary.isHealthy() {
+			continue
+		}
+
+		newPrimaryIndex := -1
+		for i, secondary := range shard.secondaries {
+			if secondary.isHealthy() {
+				newPrimaryIndex = i
+				break
+			}
+		}
+
+		if newPrimaryIndex < 0 {
+			// no healthy secondaries exist
+			continue
+		}
+		shard.m.Lock()
+		newPrimary := shard.secondaries[newPrimaryIndex]
+		shard.secondaries = append(shard.secondaries[:newPrimaryIndex], shard.secondaries[newPrimaryIndex+1:]...)
+		shard.secondaries = append(shard.secondaries, shard.primary)
+		shard.primary = newPrimary
+		shard.m.Unlock()
+	}
+
+	if len(errs) > 0 {
+		return errs
+	}
+
+	return nil
+}
+
+func newConnectionStatus(node models.Node, cc *grpc.ClientConn) *nodeStatus {
+	return &nodeStatus{
+		Node:       node,
+		ClientConn: cc,
+		statuses:   make([]healthpb.HealthCheckResponse_ServingStatus, 0),
+	}
+}
+
+type nodeStatus struct {
+	models.Node
+	*grpc.ClientConn
+	statuses []healthpb.HealthCheckResponse_ServingStatus
+}
+
+// GetStorage gets the storage name of a node
+func (n *nodeStatus) GetStorage() string {
+	return n.Storage
+}
+
+// GetAddress gets the address of a node
+func (n *nodeStatus) GetAddress() string {
+	return n.Address
+}
+
+// GetToken gets the token of a node
+func (n *nodeStatus) GetToken() string {
+	return n.Token
+}
+
+// GetConnection gets the client connection of a node
+func (n *nodeStatus) GetConnection() *grpc.ClientConn {
+	return n.ClientConn
+}
+
+func (n *nodeStatus) isHealthy() bool {
+	if len(n.statuses) < healthcheckThreshold {
+		return false
+	}
+
+	for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
+		if status != healthpb.HealthCheckResponse_SERVING {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (n *nodeStatus) check() error {
+	client := healthpb.NewHealthClient(n.ClientConn)
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+
+	resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: "TestService"})
+	if err != nil {
+		resp = &healthpb.HealthCheckResponse{
+			Status: healthpb.HealthCheckResponse_UNKNOWN,
+		}
+	}
+
+	n.statuses = append(n.statuses, resp.Status)
+	if len(n.statuses) > healthcheckThreshold {
+		n.statuses = n.statuses[1:]
+	}
+
+	return err
+}
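The failover in checkShards above is a slice rotation: the first healthy secondary moves into the primary slot and the demoted primary is appended to the secondaries. A self-contained sketch of just that rotation, with nodes reduced to plain strings (all names illustrative):

	package main

	import "fmt"

	// failover mirrors the slice surgery in checkShards: promote the first
	// healthy secondary, demote the old primary to the back of the list.
	func failover(primary string, secondaries []string, healthy func(string) bool) (string, []string) {
		for i, s := range secondaries {
			if healthy(s) {
				promoted := s
				secondaries = append(secondaries[:i], secondaries[i+1:]...)
				secondaries = append(secondaries, primary)
				return promoted, secondaries
			}
		}
		// no healthy secondary exists; keep the unhealthy primary
		return primary, secondaries
	}

	func main() {
		isHealthy := func(node string) bool { return node != "gitaly-0" }
		primary, secondaries := failover("gitaly-0", []string{"gitaly-1", "gitaly-2"}, isHealthy)
		fmt.Println(primary, secondaries) // gitaly-1 [gitaly-2 gitaly-0]
	}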
diff --git a/internal/praefect/node_manager_test.go b/internal/praefect/node_manager_test.go
new file mode 100644
index 000000000..76553327c
--- /dev/null
+++ b/internal/praefect/node_manager_test.go
@@ -0,0 +1,134 @@
+package praefect
+
+import (
+	"net"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/internal/log"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/health"
+	"google.golang.org/grpc/health/grpc_health_v1"
+)
+
+func TestNodeStatus(t *testing.T) {
+	cc, healthSvr, cleanup := newHealthServer(t, testhelper.GetTemporaryGitalySocketFileName())
+	defer cleanup()
+
+	cs := newConnectionStatus(models.Node{}, cc)
+
+	require.False(t, cs.isHealthy())
+
+	for i := 0; i < healthcheckThreshold; i++ {
+		require.NoError(t, cs.check())
+	}
+	require.True(t, cs.isHealthy())
+
+	healthSvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+
+	require.NoError(t, cs.check())
+	require.False(t, cs.isHealthy())
+}
+
+func TestNodeManager(t *testing.T) {
+	internalSocket0 := testhelper.GetTemporaryGitalySocketFileName()
+	internalSocket1 := testhelper.GetTemporaryGitalySocketFileName()
+
+	virtualStorages := []config.VirtualStorage{
+		{
+			Name: "virtual-storage-0",
+			Nodes: []*models.Node{
+				{
+					Storage:        "praefect-internal-0",
+					Address:        "unix://" + internalSocket0,
+					DefaultPrimary: true,
+				},
+				{
+					Storage: "praefect-internal-1",
+					Address: "unix://" + internalSocket1,
+				},
+			},
+		},
+	}
+
+	_, srv0, cancel0 := newHealthServer(t, internalSocket0)
+	defer cancel0()
+
+	_, _, cancel1 := newHealthServer(t, internalSocket1)
+	defer cancel1()
+
+	nm, err := NewNodeManager(log.Default(), virtualStorages)
+	require.NoError(t, err)
+
+	_, err = nm.GetShard("virtual-storage-0")
+	require.Error(t, ErrPrimaryNotHealthy, err)
+
+	nm.Start(1*time.Millisecond, 5*time.Second)
+
+	shard, err := nm.GetShard("virtual-storage-0")
+	require.NoError(t, err)
+	primary, err := shard.GetPrimary()
+	require.NoError(t, err)
+	secondaries, err := shard.GetSecondaries()
+	require.NoError(t, err)
+
+	require.Equal(t, virtualStorages[0].Nodes[0].Storage, primary.GetStorage())
+	require.Equal(t, virtualStorages[0].Nodes[0].Address, primary.GetAddress())
+	require.Len(t, secondaries, 1)
+	require.Equal(t, virtualStorages[0].Nodes[1].Storage, secondaries[0].GetStorage())
+	require.Equal(t, virtualStorages[0].Nodes[1].Address, secondaries[0].GetAddress())
+
+	srv0.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+	nm.checkShards()
+
+	// since the primary is unhealthy, we expect checkShards to demote primary to secondary, and promote the healthy
+	// secondary to primary
+
+	shard, err = nm.GetShard("virtual-storage-0")
+	require.NoError(t, err)
+	primary, err = shard.GetPrimary()
+	require.NoError(t, err)
+	secondaries, err = shard.GetSecondaries()
+	require.NoError(t, err)
+
+	require.Equal(t, virtualStorages[0].Nodes[1].Storage, primary.GetStorage())
+	require.Equal(t, virtualStorages[0].Nodes[1].Address, primary.GetAddress())
+	require.Len(t, secondaries, 1)
+	require.Equal(t, virtualStorages[0].Nodes[0].Storage, secondaries[0].GetStorage())
+	require.Equal(t, virtualStorages[0].Nodes[0].Address, secondaries[0].GetAddress())
+
+	cancel1()
+	nm.checkShards()
+
+	_, err = nm.GetShard("virtual-storage-0")
+	require.Error(t, err, "should return error since no nodes are healthy")
+}
+
+func newHealthServer(t testing.TB, socketName string) (*grpc.ClientConn, *health.Server, func()) {
+	srv := testhelper.NewTestGrpcServer(t, nil, nil)
+	healthSrvr := health.NewServer()
+	grpc_health_v1.RegisterHealthServer(srv, healthSrvr)
+	healthSrvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_SERVING)
+
+	lis, err := net.Listen("unix", socketName)
+	require.NoError(t, err)
+
+	go srv.Serve(lis)
+
+	cleanup := func() {
+		srv.Stop()
+	}
+
+	cc, err := grpc.Dial(
+		"unix://"+socketName,
+		grpc.WithInsecure(),
+	)
+
+	require.NoError(t, err)
+
+	return cc, healthSrvr, cleanup
+}
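One behavioral detail worth noting: check records UNKNOWN whenever the healthcheck RPC fails, and isHealthy requires the last healthcheckThreshold (three) recorded statuses to all be SERVING, so a single failed check takes a node out of rotation until three consecutive good responses arrive. In the style of TestNodeStatus above (same package; the literal statuses are illustrative):

	ns := newConnectionStatus(models.Node{}, nil) // conn is not used by isHealthy
	ns.statuses = []healthpb.HealthCheckResponse_ServingStatus{
		healthpb.HealthCheckResponse_SERVING,
		healthpb.HealthCheckResponse_SERVING,
		healthpb.HealthCheckResponse_SERVING,
	}
	fmt.Println(ns.isHealthy()) // true: three consecutive SERVING

	ns.statuses = append(ns.statuses[1:], healthpb.HealthCheckResponse_UNKNOWN)
	fmt.Println(ns.isHealthy()) // false until three more SERVING checks land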