Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStan Hu <stanhu@gmail.com>2020-03-25 07:31:54 +0300
committerStan Hu <stanhu@gmail.com>2020-03-25 08:27:52 +0300
commit7c133c0a87a24787347c0ce3a9c86573e9111ce3 (patch)
tree02a5e017bd669f5f5cad4fe5cd853c7251c1d02c
parent57353429b7d48b587278239f0d78fcf89edfb699 (diff)
Add SQL-based election strategysh-sql-leader-election
-rw-r--r--internal/praefect/config/config.go3
-rw-r--r--internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go20
-rw-r--r--internal/praefect/nodes/manager.go21
-rw-r--r--internal/praefect/nodes/sql_election_strategy.go225
4 files changed, 267 insertions, 2 deletions
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 7b5296234..4c80ae4fd 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -29,7 +29,8 @@ type Config struct {
Prometheus prometheus.Config `toml:"prometheus"`
Auth auth.Config `toml:"auth"`
DB `toml:"database"`
- FailoverEnabled bool `toml:"failover_enabled"`
+ FailoverEnabled bool `toml:"failover_enabled"`
+ ElectionStrategy string `toml:"election_strategy"`
}
// VirtualStorage represents a set of nodes for a storage
diff --git a/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go b/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go
new file mode 100644
index 000000000..b7b88cc56
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go
@@ -0,0 +1,20 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20200324001604_add_shard_elections_table",
+ Up: []string{`CREATE TABLE shard_elections (
+ is_primary boolean DEFAULT 'true' NOT NULL,
+ shard_name varchar(255) NOT NULL,
+ node_name varchar(255) NOT NULL,
+ last_seen_active timestamp NOT NULL
+ )`,
+ "CREATE UNIQUE INDEX primary_shard_idx ON shard_elections (is_primary, shard_name)",
+ },
+ Down: []string{"DROP TABLE shard_elections"},
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index fd6621ae9..74b3dba76 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -71,7 +71,12 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram metrics.His
mgr.strategies = make(map[string]LeaderElectionStrategy)
for _, virtualStorage := range c.VirtualStorages {
- strategy := newLocalMemoryElectionStrategy(virtualStorage.Name, c.FailoverEnabled)
+ strategy, err := mgr.createElectionStrategy(virtualStorage.Name, c)
+
+ if err != nil {
+ return nil, err
+ }
+
mgr.strategies[virtualStorage.Name] = strategy
for _, node := range virtualStorage.Nodes {
@@ -104,6 +109,20 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram metrics.His
return &mgr, nil
}
+func (n *Mgr) createElectionStrategy(shardName string, c config.Config) (LeaderElectionStrategy, error) {
+ if c.ElectionStrategy == "sql" {
+ strategy, err := newSqlElectionStrategy(shardName, c, n.log)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return strategy, err
+ }
+
+ return newLocalMemoryElectionStrategy(shardName, c.FailoverEnabled), nil
+}
+
// Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
// the monitoring process. Start must be called before NodeMgr can be used.
func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
diff --git a/internal/praefect/nodes/sql_election_strategy.go b/internal/praefect/nodes/sql_election_strategy.go
new file mode 100644
index 000000000..3d6e802ef
--- /dev/null
+++ b/internal/praefect/nodes/sql_election_strategy.go
@@ -0,0 +1,225 @@
+package nodes
+
+import (
+ "database/sql"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+)
+
+var failoverThresholdSeconds = 20
+
+type sqlNode struct {
+ node Node
+ primary bool
+}
+
+type SqlElectionStrategy struct {
+ m sync.RWMutex
+ shardName string
+ nodes []*sqlNode
+ primaryNode *sqlNode
+ db *sql.DB
+ log *logrus.Entry
+}
+
+func newSqlElectionStrategy(name string, c config.Config, log *logrus.Entry) (*SqlElectionStrategy, error) {
+ db, err := glsql.OpenDB(c.DB)
+ if err != nil {
+ return nil, err
+ }
+
+ return &SqlElectionStrategy{
+ shardName: name,
+ db: db,
+ log: log,
+ }, nil
+}
+
+// AddNode registers a primary or secondary in the internal
+// datastore.
+func (s *SqlElectionStrategy) AddNode(node Node, primary bool) {
+ localNode := sqlNode{
+ node: node,
+ primary: primary,
+ }
+
+ s.m.Lock()
+ defer s.m.Unlock()
+
+ if primary {
+ s.primaryNode = &localNode
+ }
+
+ s.nodes = append(s.nodes, &localNode)
+}
+
+// Start launches a Goroutine to check the state of the nodes and
+// continuously monitor their health via gRPC health checks.
+func (s *SqlElectionStrategy) Start(bootstrapInterval, monitorInterval time.Duration) error {
+ s.bootstrap(bootstrapInterval)
+ go s.monitor(monitorInterval)
+
+ return nil
+}
+
+func (s *SqlElectionStrategy) bootstrap(d time.Duration) error {
+ s.CheckShard()
+
+ return nil
+}
+
+func (s *SqlElectionStrategy) monitor(d time.Duration) {
+ ticker := time.NewTicker(d)
+ defer ticker.Stop()
+
+ for {
+ <-ticker.C
+ s.CheckShard()
+ }
+}
+
+// CheckShard issues a gRPC health check for each node managed by the
+// shard.
+func (s *SqlElectionStrategy) CheckShard() {
+ defer s.updateMetrics()
+ var wg sync.WaitGroup
+
+ for _, n := range s.nodes {
+ s.log.Debug("checking node " + n.node.GetStorage() + ": " + n.node.GetAddress())
+
+ wg.Add(1)
+ go func(n *sqlNode) {
+ defer wg.Done()
+
+ if n.node.check() {
+ s.updateLeader(n.node.GetStorage())
+ } else {
+ s.log.Info("No response from " + n.node.GetStorage())
+ }
+ }(n)
+ }
+
+ wg.Wait()
+ candidate, err := s.lookupPrimary()
+
+ if err == nil && candidate != s.primaryNode {
+ s.log.WithFields(logrus.Fields{
+ "old_primary": s.primaryNode.node.GetStorage(),
+ "new_primary": candidate.node.GetStorage(),
+ "shard": s.shardName}).Info("primary node changed")
+
+ s.m.Lock()
+ defer s.m.Unlock()
+
+ if s.primaryNode != nil {
+ s.primaryNode.primary = false
+ }
+
+ s.primaryNode = candidate
+ candidate.primary = true
+ }
+}
+
+// GetPrimary gets the primary of a shard. If no primary exists, it will
+// be nil. If a primary has been elected but is down, err will be
+// ErrPrimaryNotHealthy.
+func (s *SqlElectionStrategy) GetPrimary() (Node, error) {
+ s.m.RLock()
+ defer s.m.RUnlock()
+
+ if s.primaryNode == nil {
+ return nil, ErrPrimaryNotHealthy
+ }
+
+ return s.primaryNode.node, nil
+}
+
+// GetSecondaries gets the secondaries of a shard
+func (s *SqlElectionStrategy) GetSecondaries() ([]Node, error) {
+ s.m.RLock()
+ defer s.m.RUnlock()
+
+ var secondaries []Node
+ for _, n := range s.nodes {
+ if !n.primary {
+ secondaries = append(secondaries, n.node)
+ }
+ }
+
+ return secondaries, nil
+}
+
+func (s *SqlElectionStrategy) updateMetrics() {
+ s.m.RLock()
+ defer s.m.RUnlock()
+
+ for _, node := range s.nodes {
+ val := float64(0)
+
+ if node.primary {
+ val = float64(1)
+ }
+
+ metrics.PrimaryGauge.WithLabelValues(s.shardName, node.node.GetStorage()).Set(val)
+ }
+}
+
+func (s *SqlElectionStrategy) updateLeader(storageName string) error {
+ q := `INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active)
+ VALUES ('t', '%s', '%s', now()) ON CONFLICT (is_primary, shard_name)
+ DO UPDATE SET
+ node_name =
+ CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN
+ excluded.node_name
+ ELSE
+ shard_elections.node_name
+ END,
+ last_seen_active =
+ CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN
+ now()
+ ELSE
+ shard_elections.last_seen_active
+ END`
+
+ _, err := s.db.Exec(fmt.Sprintf(q, s.shardName, storageName, failoverThresholdSeconds, failoverThresholdSeconds))
+
+ if err != nil {
+ s.log.Errorf("Error updating leader: %s", err)
+ }
+ return err
+}
+
+func (s *SqlElectionStrategy) lookupPrimary() (*sqlNode, error) {
+ q := fmt.Sprintf(`SELECT node_name FROM shard_elections
+ WHERE last_seen_active > now() - interval '%d seconds'
+ AND is_primary IS TRUE
+ AND shard_name = '%s'`, failoverThresholdSeconds, s.shardName)
+
+ rows, err := s.db.Query(q)
+ if err != nil {
+ s.log.Errorf("Error looking up primary: %s", err)
+ return nil, err
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var name string
+ if err := rows.Scan(&name); err != nil {
+ return nil, err
+ }
+
+ for _, n := range s.nodes {
+ if n.node.GetStorage() == name {
+ return n, nil
+ }
+ }
+ }
+
+ return nil, err
+}