
gitlab.com/gitlab-org/gitaly.git
Diffstat (limited to 'internal/praefect/nodes/manager.go')
-rw-r--r--  internal/praefect/nodes/manager.go | 207
1 file changed, 53 insertions(+), 154 deletions(-)
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index ad0a8309b..ea2e57feb 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -3,7 +3,6 @@ package nodes
import (
"context"
"errors"
- "sync"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -13,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
@@ -39,55 +37,43 @@ type Node interface {
GetAddress() string
GetToken() string
GetConnection() *grpc.ClientConn
-}
-
-type shard struct {
- m sync.RWMutex
- primary *nodeStatus
- secondaries []*nodeStatus
-}
-
-// GetPrimary gets the primary of a shard
-func (s *shard) GetPrimary() (Node, error) {
- s.m.RLock()
- defer s.m.RUnlock()
-
- return s.primary, nil
-}
-
-// GetSecondaries gets the secondaries of a shard
-func (s *shard) GetSecondaries() ([]Node, error) {
- s.m.RLock()
- defer s.m.RUnlock()
-
- var secondaries []Node
- for _, secondary := range s.secondaries {
- secondaries = append(secondaries, secondary)
- }
-
- return secondaries, nil
+ check(context.Context) (bool, error)
}
// Mgr is a concrete type that adheres to the Manager interface
type Mgr struct {
- // shards is a map of shards keyed on virtual storage name
- shards map[string]*shard
- // staticShards never changes based on node health. It is a static set of shards that comes from the config's
- // VirtualStorages
failoverEnabled bool
log *logrus.Entry
+ // strategies is a map of strategies keyed on virtual storage name
+ strategies map[string]leaderElectionStrategy
+}
+
+// leaderElectionStrategy defines the interface by which primary and
+// secondaries are managed.
+type leaderElectionStrategy interface {
+ start(bootstrapInterval, monitorInterval time.Duration) error
+ addNode(node Node, primary bool)
+ checkNodes(context.Context)
+
+ Shard
}
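
For illustration, a minimal in-memory strategy satisfying this interface could look like the sketch below. This is assumed code, not the newLocalElector implementation the diff wires in later: it omits the consecutive-success threshold the real elector is responsible for, and it assumes the embedded Shard interface exposes GetPrimary and GetSecondaries, as the removed shard type did.

package nodes

import (
	"context"
	"sync"
	"time"
)

// memoryElector is a hypothetical strategy: it keeps all state in memory
// and promotes the first healthy secondary when the primary fails a check.
type memoryElector struct {
	m           sync.RWMutex
	primary     Node
	secondaries []Node
}

func (e *memoryElector) addNode(node Node, primary bool) {
	e.m.Lock()
	defer e.m.Unlock()
	if primary {
		e.primary = node
		return
	}
	e.secondaries = append(e.secondaries, node)
}

// start ignores bootstrapInterval in this sketch and only kicks off
// background monitoring.
func (e *memoryElector) start(bootstrapInterval, monitorInterval time.Duration) error {
	go func() {
		for range time.Tick(monitorInterval) {
			e.checkNodes(context.Background())
		}
	}()
	return nil
}

func (e *memoryElector) checkNodes(ctx context.Context) {
	e.m.Lock()
	defer e.m.Unlock()
	if e.primary != nil {
		if ok, _ := e.primary.check(ctx); ok {
			return
		}
	}
	for i, secondary := range e.secondaries {
		if ok, _ := secondary.check(ctx); ok {
			// Promote the healthy secondary and demote the old primary,
			// mirroring the promotion in the removed checkShard below.
			old := e.primary
			e.primary = secondary
			e.secondaries = append(e.secondaries[:i], e.secondaries[i+1:]...)
			if old != nil {
				e.secondaries = append(e.secondaries, old)
			}
			return
		}
	}
}

// GetPrimary and GetSecondaries satisfy the embedded Shard interface.
func (e *memoryElector) GetPrimary() (Node, error) {
	e.m.RLock()
	defer e.m.RUnlock()
	if e.primary == nil {
		return nil, ErrPrimaryNotHealthy
	}
	return e.primary, nil
}

func (e *memoryElector) GetSecondaries() ([]Node, error) {
	e.m.RLock()
	defer e.m.RUnlock()
	return append([]Node(nil), e.secondaries...), nil
}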
// ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence
// should not be used for a new request
var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
-// NewNodeManager creates a new NodeMgr based on virtual storage configs
+// NewManager creates a new NodeMgr based on virtual storage configs
func NewManager(log *logrus.Entry, c config.Config, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
- shards := make(map[string]*shard)
+ mgr := Mgr{
+ log: log,
+ failoverEnabled: c.FailoverEnabled,
+ }
+
+ mgr.strategies = make(map[string]leaderElectionStrategy)
+
for _, virtualStorage := range c.VirtualStorages {
- var secondaries []*nodeStatus
- var primary *nodeStatus
+ strategy := newLocalElector(virtualStorage.Name, c.FailoverEnabled)
+ mgr.strategies[virtualStorage.Name] = strategy
+
for _, node := range virtualStorage.Nodes {
conn, err := client.Dial(node.Address,
append(
@@ -111,60 +97,30 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram prommetrics
}
ns := newConnectionStatus(*node, conn, log, latencyHistogram)
- if node.DefaultPrimary {
- primary = ns
- continue
- }
-
- secondaries = append(secondaries, ns)
- }
-
- shards[virtualStorage.Name] = &shard{
- primary: primary,
- secondaries: secondaries,
+ strategy.addNode(ns, node.DefaultPrimary)
}
}
- return &Mgr{
- shards: shards,
- log: log,
- failoverEnabled: c.FailoverEnabled,
- }, nil
-}
-
-// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
-// for deeming a node "healthy"
-const healthcheckThreshold = 3
-
-func (n *Mgr) bootstrap(d time.Duration) error {
- timer := time.NewTimer(d)
- defer timer.Stop()
-
- for i := 0; i < healthcheckThreshold; i++ {
- <-timer.C
- n.checkShards()
- timer.Reset(d)
- }
-
- return nil
-}
-
-func (n *Mgr) monitor(d time.Duration) {
- ticker := time.NewTicker(d)
- defer ticker.Stop()
-
- for {
- <-ticker.C
- n.checkShards()
- }
+ return &mgr, nil
}
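
A hedged construction sketch from the caller's side. The config shape is inferred from the fields this function reads (FailoverEnabled, VirtualStorages, Name, Nodes, Address, DefaultPrimary); the pointer-ness of the slices, the logger setup, and latencyHist are assumptions, not taken from this diff.

// Assumes imports of logrus, the praefect config and models packages,
// and a pre-built prommetrics.HistogramVec named latencyHist.
conf := config.Config{
	FailoverEnabled: true,
	VirtualStorages: []*config.VirtualStorage{{
		Name: "default",
		Nodes: []*models.Node{{
			Storage:        "gitaly-1",
			Address:        "tcp://gitaly-1.internal:8075",
			DefaultPrimary: true,
		}},
	}},
}

mgr, err := nodes.NewManager(logrus.NewEntry(logrus.New()), conf, latencyHist)
if err != nil {
	// Dial or configuration errors surface here.
}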
// Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
// the monitoring process. Start must be called before NodeMgr can be used.
func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
if n.failoverEnabled {
- n.bootstrap(bootstrapInterval)
- go n.monitor(monitorInterval)
+ for _, strategy := range n.strategies {
+ strategy.start(bootstrapInterval, monitorInterval)
+ }
+ }
+}
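
Continuing the sketch above: per the doc comment, Start must be called before the manager is used, and when failover is disabled it does nothing. The intervals below are illustrative values, not defaults from this change.

// Bootstrap health checks quickly, then monitor every few seconds.
mgr.Start(100*time.Millisecond, 3*time.Second)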
+
+// checkShards performs health checks on all the available shards. The
+// election strategy is responsible for determining the criteria for
+// when to elect a new primary and when a node is down.
+func (n *Mgr) checkShards() {
+ for _, strategy := range n.strategies {
+ ctx := context.Background()
+ strategy.checkNodes(ctx)
}
}
@@ -173,13 +129,15 @@ var ErrVirtualStorageNotExist = errors.New("virtual storage does not exist")
// GetShard retrieves a shard for a virtual storage name
func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
- shard, ok := n.shards[virtualStorageName]
+ shard, ok := n.strategies[virtualStorageName]
if !ok {
return nil, ErrVirtualStorageNotExist
}
if n.failoverEnabled {
- if !shard.primary.isHealthy() {
+ _, err := shard.GetPrimary()
+
+ if err != nil {
return nil, ErrPrimaryNotHealthy
}
}
@@ -187,54 +145,10 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
return shard, nil
}
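
A hedged sketch of the read path a caller (for example, the coordinator) follows through GetShard; primaryConn is a hypothetical helper, not part of this change, and assumes the nodes and grpc packages are imported.

// primaryConn resolves the healthy primary's connection for a virtual
// storage, surfacing ErrVirtualStorageNotExist or, with failover
// enabled, ErrPrimaryNotHealthy.
func primaryConn(mgr *nodes.Mgr, virtualStorage string) (*grpc.ClientConn, error) {
	shard, err := mgr.GetShard(virtualStorage)
	if err != nil {
		return nil, err
	}
	primary, err := shard.GetPrimary()
	if err != nil {
		return nil, err
	}
	return primary.GetConnection(), nil
}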
-func checkShard(virtualStorage string, s *shard) {
- defer func() {
- metrics.PrimaryGauge.WithLabelValues(virtualStorage, s.primary.GetStorage()).Set(1)
- for _, secondary := range s.secondaries {
- metrics.PrimaryGauge.WithLabelValues(virtualStorage, secondary.GetStorage()).Set(0)
- }
- }()
-
- s.primary.check()
- for _, secondary := range s.secondaries {
- secondary.check()
- }
-
- if s.primary.isHealthy() {
- return
- }
-
- newPrimaryIndex := -1
- for i, secondary := range s.secondaries {
- if secondary.isHealthy() {
- newPrimaryIndex = i
- break
- }
- }
-
- if newPrimaryIndex < 0 {
- // no healthy secondaries exist
- return
- }
- s.m.Lock()
- newPrimary := s.secondaries[newPrimaryIndex]
- s.secondaries = append(s.secondaries[:newPrimaryIndex], s.secondaries[newPrimaryIndex+1:]...)
- s.secondaries = append(s.secondaries, s.primary)
- s.primary = newPrimary
- s.m.Unlock()
-}
-
-func (n *Mgr) checkShards() {
- for virtualStorage, shard := range n.shards {
- checkShard(virtualStorage, shard)
- }
-}
-
func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus {
return &nodeStatus{
Node: node,
ClientConn: cc,
- statuses: make([]healthpb.HealthCheckResponse_ServingStatus, 0),
log: l,
latencyHist: latencyHist,
}
@@ -243,7 +157,6 @@ func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry,
type nodeStatus struct {
models.Node
*grpc.ClientConn
- statuses []healthpb.HealthCheckResponse_ServingStatus
log *logrus.Entry
latencyHist prommetrics.HistogramVec
}
@@ -268,38 +181,24 @@ func (n *nodeStatus) GetConnection() *grpc.ClientConn {
return n.ClientConn
}
-func (n *nodeStatus) isHealthy() bool {
- if len(n.statuses) < healthcheckThreshold {
- return false
- }
-
- for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
- if status != healthpb.HealthCheckResponse_SERVING {
- return false
- }
- }
-
- return true
-}
-
-func (n *nodeStatus) check() {
+func (n *nodeStatus) check(ctx context.Context) (bool, error) {
client := healthpb.NewHealthClient(n.ClientConn)
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
+ status := false
start := time.Now()
resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
n.latencyHist.WithLabelValues(n.Storage).Observe(time.Since(start).Seconds())
- if err != nil {
- n.log.WithError(err).WithField("storage", n.Storage).WithField("address", n.Address).Warn("error when pinging healthcheck")
- resp = &healthpb.HealthCheckResponse{
- Status: healthpb.HealthCheckResponse_UNKNOWN,
- }
+ if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING {
+ status = true
+ } else {
+ n.log.WithError(err).WithFields(logrus.Fields{
+ "storage": n.Storage,
+ "address": n.Address,
+ }).Warn("error when pinging healthcheck")
}
- n.statuses = append(n.statuses, resp.Status)
- if len(n.statuses) > healthcheckThreshold {
- n.statuses = n.statuses[1:]
- }
+ return status, err
}
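
With this change, check reports a single probe result instead of accumulating statuses on the node, so the consecutive-success rule (three passing checks, per the removed healthcheckThreshold) becomes the elector's job. Below is a hedged sketch of equivalent bookkeeping inside package nodes, mirroring the deleted isHealthy logic; healthHistory is a hypothetical helper, and how newLocalElector actually tracks this is not shown in this diff.

const healthcheckThreshold = 3 // same threshold the removed Mgr logic used

// healthHistory holds the last few probe results for one node.
type healthHistory struct {
	statuses []bool
}

// record appends a probe result, keeping only the most recent
// healthcheckThreshold entries, as the removed check method did.
func (h *healthHistory) record(up bool) {
	h.statuses = append(h.statuses, up)
	if len(h.statuses) > healthcheckThreshold {
		h.statuses = h.statuses[1:]
	}
}

// healthy mirrors the removed nodeStatus.isHealthy: a node counts as
// healthy only after healthcheckThreshold consecutive successful checks.
func (h *healthHistory) healthy() bool {
	if len(h.statuses) < healthcheckThreshold {
		return false
	}
	for _, up := range h.statuses {
		if !up {
			return false
		}
	}
	return true
}

An elector would then run up, _ := ns.check(ctx) on each tick, feed the result through record, and consult healthy before electing or demoting a node.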