
gitlab.com/gitlab-org/gitaly.git
author     Stan Hu <stanhu@gmail.com>    2020-03-28 06:26:46 +0300
committer  Stan Hu <stanhu@gmail.com>    2020-03-31 20:33:47 +0300
commit     8c67a12861ca7d324eb00d00bc4af8988381736d
tree       24024efde20e702474a74a4c2bd0187988783635
parent     6f4132638538078909c43ca7bb70405ec3267d45
Refactor Praefect node manager
Move the logic for local-memory-based elections into its own strategy in preparation for supporting SQL- and Consul-based strategies.
-rw-r--r--  changelogs/unreleased/sh-refactor-praefect-node-manager.yml  5
-rw-r--r--  internal/praefect/nodes/local_elector.go  201
-rw-r--r--  internal/praefect/nodes/local_elector_test.go  53
-rw-r--r--  internal/praefect/nodes/manager.go  207
-rw-r--r--  internal/praefect/nodes/manager_test.go  17
5 files changed, 323 insertions(+), 160 deletions(-)
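
The commit message points at SQL- and Consul-backed strategies as the follow-up. As a rough orientation for how a new strategy would slot into the leaderElectionStrategy interface introduced below in internal/praefect/nodes/manager.go, here is a minimal sketch of a hypothetical SQL-backed elector. The sqlElector type, its fields, and the commented-out queries are invented for illustration; only the interface methods (start, addNode, checkNodes, plus Shard's GetPrimary and GetSecondaries), the Node type, and ErrPrimaryNotHealthy come from this change.

package nodes

import (
    "context"
    "database/sql"
    "time"
)

// sqlElector is a hypothetical strategy that would record node health and the
// current primary in a shared SQL database rather than in local memory, so
// multiple Praefect processes could agree on the same primary.
type sqlElector struct {
    shardName string
    db        *sql.DB
    nodes     []Node
}

func newSQLElector(name string, db *sql.DB) *sqlElector {
    return &sqlElector{shardName: name, db: db}
}

// addNode registers a node; the primary flag would only seed the initial row.
func (s *sqlElector) addNode(node Node, primary bool) {
    s.nodes = append(s.nodes, node)
    // INSERT INTO shard_primaries ... ON CONFLICT DO NOTHING (sketch only).
}

// start kicks off periodic health checking, mirroring localElector.start.
func (s *sqlElector) start(bootstrapInterval, monitorInterval time.Duration) error {
    go func() {
        ticker := time.NewTicker(monitorInterval)
        defer ticker.Stop()

        for range ticker.C {
            s.checkNodes(context.Background())
        }
    }()

    return nil
}

// checkNodes would run the gRPC health checks and persist the results,
// promoting a healthy node in SQL when the recorded primary is down.
func (s *sqlElector) checkNodes(ctx context.Context) {
    for _, n := range s.nodes {
        up, _ := n.check(ctx)
        _ = up // UPDATE node_status SET healthy = ... (sketch only).
    }
}

// GetPrimary would SELECT the current primary for the shard and map a missing
// or unhealthy row to ErrPrimaryNotHealthy.
func (s *sqlElector) GetPrimary() (Node, error) {
    return nil, ErrPrimaryNotHealthy
}

// GetSecondaries would return every registered node that is not the primary.
func (s *sqlElector) GetSecondaries() ([]Node, error) {
    return nil, nil
}

With that shape in place, NewManager only has to construct a different strategy per virtual storage instead of newLocalElector.
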
diff --git a/changelogs/unreleased/sh-refactor-praefect-node-manager.yml b/changelogs/unreleased/sh-refactor-praefect-node-manager.yml
new file mode 100644
index 000000000..c8d71a8b6
--- /dev/null
+++ b/changelogs/unreleased/sh-refactor-praefect-node-manager.yml
@@ -0,0 +1,5 @@
+---
+title: Refactor Praefect node manager
+merge_request: 1940
+author:
+type: other
diff --git a/internal/praefect/nodes/local_elector.go b/internal/praefect/nodes/local_elector.go
new file mode 100644
index 000000000..12308ab26
--- /dev/null
+++ b/internal/praefect/nodes/local_elector.go
@@ -0,0 +1,201 @@
+package nodes
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+)
+
+type nodeCandidate struct {
+ node Node
+ up bool
+ primary bool
+ statuses []bool
+}
+
+// localElector relies on an in-memory datastore to track the primary
+// and secondaries. A single election strategy pertains to a single
+// shard. It does NOT support multiple Praefect nodes or have any
+// persistence. This is used mostly for testing and development.
+type localElector struct {
+ m sync.RWMutex
+ failoverEnabled bool
+ shardName string
+ nodes []*nodeCandidate
+ primaryNode *nodeCandidate
+}
+
+// healthcheckThreshold is the number of consecutive successful health checks
+// necessary for deeming a node "healthy"
+const healthcheckThreshold = 3
+
+func (n *nodeCandidate) isHealthy() bool {
+ if len(n.statuses) < healthcheckThreshold {
+ return false
+ }
+
+ for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
+ if !status {
+ return false
+ }
+ }
+
+ return true
+}
+
+func newLocalElector(name string, failoverEnabled bool) *localElector {
+ return &localElector{
+ shardName: name,
+ failoverEnabled: failoverEnabled,
+ }
+}
+
+// addNode registers a primary or secondary in the internal
+// datastore.
+func (s *localElector) addNode(node Node, primary bool) {
+ localNode := nodeCandidate{
+ node: node,
+ primary: primary,
+ statuses: make([]bool, 0),
+ up: false,
+ }
+
+ // If failover hasn't been activated, we assume all nodes are up
+ // since health checks aren't run.
+ if !s.failoverEnabled {
+ localNode.up = true
+ }
+
+ s.m.Lock()
+ defer s.m.Unlock()
+
+ if primary {
+ s.primaryNode = &localNode
+ }
+
+ s.nodes = append(s.nodes, &localNode)
+}
+
+// start runs the initial bootstrap health checks and then launches a
+// goroutine to continuously monitor node health via gRPC health checks.
+func (s *localElector) start(bootstrapInterval, monitorInterval time.Duration) error {
+ s.bootstrap(bootstrapInterval)
+ go s.monitor(monitorInterval)
+
+ return nil
+}
+
+func (s *localElector) bootstrap(d time.Duration) {
+ timer := time.NewTimer(d)
+ defer timer.Stop()
+
+ for i := 0; i < healthcheckThreshold; i++ {
+ <-timer.C
+
+ ctx := context.TODO()
+ s.checkNodes(ctx)
+ timer.Reset(d)
+ }
+}
+
+func (s *localElector) monitor(d time.Duration) {
+ ticker := time.NewTicker(d)
+ defer ticker.Stop()
+
+ for {
+ <-ticker.C
+
+ ctx := context.Background()
+ s.checkNodes(ctx)
+ }
+}
+
+// checkNodes issues a gRPC health check for each node managed by the
+// shard and, if the current primary is unhealthy, promotes the first
+// healthy secondary to primary.
+func (s *localElector) checkNodes(ctx context.Context) {
+ defer s.updateMetrics()
+
+ for _, n := range s.nodes {
+ status, _ := n.node.check(ctx)
+ n.statuses = append(n.statuses, status)
+
+ if len(n.statuses) > healthcheckThreshold {
+ n.statuses = n.statuses[1:]
+ }
+
+ up := n.isHealthy()
+ n.up = up
+ }
+
+ if s.primaryNode != nil && s.primaryNode.isHealthy() {
+ return
+ }
+
+ var newPrimary *nodeCandidate
+
+ for _, node := range s.nodes {
+ if !node.primary && node.isHealthy() {
+ newPrimary = node
+ break
+ }
+ }
+
+ if newPrimary == nil {
+ return
+ }
+
+ s.m.Lock()
+ defer s.m.Unlock()
+
+ s.primaryNode.primary = false
+ s.primaryNode = newPrimary
+ newPrimary.primary = true
+}
+
+// GetPrimary gets the primary of a shard. If no primary has been set, the
+// returned Node is nil and err is ErrPrimaryNotHealthy. If a primary has
+// been elected but is down, the Node is returned along with
+// ErrPrimaryNotHealthy.
+func (s *localElector) GetPrimary() (Node, error) {
+ s.m.RLock()
+ defer s.m.RUnlock()
+
+ if s.primaryNode == nil {
+ return nil, ErrPrimaryNotHealthy
+ }
+
+ if !s.primaryNode.up {
+ return s.primaryNode.node, ErrPrimaryNotHealthy
+ }
+
+ return s.primaryNode.node, nil
+}
+
+// GetSecondaries gets the secondaries of a shard
+func (s *localElector) GetSecondaries() ([]Node, error) {
+ s.m.RLock()
+ defer s.m.RUnlock()
+
+ var secondaries []Node
+ for _, n := range s.nodes {
+ if !n.primary {
+ secondaries = append(secondaries, n.node)
+ }
+ }
+
+ return secondaries, nil
+}
+
+func (s *localElector) updateMetrics() {
+ s.m.RLock()
+ defer s.m.RUnlock()
+
+ for _, node := range s.nodes {
+ var val float64
+
+ if node.primary {
+ val = 1
+ }
+
+ metrics.PrimaryGauge.WithLabelValues(s.shardName, node.node.GetStorage()).Set(val)
+ }
+}
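
For orientation, here is a rough sketch of how the elector above is driven end to end. The fakeNode type, the storage and shard names, and the intervals are invented for illustration; the sketch assumes it sits in the same nodes package (addNode, bootstrap, and check are unexported) and that the Node interface has no methods beyond the ones visible in this diff.

package nodes

import (
    "context"
    "fmt"
    "time"

    "google.golang.org/grpc"
)

// fakeNode is a hypothetical stand-in for a real Gitaly connection.
type fakeNode struct {
    storage string
    healthy bool
}

func (f *fakeNode) GetStorage() string              { return f.storage }
func (f *fakeNode) GetAddress() string              { return "unix:///tmp/fake" }
func (f *fakeNode) GetToken() string                { return "" }
func (f *fakeNode) GetConnection() *grpc.ClientConn { return nil }

func (f *fakeNode) check(ctx context.Context) (bool, error) { return f.healthy, nil }

func localElectorSketch() {
    primary := &fakeNode{storage: "gitaly-1", healthy: true}

    elector := newLocalElector("praefect", true) // failover enabled
    elector.addNode(primary, true)

    // bootstrap runs healthcheckThreshold (3) rounds of checks; three
    // consecutive successes mark the node as up.
    elector.bootstrap(time.Millisecond)

    if _, err := elector.GetPrimary(); err == nil {
        fmt.Println("primary is healthy")
    }

    // A single failing check breaks the run of three consecutive successes,
    // so the primary is no longer considered healthy. With no healthy
    // secondary to promote, GetPrimary keeps returning ErrPrimaryNotHealthy.
    primary.healthy = false
    elector.checkNodes(context.Background())

    if _, err := elector.GetPrimary(); err == ErrPrimaryNotHealthy {
        fmt.Println("primary is down")
    }
}

The test below exercises the same sequence against a real in-process gRPC health server instead of a fake.
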
diff --git a/internal/praefect/nodes/local_elector_test.go b/internal/praefect/nodes/local_elector_test.go
new file mode 100644
index 000000000..16f3a3875
--- /dev/null
+++ b/internal/praefect/nodes/local_elector_test.go
@@ -0,0 +1,53 @@
+package nodes
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
+ "google.golang.org/grpc"
+)
+
+func TestPrimaryAndSecondaries(t *testing.T) {
+ socket := testhelper.GetTemporaryGitalySocketFileName()
+ svr, _ := testhelper.NewServerWithHealth(t, socket)
+ defer svr.Stop()
+
+ cc, err := grpc.Dial(
+ "unix://"+socket,
+ grpc.WithInsecure(),
+ )
+
+ require.NoError(t, err)
+
+ storageName := "default"
+ mockHistogramVec := promtest.NewMockHistogramVec()
+
+ cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec)
+ strategy := newLocalElector(storageName, true)
+
+ strategy.addNode(cs, true)
+ strategy.bootstrap(time.Second)
+
+ primary, err := strategy.GetPrimary()
+
+ require.NoError(t, err)
+ require.Equal(t, primary, cs)
+
+ secondaries, err := strategy.GetSecondaries()
+
+ require.NoError(t, err)
+ require.Equal(t, 0, len(secondaries))
+
+ secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), nil)
+ strategy.addNode(secondary, false)
+
+ secondaries, err = strategy.GetSecondaries()
+
+ require.NoError(t, err)
+ require.Equal(t, 1, len(secondaries))
+ require.Equal(t, secondary, secondaries[0])
+}
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index ad0a8309b..ea2e57feb 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -3,7 +3,6 @@ package nodes
import (
"context"
"errors"
- "sync"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -13,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
@@ -39,55 +37,43 @@ type Node interface {
GetAddress() string
GetToken() string
GetConnection() *grpc.ClientConn
-}
-
-type shard struct {
- m sync.RWMutex
- primary *nodeStatus
- secondaries []*nodeStatus
-}
-
-// GetPrimary gets the primary of a shard
-func (s *shard) GetPrimary() (Node, error) {
- s.m.RLock()
- defer s.m.RUnlock()
-
- return s.primary, nil
-}
-
-// GetSecondaries gets the secondaries of a shard
-func (s *shard) GetSecondaries() ([]Node, error) {
- s.m.RLock()
- defer s.m.RUnlock()
-
- var secondaries []Node
- for _, secondary := range s.secondaries {
- secondaries = append(secondaries, secondary)
- }
-
- return secondaries, nil
+ check(context.Context) (bool, error)
}
// Mgr is a concrete type that adheres to the Manager interface
type Mgr struct {
- // shards is a map of shards keyed on virtual storage name
- shards map[string]*shard
- // staticShards never changes based on node health. It is a static set of shards that comes from the config's
- // VirtualStorages
failoverEnabled bool
log *logrus.Entry
+ // strategies is a map of strategies keyed on virtual storage name
+ strategies map[string]leaderElectionStrategy
+}
+
+// leaderElectionStrategy defines the interface by which primary and
+// secondaries are managed.
+type leaderElectionStrategy interface {
+ start(bootstrapInterval, monitorInterval time.Duration) error
+ addNode(node Node, primary bool)
+ checkNodes(context.Context)
+
+ Shard
}
// ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence
// should not be used for a new request
var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
-// NewNodeManager creates a new NodeMgr based on virtual storage configs
+// NewManager creates a new NodeMgr based on virtual storage configs
func NewManager(log *logrus.Entry, c config.Config, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
- shards := make(map[string]*shard)
+ mgr := Mgr{
+ log: log,
+ failoverEnabled: c.FailoverEnabled}
+
+ mgr.strategies = make(map[string]leaderElectionStrategy)
+
for _, virtualStorage := range c.VirtualStorages {
- var secondaries []*nodeStatus
- var primary *nodeStatus
+ strategy := newLocalElector(virtualStorage.Name, c.FailoverEnabled)
+ mgr.strategies[virtualStorage.Name] = strategy
+
for _, node := range virtualStorage.Nodes {
conn, err := client.Dial(node.Address,
append(
@@ -111,60 +97,30 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram prommetrics
}
ns := newConnectionStatus(*node, conn, log, latencyHistogram)
- if node.DefaultPrimary {
- primary = ns
- continue
- }
-
- secondaries = append(secondaries, ns)
- }
-
- shards[virtualStorage.Name] = &shard{
- primary: primary,
- secondaries: secondaries,
+ strategy.addNode(ns, node.DefaultPrimary)
}
}
- return &Mgr{
- shards: shards,
- log: log,
- failoverEnabled: c.FailoverEnabled,
- }, nil
-}
-
-// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
-// for deeming a node "healthy"
-const healthcheckThreshold = 3
-
-func (n *Mgr) bootstrap(d time.Duration) error {
- timer := time.NewTimer(d)
- defer timer.Stop()
-
- for i := 0; i < healthcheckThreshold; i++ {
- <-timer.C
- n.checkShards()
- timer.Reset(d)
- }
-
- return nil
-}
-
-func (n *Mgr) monitor(d time.Duration) {
- ticker := time.NewTicker(d)
- defer ticker.Stop()
-
- for {
- <-ticker.C
- n.checkShards()
- }
+ return &mgr, nil
}
// Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
// the monitoring process. Start must be called before NodeMgr can be used.
func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
if n.failoverEnabled {
- n.bootstrap(bootstrapInterval)
- go n.monitor(monitorInterval)
+ for _, strategy := range n.strategies {
+ strategy.start(bootstrapInterval, monitorInterval)
+ }
+ }
+}
+
+// checkShards performs health checks on all the available shards. The
+// election strategy is responsible for determining the criteria for
+// when to elect a new primary and when a node is down.
+func (n *Mgr) checkShards() {
+ for _, strategy := range n.strategies {
+ ctx := context.Background()
+ strategy.checkNodes(ctx)
}
}
@@ -173,13 +129,15 @@ var ErrVirtualStorageNotExist = errors.New("virtual storage does not exist")
// GetShard retrieves a shard for a virtual storage name
func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
- shard, ok := n.shards[virtualStorageName]
+ shard, ok := n.strategies[virtualStorageName]
if !ok {
return nil, ErrVirtualStorageNotExist
}
if n.failoverEnabled {
- if !shard.primary.isHealthy() {
+ _, err := shard.GetPrimary()
+
+ if err != nil {
return nil, ErrPrimaryNotHealthy
}
}
@@ -187,54 +145,10 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
return shard, nil
}
-func checkShard(virtualStorage string, s *shard) {
- defer func() {
- metrics.PrimaryGauge.WithLabelValues(virtualStorage, s.primary.GetStorage()).Set(1)
- for _, secondary := range s.secondaries {
- metrics.PrimaryGauge.WithLabelValues(virtualStorage, secondary.GetStorage()).Set(0)
- }
- }()
-
- s.primary.check()
- for _, secondary := range s.secondaries {
- secondary.check()
- }
-
- if s.primary.isHealthy() {
- return
- }
-
- newPrimaryIndex := -1
- for i, secondary := range s.secondaries {
- if secondary.isHealthy() {
- newPrimaryIndex = i
- break
- }
- }
-
- if newPrimaryIndex < 0 {
- // no healthy secondaries exist
- return
- }
- s.m.Lock()
- newPrimary := s.secondaries[newPrimaryIndex]
- s.secondaries = append(s.secondaries[:newPrimaryIndex], s.secondaries[newPrimaryIndex+1:]...)
- s.secondaries = append(s.secondaries, s.primary)
- s.primary = newPrimary
- s.m.Unlock()
-}
-
-func (n *Mgr) checkShards() {
- for virtualStorage, shard := range n.shards {
- checkShard(virtualStorage, shard)
- }
-}
-
func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus {
return &nodeStatus{
Node: node,
ClientConn: cc,
- statuses: make([]healthpb.HealthCheckResponse_ServingStatus, 0),
log: l,
latencyHist: latencyHist,
}
@@ -243,7 +157,6 @@ func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry,
type nodeStatus struct {
models.Node
*grpc.ClientConn
- statuses []healthpb.HealthCheckResponse_ServingStatus
log *logrus.Entry
latencyHist prommetrics.HistogramVec
}
@@ -268,38 +181,24 @@ func (n *nodeStatus) GetConnection() *grpc.ClientConn {
return n.ClientConn
}
-func (n *nodeStatus) isHealthy() bool {
- if len(n.statuses) < healthcheckThreshold {
- return false
- }
-
- for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
- if status != healthpb.HealthCheckResponse_SERVING {
- return false
- }
- }
-
- return true
-}
-
-func (n *nodeStatus) check() {
+func (n *nodeStatus) check(ctx context.Context) (bool, error) {
client := healthpb.NewHealthClient(n.ClientConn)
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
+ status := false
start := time.Now()
resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
n.latencyHist.WithLabelValues(n.Storage).Observe(time.Since(start).Seconds())
- if err != nil {
- n.log.WithError(err).WithField("storage", n.Storage).WithField("address", n.Address).Warn("error when pinging healthcheck")
- resp = &healthpb.HealthCheckResponse{
- Status: healthpb.HealthCheckResponse_UNKNOWN,
- }
+ if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING {
+ status = true
+ } else {
+ n.log.WithError(err).WithFields(logrus.Fields{
+ "storage": n.Storage,
+ "address": n.Address,
+ }).Warn("error when pinging healthcheck")
}
- n.statuses = append(n.statuses, resp.Status)
- if len(n.statuses) > healthcheckThreshold {
- n.statuses = n.statuses[1:]
- }
+ return status, err
}
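
Putting the manager changes together, the following is a hedged sketch of the caller-side sequence: build the manager from config, start monitoring, then resolve a shard and its primary per request. The routeToPrimary helper, the package it lives in, the "praefect" virtual storage name, and the intervals are assumptions; NewManager, Start, GetShard, GetPrimary, GetStorage, and the error values are the ones defined in this file.

// Hypothetical consumer package; it would have to live inside the Gitaly
// module because the imported praefect packages are internal.
package praefect

import (
    "time"

    "github.com/sirupsen/logrus"

    "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
    "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
    prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
)

// routeToPrimary shows the call sequence only; building cfg and hist is out
// of scope here, so they are passed in.
func routeToPrimary(log *logrus.Entry, cfg config.Config, hist prommetrics.HistogramVec) error {
    mgr, err := nodes.NewManager(log, cfg, hist)
    if err != nil {
        return err
    }

    // Bootstraps the health checks and starts one monitoring goroutine per
    // virtual storage when failover is enabled.
    mgr.Start(1*time.Second, 3*time.Second)

    // "praefect" is an assumed virtual storage name from cfg.VirtualStorages.
    shard, err := mgr.GetShard("praefect")
    if err != nil {
        // ErrVirtualStorageNotExist, or ErrPrimaryNotHealthy with failover on.
        return err
    }

    primary, err := shard.GetPrimary()
    if err != nil {
        return err
    }

    log.WithField("storage", primary.GetStorage()).Info("routing to primary")

    return nil
}

The point of the indirection is that GetShard hands back whatever the election strategy currently considers the primary, so this caller-side code stays the same when the local-memory strategy is swapped for a SQL- or Consul-based one.
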
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 8f5e1443b..185c640d6 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -1,6 +1,7 @@
package nodes
import (
+ "context"
"testing"
"time"
@@ -30,21 +31,25 @@ func TestNodeStatus(t *testing.T) {
storageName := "default"
cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec)
- require.False(t, cs.isHealthy())
-
var expectedLabels [][]string
for i := 0; i < healthcheckThreshold; i++ {
- cs.check()
+ ctx := context.Background()
+ status, err := cs.check(ctx)
+
+ require.NoError(t, err)
+ require.True(t, status)
expectedLabels = append(expectedLabels, []string{storageName})
}
- require.True(t, cs.isHealthy())
+
require.Equal(t, expectedLabels, mockHistogramVec.LabelsCalled())
require.Len(t, mockHistogramVec.Observer().Observed(), healthcheckThreshold)
healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
- cs.check()
- require.False(t, cs.isHealthy())
+ ctx := context.Background()
+ status, err := cs.check(ctx)
+ require.NoError(t, err)
+ require.False(t, status)
}
func TestNodeManager(t *testing.T) {