| author | Paul Okstad <pokstad@gitlab.com> | 2020-03-31 21:39:42 +0300 |
| --- | --- | --- |
| committer | Paul Okstad <pokstad@gitlab.com> | 2020-03-31 21:39:42 +0300 |
| commit | 56d460969054483b3e5725d393619569f4299f32 (patch) | |
| tree | 24024efde20e702474a74a4c2bd0187988783635 | |
| parent | 6f4132638538078909c43ca7bb70405ec3267d45 (diff) | |
| parent | 8c67a12861ca7d324eb00d00bc4af8988381736d (diff) | |
Merge branch 'sh-refactor-praefect-node-manager' into 'master'
Refactor Praefect node manager
See merge request gitlab-org/gitaly!1940
| -rw-r--r-- | changelogs/unreleased/sh-refactor-praefect-node-manager.yml | 5 |
| --- | --- | --- |
| -rw-r--r-- | internal/praefect/nodes/local_elector.go | 201 |
| -rw-r--r-- | internal/praefect/nodes/local_elector_test.go | 53 |
| -rw-r--r-- | internal/praefect/nodes/manager.go | 207 |
| -rw-r--r-- | internal/praefect/nodes/manager_test.go | 17 |
5 files changed, 323 insertions, 160 deletions
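The substance of the change is in manager.go below: instead of hard-coding shard failover, the manager now delegates to a small leaderElectionStrategy interface, with the new localElector as its first implementation. As a reading aid, here is a hedged sketch of what any alternative strategy would have to provide in order to plug in; the type name sqlElector and the stub bodies are hypothetical, and only the method set comes from the interfaces in this diff:

```go
package nodes

import (
	"context"
	"time"
)

// sqlElector is a hypothetical alternative strategy (e.g. one backed by
// a shared database). Nothing here exists in the diff except the
// leaderElectionStrategy and Shard interfaces it satisfies.
type sqlElector struct {
	shardName string
}

// start, addNode and checkNodes are the unexported methods the manager
// drives; they are stubbed out here to show only the required signatures.
func (s *sqlElector) start(bootstrapInterval, monitorInterval time.Duration) error { return nil }

func (s *sqlElector) addNode(node Node, primary bool) {}

func (s *sqlElector) checkNodes(ctx context.Context) {}

// The embedded Shard interface additionally requires GetPrimary and
// GetSecondaries, which callers reach through Mgr.GetShard.
func (s *sqlElector) GetPrimary() (Node, error) { return nil, ErrPrimaryNotHealthy }

func (s *sqlElector) GetSecondaries() ([]Node, error) { return nil, nil }

// Compile-time assertion that sqlElector plugs into the manager.
var _ leaderElectionStrategy = (*sqlElector)(nil)
```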
diff --git a/changelogs/unreleased/sh-refactor-praefect-node-manager.yml b/changelogs/unreleased/sh-refactor-praefect-node-manager.yml
new file mode 100644
index 000000000..c8d71a8b6
--- /dev/null
+++ b/changelogs/unreleased/sh-refactor-praefect-node-manager.yml
@@ -0,0 +1,5 @@
+---
+title: Refactor Praefect node manager
+merge_request: 1940
+author:
+type: other
diff --git a/internal/praefect/nodes/local_elector.go b/internal/praefect/nodes/local_elector.go
new file mode 100644
index 000000000..12308ab26
--- /dev/null
+++ b/internal/praefect/nodes/local_elector.go
@@ -0,0 +1,201 @@
+package nodes
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+)
+
+type nodeCandidate struct {
+	node     Node
+	up       bool
+	primary  bool
+	statuses []bool
+}
+
+// localElector relies on an in-memory datastore to track the primary
+// and secondaries. A single election strategy pertains to a single
+// shard. It does NOT support multiple Praefect nodes or have any
+// persistence. This is used mostly for testing and development.
+type localElector struct {
+	m               sync.RWMutex
+	failoverEnabled bool
+	shardName       string
+	nodes           []*nodeCandidate
+	primaryNode     *nodeCandidate
+}
+
+// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
+// for deeming a node "healthy"
+const healthcheckThreshold = 3
+
+func (n *nodeCandidate) isHealthy() bool {
+	if len(n.statuses) < healthcheckThreshold {
+		return false
+	}
+
+	for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
+		if !status {
+			return false
+		}
+	}
+
+	return true
+}
+
+func newLocalElector(name string, failoverEnabled bool) *localElector {
+	return &localElector{
+		shardName:       name,
+		failoverEnabled: failoverEnabled,
+	}
+}
+
+// addNode registers a primary or secondary in the internal
+// datastore.
+func (s *localElector) addNode(node Node, primary bool) {
+	localNode := nodeCandidate{
+		node:     node,
+		primary:  primary,
+		statuses: make([]bool, 0),
+		up:       false,
+	}
+
+	// If failover hasn't been activated, we assume all nodes are up
+	// since health checks aren't run.
+	if !s.failoverEnabled {
+		localNode.up = true
+	}
+
+	s.m.Lock()
+	defer s.m.Unlock()
+
+	if primary {
+		s.primaryNode = &localNode
+	}
+
+	s.nodes = append(s.nodes, &localNode)
+}
+
+// Start launches a Goroutine to check the state of the nodes and
+// continuously monitor their health via gRPC health checks.
+func (s *localElector) start(bootstrapInterval, monitorInterval time.Duration) error {
+	s.bootstrap(bootstrapInterval)
+	go s.monitor(monitorInterval)
+
+	return nil
+}
+
+func (s *localElector) bootstrap(d time.Duration) {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+
+	for i := 0; i < healthcheckThreshold; i++ {
+		<-timer.C
+
+		ctx := context.TODO()
+		s.checkNodes(ctx)
+		timer.Reset(d)
+	}
+}
+
+func (s *localElector) monitor(d time.Duration) {
+	ticker := time.NewTicker(d)
+	defer ticker.Stop()
+
+	for {
+		ctx := context.Background()
+		s.checkNodes(ctx)
+	}
+}
+
+// checkNodes issues a gRPC health check for each node managed by the
+// shard.
+func (s *localElector) checkNodes(ctx context.Context) {
+	defer s.updateMetrics()
+
+	for _, n := range s.nodes {
+		status, _ := n.node.check(ctx)
+		n.statuses = append(n.statuses, status)
+
+		if len(n.statuses) > healthcheckThreshold {
+			n.statuses = n.statuses[1:]
+		}
+
+		up := n.isHealthy()
+		n.up = up
+	}
+
+	if s.primaryNode != nil && s.primaryNode.isHealthy() {
+		return
+	}
+
+	var newPrimary *nodeCandidate
+
+	for _, node := range s.nodes {
+		if !node.primary && node.isHealthy() {
+			newPrimary = node
+			break
+		}
+	}
+
+	if newPrimary == nil {
+		return
+	}
+
+	s.m.Lock()
+	defer s.m.Unlock()
+
+	s.primaryNode.primary = false
+	s.primaryNode = newPrimary
+	newPrimary.primary = true
+}
+
+// GetPrimary gets the primary of a shard. If no primary exists, it will
+// be nil. If a primary has been elected but is down, err will be
+// ErrPrimaryNotHealthy.
+func (s *localElector) GetPrimary() (Node, error) {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.primaryNode == nil {
+		return nil, ErrPrimaryNotHealthy
+	}
+
+	if !s.primaryNode.up {
+		return s.primaryNode.node, ErrPrimaryNotHealthy
+	}
+
+	return s.primaryNode.node, nil
+}
+
+// GetSecondaries gets the secondaries of a shard
+func (s *localElector) GetSecondaries() ([]Node, error) {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	var secondaries []Node
+	for _, n := range s.nodes {
+		if !n.primary {
+			secondaries = append(secondaries, n.node)
+		}
+	}
+
+	return secondaries, nil
+}
+
+func (s *localElector) updateMetrics() {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	for _, node := range s.nodes {
+		var val float64
+
+		if node.primary {
+			val = 1
+		}
+
+		metrics.PrimaryGauge.WithLabelValues(s.shardName, node.node.GetStorage()).Set(val)
+	}
+}
diff --git a/internal/praefect/nodes/local_elector_test.go b/internal/praefect/nodes/local_elector_test.go
new file mode 100644
index 000000000..16f3a3875
--- /dev/null
+++ b/internal/praefect/nodes/local_elector_test.go
@@ -0,0 +1,53 @@
+package nodes
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
+	"google.golang.org/grpc"
+)
+
+func TestPrimaryAndSecondaries(t *testing.T) {
+	socket := testhelper.GetTemporaryGitalySocketFileName()
+	svr, _ := testhelper.NewServerWithHealth(t, socket)
+	defer svr.Stop()
+
+	cc, err := grpc.Dial(
+		"unix://"+socket,
+		grpc.WithInsecure(),
+	)
+
+	require.NoError(t, err)
+
+	storageName := "default"
+	mockHistogramVec := promtest.NewMockHistogramVec()
+
+	cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec)
+	strategy := newLocalElector(storageName, true)
+
+	strategy.addNode(cs, true)
+	strategy.bootstrap(time.Second)
+
+	primary, err := strategy.GetPrimary()
+
+	require.NoError(t, err)
+	require.Equal(t, primary, cs)
+
+	secondaries, err := strategy.GetSecondaries()
+
+	require.NoError(t, err)
+	require.Equal(t, 0, len(secondaries))
+
+	secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), nil)
+	strategy.addNode(secondary, false)
+
+	secondaries, err = strategy.GetSecondaries()
+
+	require.NoError(t, err)
+	require.Equal(t, 1, len(secondaries))
+	require.Equal(t, secondary, secondaries[0])
+}
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index ad0a8309b..ea2e57feb 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -3,7 +3,6 @@ package nodes
 import (
 	"context"
 	"errors"
-	"sync"
 	"time"

 	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -13,7 +12,6 @@ import (
 	"gitlab.com/gitlab-org/gitaly/client"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
-	"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
 	prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
 	correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
@@ -39,55 +37,43 @@ type Node interface {
 	GetAddress() string
 	GetToken() string
 	GetConnection() *grpc.ClientConn
-}
-
-type shard struct {
-	m           sync.RWMutex
-	primary     *nodeStatus
-	secondaries []*nodeStatus
-}
-
-// GetPrimary gets the primary of a shard
-func (s *shard) GetPrimary() (Node, error) {
-	s.m.RLock()
-	defer s.m.RUnlock()
-
-	return s.primary, nil
-}
-
-// GetSecondaries gets the secondaries of a shard
-func (s *shard) GetSecondaries() ([]Node, error) {
-	s.m.RLock()
-	defer s.m.RUnlock()
-
-	var secondaries []Node
-	for _, secondary := range s.secondaries {
-		secondaries = append(secondaries, secondary)
-	}
-
-	return secondaries, nil
+	check(context.Context) (bool, error)
 }

 // Mgr is a concrete type that adheres to the Manager interface
 type Mgr struct {
-	// shards is a map of shards keyed on virtual storage name
-	shards map[string]*shard
-	// staticShards never changes based on node health. It is a static set of shards that comes from the config's
-	// VirtualStorages
 	failoverEnabled bool
 	log             *logrus.Entry
+	// strategies is a map of strategies keyed on virtual storage name
+	strategies map[string]leaderElectionStrategy
+}
+
+// leaderElectionStrategy defines the interface by which primary and
+// secondaries are managed.
+type leaderElectionStrategy interface {
+	start(bootstrapInterval, monitorInterval time.Duration) error
+	addNode(node Node, primary bool)
+	checkNodes(context.Context)
+
+	Shard
 }

 // ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence
 // should not be used for a new request
 var ErrPrimaryNotHealthy = errors.New("primary is not healthy")

-// NewNodeManager creates a new NodeMgr based on virtual storage configs
+// NewManager creates a new NodeMgr based on virtual storage configs
 func NewManager(log *logrus.Entry, c config.Config, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
-	shards := make(map[string]*shard)
+	mgr := Mgr{
+		log:             log,
+		failoverEnabled: c.FailoverEnabled}
+
+	mgr.strategies = make(map[string]leaderElectionStrategy)
+
 	for _, virtualStorage := range c.VirtualStorages {
-		var secondaries []*nodeStatus
-		var primary *nodeStatus
+		strategy := newLocalElector(virtualStorage.Name, c.FailoverEnabled)
+		mgr.strategies[virtualStorage.Name] = strategy
+
 		for _, node := range virtualStorage.Nodes {
 			conn, err := client.Dial(node.Address,
 				append(
@@ -111,60 +97,30 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram prommetrics
 			}

 			ns := newConnectionStatus(*node, conn, log, latencyHistogram)
-			if node.DefaultPrimary {
-				primary = ns
-				continue
-			}
-
-			secondaries = append(secondaries, ns)
-		}
-
-		shards[virtualStorage.Name] = &shard{
-			primary:     primary,
-			secondaries: secondaries,
+			strategy.addNode(ns, node.DefaultPrimary)
 		}
 	}

-	return &Mgr{
-		shards:          shards,
-		log:             log,
-		failoverEnabled: c.FailoverEnabled,
-	}, nil
-}
-
-// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
-// for deeming a node "healthy"
-const healthcheckThreshold = 3
-
-func (n *Mgr) bootstrap(d time.Duration) error {
-	timer := time.NewTimer(d)
-	defer timer.Stop()
-
-	for i := 0; i < healthcheckThreshold; i++ {
-		<-timer.C
-		n.checkShards()
-		timer.Reset(d)
-	}
-
-	return nil
-}
-
-func (n *Mgr) monitor(d time.Duration) {
-	ticker := time.NewTicker(d)
-	defer ticker.Stop()
-
-	for {
-		<-ticker.C
-		n.checkShards()
-	}
+	return &mgr, nil
 }

 // Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
 // the monitoring process. Start must be called before NodeMgr can be used.
 func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
 	if n.failoverEnabled {
-		n.bootstrap(bootstrapInterval)
-		go n.monitor(monitorInterval)
+		for _, strategy := range n.strategies {
+			strategy.start(bootstrapInterval, monitorInterval)
+		}
+	}
+}
+
+// checkShards performs health checks on all the available shards. The
+// election strategy is responsible for determining the criteria for
+// when to elect a new primary and when a node is down.
+func (n *Mgr) checkShards() {
+	for _, strategy := range n.strategies {
+		ctx := context.Background()
+		strategy.checkNodes(ctx)
 	}
 }

@@ -173,13 +129,15 @@ var ErrVirtualStorageNotExist = errors.New("virtual storage does not exist")

 // GetShard retrieves a shard for a virtual storage name
 func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
-	shard, ok := n.shards[virtualStorageName]
+	shard, ok := n.strategies[virtualStorageName]
 	if !ok {
 		return nil, ErrVirtualStorageNotExist
 	}

 	if n.failoverEnabled {
-		if !shard.primary.isHealthy() {
+		_, err := shard.GetPrimary()
+
+		if err != nil {
 			return nil, ErrPrimaryNotHealthy
 		}
 	}
@@ -187,54 +145,10 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
 	return shard, nil
 }

-func checkShard(virtualStorage string, s *shard) {
-	defer func() {
-		metrics.PrimaryGauge.WithLabelValues(virtualStorage, s.primary.GetStorage()).Set(1)
-		for _, secondary := range s.secondaries {
-			metrics.PrimaryGauge.WithLabelValues(virtualStorage, secondary.GetStorage()).Set(0)
-		}
-	}()
-
-	s.primary.check()
-	for _, secondary := range s.secondaries {
-		secondary.check()
-	}
-
-	if s.primary.isHealthy() {
-		return
-	}
-
-	newPrimaryIndex := -1
-	for i, secondary := range s.secondaries {
-		if secondary.isHealthy() {
-			newPrimaryIndex = i
-			break
-		}
-	}
-
-	if newPrimaryIndex < 0 {
-		// no healthy secondaries exist
-		return
-	}
-	s.m.Lock()
-	newPrimary := s.secondaries[newPrimaryIndex]
-	s.secondaries = append(s.secondaries[:newPrimaryIndex], s.secondaries[newPrimaryIndex+1:]...)
-	s.secondaries = append(s.secondaries, s.primary)
-	s.primary = newPrimary
-	s.m.Unlock()
-}
-
-func (n *Mgr) checkShards() {
-	for virtualStorage, shard := range n.shards {
-		checkShard(virtualStorage, shard)
-	}
-}
-
 func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus {
 	return &nodeStatus{
 		Node:        node,
 		ClientConn:  cc,
-		statuses:    make([]healthpb.HealthCheckResponse_ServingStatus, 0),
 		log:         l,
 		latencyHist: latencyHist,
 	}
@@ -243,7 +157,6 @@ func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry,
 type nodeStatus struct {
 	models.Node
 	*grpc.ClientConn
-	statuses    []healthpb.HealthCheckResponse_ServingStatus
 	log         *logrus.Entry
 	latencyHist prommetrics.HistogramVec
 }
@@ -268,38 +181,24 @@ func (n *nodeStatus) GetConnection() *grpc.ClientConn {
 	return n.ClientConn
 }

-func (n *nodeStatus) isHealthy() bool {
-	if len(n.statuses) < healthcheckThreshold {
-		return false
-	}
-
-	for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
-		if status != healthpb.HealthCheckResponse_SERVING {
-			return false
-		}
-	}
-
-	return true
-}
-
-func (n *nodeStatus) check() {
+func (n *nodeStatus) check(ctx context.Context) (bool, error) {
 	client := healthpb.NewHealthClient(n.ClientConn)
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
 	defer cancel()
+	status := false

 	start := time.Now()
 	resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
 	n.latencyHist.WithLabelValues(n.Storage).Observe(time.Since(start).Seconds())

-	if err != nil {
-		n.log.WithError(err).WithField("storage", n.Storage).WithField("address", n.Address).Warn("error when pinging healthcheck")
-		resp = &healthpb.HealthCheckResponse{
-			Status: healthpb.HealthCheckResponse_UNKNOWN,
-		}
+	if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING {
+		status = true
+	} else {
+		n.log.WithError(err).WithFields(logrus.Fields{
+			"storage": n.Storage,
+			"address": n.Address,
+		}).Warn("error when pinging healthcheck")
 	}

-	n.statuses = append(n.statuses, resp.Status)
-	if len(n.statuses) > healthcheckThreshold {
-		n.statuses = n.statuses[1:]
-	}
+	return status, err
 }
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 8f5e1443b..185c640d6 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -1,6 +1,7 @@
 package nodes

 import (
+	"context"
 	"testing"
 	"time"

@@ -30,21 +31,25 @@ func TestNodeStatus(t *testing.T) {
 	storageName := "default"

 	cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec)

-	require.False(t, cs.isHealthy())
-
 	var expectedLabels [][]string
 	for i := 0; i < healthcheckThreshold; i++ {
-		cs.check()
+		ctx := context.Background()
+		status, err := cs.check(ctx)
+
+		require.NoError(t, err)
+		require.True(t, status)
 		expectedLabels = append(expectedLabels, []string{storageName})
 	}

-	require.True(t, cs.isHealthy())
+
 	require.Equal(t, expectedLabels, mockHistogramVec.LabelsCalled())
 	require.Len(t, mockHistogramVec.Observer().Observed(), healthcheckThreshold)

 	healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)

-	cs.check()
-	require.False(t, cs.isHealthy())
+	ctx := context.Background()
+	status, err := cs.check(ctx)
+	require.NoError(t, err)
+	require.False(t, status)
 }

 func TestNodeManager(t *testing.T) {
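To make the promotion rule concrete, here is a self-contained, runnable miniature of the election logic that local_elector.go implements: a sliding window of the most recent health-check results decides whether a node is up, and the first healthy non-primary is promoted once the primary falls out of the window. This is a sketch of the technique only; all names are illustrative and none of this is gitaly code:

```go
package main

import (
	"fmt"
	"sync"
)

// threshold mirrors healthcheckThreshold: a node counts as healthy only
// after this many consecutive successful checks.
const threshold = 3

type candidate struct {
	name     string
	primary  bool
	statuses []bool // sliding window of recent health-check results
}

func (c *candidate) isHealthy() bool {
	if len(c.statuses) < threshold {
		return false
	}
	for _, ok := range c.statuses[len(c.statuses)-threshold:] {
		if !ok {
			return false
		}
	}
	return true
}

// elector is a toy stand-in for localElector: in-memory, single shard.
type elector struct {
	m       sync.Mutex
	nodes   []*candidate
	primary *candidate
}

// record appends one health-check result per node, trims each window to
// the threshold, and promotes the first healthy non-primary if the
// current primary is no longer healthy.
func (e *elector) record(results map[string]bool) {
	e.m.Lock()
	defer e.m.Unlock()

	for _, n := range e.nodes {
		n.statuses = append(n.statuses, results[n.name])
		if len(n.statuses) > threshold {
			n.statuses = n.statuses[1:]
		}
	}

	if e.primary != nil && e.primary.isHealthy() {
		return
	}
	for _, n := range e.nodes {
		if !n.primary && n.isHealthy() {
			if e.primary != nil {
				e.primary.primary = false
			}
			e.primary, n.primary = n, true
			return
		}
	}
}

func main() {
	p := &candidate{name: "gitaly-1", primary: true}
	s := &candidate{name: "gitaly-2"}
	e := &elector{nodes: []*candidate{p, s}, primary: p}

	// Three rounds in which the primary fails and the secondary passes:
	// on the third round the secondary crosses the threshold and takes
	// over, just as checkNodes does after a sustained primary outage.
	for i := 0; i < threshold; i++ {
		e.record(map[string]bool{"gitaly-1": false, "gitaly-2": true})
	}
	fmt.Println("primary is now:", e.primary.name) // primary is now: gitaly-2
}
```

The window is what prevents flapping: a single failed probe never demotes the primary immediately, and a node only counts as up after threshold consecutive successes, which is the same reason localElector.checkNodes keeps a bounded statuses slice per candidate.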