diff options
author | Stan Hu <stanhu@gmail.com> | 2020-03-25 07:31:54 +0300 |
---|---|---|
committer | Stan Hu <stanhu@gmail.com> | 2020-03-25 08:27:52 +0300 |
commit | 7c133c0a87a24787347c0ce3a9c86573e9111ce3 (patch) | |
tree | 02a5e017bd669f5f5cad4fe5cd853c7251c1d02c | |
parent | 57353429b7d48b587278239f0d78fcf89edfb699 (diff) |
Add SQL-based election strategy (sh-sql-leader-election)
-rw-r--r-- | internal/praefect/config/config.go | 3 | ||||
-rw-r--r-- | internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go | 20 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 21 | ||||
-rw-r--r-- | internal/praefect/nodes/sql_election_strategy.go | 225 |
4 files changed, 267 insertions, 2 deletions
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 7b5296234..4c80ae4fd 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -29,7 +29,8 @@ type Config struct { Prometheus prometheus.Config `toml:"prometheus"` Auth auth.Config `toml:"auth"` DB `toml:"database"` - FailoverEnabled bool `toml:"failover_enabled"` + FailoverEnabled bool `toml:"failover_enabled"` + ElectionStrategy string `toml:"election_strategy"` } // VirtualStorage represents a set of nodes for a storage diff --git a/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go b/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go new file mode 100644 index 000000000..b7b88cc56 --- /dev/null +++ b/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go @@ -0,0 +1,20 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20200324001604_add_shard_elections_table", + Up: []string{`CREATE TABLE shard_elections ( + is_primary boolean DEFAULT 'true' NOT NULL, + shard_name varchar(255) NOT NULL, + node_name varchar(255) NOT NULL, + last_seen_active timestamp NOT NULL + )`, + "CREATE UNIQUE INDEX primary_shard_idx ON shard_elections (is_primary, shard_name)", + }, + Down: []string{"DROP TABLE shard_elections"}, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index fd6621ae9..74b3dba76 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -71,7 +71,12 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram metrics.His mgr.strategies = make(map[string]LeaderElectionStrategy) for _, virtualStorage := range c.VirtualStorages { - strategy := newLocalMemoryElectionStrategy(virtualStorage.Name, c.FailoverEnabled) + strategy, err := 
mgr.createElectionStrategy(virtualStorage.Name, c) + + if err != nil { + return nil, err + } + mgr.strategies[virtualStorage.Name] = strategy for _, node := range virtualStorage.Nodes { @@ -104,6 +109,20 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram metrics.His return &mgr, nil } +func (n *Mgr) createElectionStrategy(shardName string, c config.Config) (LeaderElectionStrategy, error) { + if c.ElectionStrategy == "sql" { + strategy, err := newSqlElectionStrategy(shardName, c, n.log) + + if err != nil { + return nil, err + } + + return strategy, err + } + + return newLocalMemoryElectionStrategy(shardName, c.FailoverEnabled), nil +} + // Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off // the monitoring process. Start must be called before NodeMgr can be used. func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) { diff --git a/internal/praefect/nodes/sql_election_strategy.go b/internal/praefect/nodes/sql_election_strategy.go new file mode 100644 index 000000000..3d6e802ef --- /dev/null +++ b/internal/praefect/nodes/sql_election_strategy.go @@ -0,0 +1,225 @@ +package nodes + +import ( + "database/sql" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" +) + +var failoverThresholdSeconds = 20 + +type sqlNode struct { + node Node + primary bool +} + +type SqlElectionStrategy struct { + m sync.RWMutex + shardName string + nodes []*sqlNode + primaryNode *sqlNode + db *sql.DB + log *logrus.Entry +} + +func newSqlElectionStrategy(name string, c config.Config, log *logrus.Entry) (*SqlElectionStrategy, error) { + db, err := glsql.OpenDB(c.DB) + if err != nil { + return nil, err + } + + return &SqlElectionStrategy{ + shardName: name, + db: db, + log: log, + }, nil +} + +// AddNode registers a primary 
or secondary in the internal +// datastore. +func (s *SqlElectionStrategy) AddNode(node Node, primary bool) { + localNode := sqlNode{ + node: node, + primary: primary, + } + + s.m.Lock() + defer s.m.Unlock() + + if primary { + s.primaryNode = &localNode + } + + s.nodes = append(s.nodes, &localNode) +} + +// Start launches a Goroutine to check the state of the nodes and +// continuously monitor their health via gRPC health checks. +func (s *SqlElectionStrategy) Start(bootstrapInterval, monitorInterval time.Duration) error { + s.bootstrap(bootstrapInterval) + go s.monitor(monitorInterval) + + return nil +} + +func (s *SqlElectionStrategy) bootstrap(d time.Duration) error { + s.CheckShard() + + return nil +} + +func (s *SqlElectionStrategy) monitor(d time.Duration) { + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + <-ticker.C + s.CheckShard() + } +} + +// CheckShard issues a gRPC health check for each node managed by the +// shard. +func (s *SqlElectionStrategy) CheckShard() { + defer s.updateMetrics() + var wg sync.WaitGroup + + for _, n := range s.nodes { + s.log.Debug("checking node " + n.node.GetStorage() + ": " + n.node.GetAddress()) + + wg.Add(1) + go func(n *sqlNode) { + defer wg.Done() + + if n.node.check() { + s.updateLeader(n.node.GetStorage()) + } else { + s.log.Info("No response from " + n.node.GetStorage()) + } + }(n) + } + + wg.Wait() + candidate, err := s.lookupPrimary() + + if err == nil && candidate != s.primaryNode { + s.log.WithFields(logrus.Fields{ + "old_primary": s.primaryNode.node.GetStorage(), + "new_primary": candidate.node.GetStorage(), + "shard": s.shardName}).Info("primary node changed") + + s.m.Lock() + defer s.m.Unlock() + + if s.primaryNode != nil { + s.primaryNode.primary = false + } + + s.primaryNode = candidate + candidate.primary = true + } +} + +// GetPrimary gets the primary of a shard. If no primary exists, it will +// be nil. If a primary has been elected but is down, err will be +// ErrPrimaryNotHealthy. 
+func (s *SqlElectionStrategy) GetPrimary() (Node, error) { + s.m.RLock() + defer s.m.RUnlock() + + if s.primaryNode == nil { + return nil, ErrPrimaryNotHealthy + } + + return s.primaryNode.node, nil +} + +// GetSecondaries gets the secondaries of a shard +func (s *SqlElectionStrategy) GetSecondaries() ([]Node, error) { + s.m.RLock() + defer s.m.RUnlock() + + var secondaries []Node + for _, n := range s.nodes { + if !n.primary { + secondaries = append(secondaries, n.node) + } + } + + return secondaries, nil +} + +func (s *SqlElectionStrategy) updateMetrics() { + s.m.RLock() + defer s.m.RUnlock() + + for _, node := range s.nodes { + val := float64(0) + + if node.primary { + val = float64(1) + } + + metrics.PrimaryGauge.WithLabelValues(s.shardName, node.node.GetStorage()).Set(val) + } +} + +func (s *SqlElectionStrategy) updateLeader(storageName string) error { + q := `INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active) + VALUES ('t', '%s', '%s', now()) ON CONFLICT (is_primary, shard_name) + DO UPDATE SET + node_name = + CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN + excluded.node_name + ELSE + shard_elections.node_name + END, + last_seen_active = + CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN + now() + ELSE + shard_elections.last_seen_active + END` + + _, err := s.db.Exec(fmt.Sprintf(q, s.shardName, storageName, failoverThresholdSeconds, failoverThresholdSeconds)) + + if err != nil { + s.log.Errorf("Error updating leader: %s", err) + } + return err +} + +func (s *SqlElectionStrategy) lookupPrimary() (*sqlNode, error) { + q := fmt.Sprintf(`SELECT node_name FROM shard_elections + WHERE last_seen_active > now() - interval '%d seconds' + AND is_primary IS TRUE + AND shard_name = '%s'`, failoverThresholdSeconds, s.shardName) + + rows, err := s.db.Query(q) + if err != nil { + s.log.Errorf("Error looking up primary: %s", err) + return nil, err + } + defer rows.Close() + + 
for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return nil, err + } + + for _, n := range s.nodes { + if n.node.GetStorage() == name { + return n, nil + } + } + } + + return nil, err +} |