gitlab.com/gitlab-org/gitaly.git
author    Stan Hu <stanhu@gmail.com>    2020-04-04 00:12:10 +0300
committer John Cai <jcai@gitlab.com>    2020-04-10 02:27:26 +0300
commit    38528d8b99d02988d317c71f2116992d997cbc31 (patch)
tree      c9191a4390164b60e505bac0a283508cfb86c025
parent    854571914ce1f1e43368ccb096bacadb1e128543 (diff)
Add SQL-based election strategy
This commit adds the following strategy to enable redundant Praefect nodes to run simultaneously:

1. Every Praefect node periodically (every second) performs a health check RPC with each Gitaly node.
2. For each node, Praefect updates a row in a new table (`node_status`) with the following information:
   a. The name of the Praefect instance (`praefect_name`)
   b. The name of the virtual storage (`shard_name`)
   c. The name of the Gitaly storage (`node_name`)
   d. The timestamp of the last time Praefect tried to reach that node (`last_contact_attempt_at`)
   e. The timestamp of the last successful health check (`last_seen_active_at`)
3. Periodically, every Praefect node does a `SELECT` from `node_status` to determine **healthy nodes**. A healthy node is defined by:
   a. A node that has a recent successful health check (e.g. one in the last 10 s).
   b. A majority of the active Praefect nodes having entries that match the above.
4. To determine the majority, we use a lightweight service discovery protocol: a Praefect node is deemed a voting member if its `praefect_name` has a recent `last_contact_attempt_at` in the `node_status` table. The name is derived from a combination of the hostname and listening port/socket.
5. The primary of each shard is listed in the `shard_primaries` table. If the current primary is in the healthy node list, then no election needs to be done.
6. Otherwise, if there is no primary or it is unhealthy, any Praefect node can elect a new primary by choosing a candidate from the healthy node list and inserting a row into the table.

Closes https://gitlab.com/gitlab-org/gitaly/-/issues/2547
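To make the majority rule concrete, here is a minimal, self-contained Go sketch. It is an illustration only — the names and data shapes are hypothetical, not part of this commit: given the set of recently active Praefect processes and the storages each one recently saw healthy, a storage counts as healthy once it reaches a quorum of ceil(active/2) votes.

```go
package main

import (
	"fmt"
	"math"
)

// healthyNodes applies the majority rule from the commit message:
// reports maps praefect_name -> storages that Praefect recently saw
// respond to a health check. A storage is healthy when at least
// ceil(len(reports)/2) Praefects vouch for it.
func healthyNodes(reports map[string][]string) []string {
	quorum := int(math.Ceil(float64(len(reports)) / 2))

	votes := map[string]int{}
	for _, storages := range reports {
		for _, s := range storages {
			votes[s]++
		}
	}

	var healthy []string
	for s, n := range votes {
		if n >= quorum {
			healthy = append(healthy, s)
		}
	}
	return healthy
}

func main() {
	reports := map[string][]string{
		"praefect-a:2305": {"gitaly-0", "gitaly-1"},
		"praefect-b:2305": {"gitaly-1"},
		"praefect-c:2305": {"gitaly-1"},
	}
	// Quorum is ceil(3/2) = 2, so only gitaly-1 qualifies.
	fmt.Println(healthyNodes(reports)) // [gitaly-1]
}
```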
-rw-r--r-- _support/Makefile.template | 2
-rw-r--r-- changelogs/unreleased/sh-improve-sql-leader-election.yml | 5
-rw-r--r-- cmd/praefect/main.go | 18
-rw-r--r-- config.praefect.toml.example | 4
-rw-r--r-- internal/praefect/auth_test.go | 2
-rw-r--r-- internal/praefect/config/config.go | 18
-rw-r--r-- internal/praefect/config/config_test.go | 45
-rw-r--r-- internal/praefect/coordinator_test.go | 4
-rw-r--r-- internal/praefect/datastore/glsql/testing.go | 2
-rw-r--r-- internal/praefect/datastore/migrations/20200324001604_add_sql_election_tables.go | 34
-rw-r--r-- internal/praefect/helper_test.go | 4
-rw-r--r-- internal/praefect/nodes/init_test.go | 22
-rw-r--r-- internal/praefect/nodes/local_elector.go | 55
-rw-r--r-- internal/praefect/nodes/local_elector_test.go | 18
-rw-r--r-- internal/praefect/nodes/manager.go | 43
-rw-r--r-- internal/praefect/nodes/manager_test.go | 54
-rw-r--r-- internal/praefect/nodes/sql_elector.go | 456
-rw-r--r-- internal/praefect/nodes/sql_elector_test.go | 163
-rw-r--r-- internal/praefect/replicator_test.go | 8
-rw-r--r-- internal/praefect/server_test.go | 2
20 files changed, 883 insertions(+), 76 deletions(-)
diff --git a/_support/Makefile.template b/_support/Makefile.template
index a56aca3e5..feae6dcce 100644
--- a/_support/Makefile.template
+++ b/_support/Makefile.template
@@ -128,7 +128,7 @@ rspec-gitlab-shell: {{ .GitlabShellDir }}/config.yml assemble-go prepare-tests
.PHONY: test-postgres
test-postgres: prepare-tests
- @cd {{ .SourceDir }} && go test -tags postgres -count=1 gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/...
+ @cd {{ .SourceDir }} && go test -tags postgres -count=1 gitlab.com/gitlab-org/gitaly/internal/praefect/...
.PHONY: verify
verify: check-mod-tidy check-formatting notice-up-to-date check-proto rubocop
diff --git a/changelogs/unreleased/sh-improve-sql-leader-election.yml b/changelogs/unreleased/sh-improve-sql-leader-election.yml
new file mode 100644
index 000000000..0106c3ade
--- /dev/null
+++ b/changelogs/unreleased/sh-improve-sql-leader-election.yml
@@ -0,0 +1,5 @@
+---
+title: Add SQL-based election for shard primaries
+merge_request: 1979
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 877de738b..24be3ce48 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -187,7 +187,18 @@ func run(cfgs []starter.Config, conf config.Config) error {
return err
}
- nodeManager, err := nodes.NewManager(logger, conf, nodeLatencyHistogram)
+ var db *sql.DB
+
+ if conf.NeedsSQL() {
+ dbConn, closedb, err := initDatabase(logger, conf)
+ if err != nil {
+ return err
+ }
+ defer closedb()
+ db = dbConn
+ }
+
+ nodeManager, err := nodes.NewManager(logger, conf, db, nodeLatencyHistogram)
if err != nil {
return err
}
@@ -203,11 +214,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
if conf.PostgresQueueEnabled {
- db, closedb, err := initDatabase(logger, conf)
- if err != nil {
- return err
- }
- defer closedb()
ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db)
} else {
ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue()
diff --git a/config.praefect.toml.example b/config.praefect.toml.example
index d45978796..534f9b9a2 100644
--- a/config.praefect.toml.example
+++ b/config.praefect.toml.example
@@ -26,6 +26,10 @@ listen_addr = "127.0.0.1:2305"
# as shard. listen_addr should be unique for all nodes.
# Requires the protocol to be defined, e.g. tcp://host.tld:1234
+[failover]
+enabled = true
+election_strategy = "local" # Options: local, sql
+
[[virtual_storage]]
name = 'praefect'
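For reference, the new `[failover]` block decodes via the same `toml` struct tags added to the Praefect config package. Below is a hedged, standalone sketch — the `failover` and `cfg` structs are local stand-ins for illustration, not the real `config.Config`:

```go
package main

import (
	"fmt"
	"log"

	"github.com/BurntSushi/toml"
)

type failover struct {
	Enabled          bool   `toml:"enabled"`
	ElectionStrategy string `toml:"election_strategy"`
}

type cfg struct {
	Failover failover `toml:"failover"`
}

func main() {
	doc := `
[failover]
enabled = true
election_strategy = "sql"
`
	var c cfg
	if _, err := toml.Decode(doc, &c); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", c.Failover) // {Enabled:true ElectionStrategy:sql}
}
```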
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 7fce795b2..821acd633 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -194,7 +194,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
}
- nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
registry := protoregistry.New()
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index da7f76015..1b01b33b2 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -14,6 +14,11 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
+type Failover struct {
+ Enabled bool `toml:"enabled"`
+ ElectionStrategy string `toml:"election_strategy"`
+}
+
// Config is a container for everything found in the TOML config file
type Config struct {
ListenAddr string `toml:"listen_addr"`
@@ -29,6 +34,8 @@ type Config struct {
Prometheus prometheus.Config `toml:"prometheus"`
Auth auth.Config `toml:"auth"`
DB `toml:"database"`
+ Failover Failover `toml:"failover"`
+ // Keep for legacy reasons: remove after Omnibus has switched
FailoverEnabled bool `toml:"failover_enabled"`
PostgresQueueEnabled bool `toml:"postgres_queue_enabled"`
}
@@ -63,6 +70,12 @@ func FromFile(filePath string) (Config, error) {
config.Nodes = nil
}
+ // TODO: Remove this after failover_enabled has moved under a separate failover section. This is for
+ // backwards compatibility only
+ if config.FailoverEnabled {
+ config.Failover.Enabled = true
+ }
+
return *config, err
}
@@ -144,6 +157,11 @@ func (c Config) Validate() error {
return nil
}
+// NeedsSQL returns true if the driver for SQL needs to be initialized
+func (c Config) NeedsSQL() bool {
+ return c.PostgresQueueEnabled || (c.Failover.Enabled && c.Failover.ElectionStrategy == "sql")
+}
+
// DB holds Postgres client configuration data.
type DB struct {
Host string `toml:"host"`
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 07e4fb449..4a01db1a4 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -265,3 +265,48 @@ func TestToPQString(t *testing.T) {
})
}
}
+
+func TestNeedsSQL(t *testing.T) {
+ testCases := []struct {
+ desc string
+ config Config
+ expected bool
+ }{
+ {
+ desc: "default",
+ config: Config{},
+ expected: false,
+ },
+ {
+ desc: "PostgreSQL queue enabled",
+ config: Config{PostgresQueueEnabled: true},
+ expected: true,
+ },
+ {
+ desc: "Failover enabled with default election strategy",
+ config: Config{Failover: Failover{Enabled: true}},
+ expected: false,
+ },
+ {
+ desc: "Failover enabled with SQL election strategy",
+ config: Config{Failover: Failover{Enabled: true, ElectionStrategy: "sql"}},
+ expected: true,
+ },
+ {
+ desc: "Both PostgresQL and SQL election strategy enabled",
+ config: Config{PostgresQueueEnabled: true, Failover: Failover{Enabled: true, ElectionStrategy: "sql"}},
+ expected: true,
+ },
+ {
+ desc: "Both PostgresQL and SQL election strategy enabled but failover disabled",
+ config: Config{PostgresQueueEnabled: true, Failover: Failover{Enabled: false, ElectionStrategy: "sql"}},
+ expected: true,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ require.Equal(t, tc.expected, tc.config.NeedsSQL())
+ })
+ }
+}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 6abfa023f..d7f0fd15b 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -75,7 +75,7 @@ func TestStreamDirector(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
r := protoregistry.New()
require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
@@ -200,7 +200,7 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
coordinator := NewCoordinator(entry, ds, nodeMgr, conf, protoregistry.GitalyProtoPreregistered)
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 8bff18850..1bda1a54c 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -54,6 +54,8 @@ func (db DB) TruncateAll(t testing.TB) {
"replication_queue_job_lock",
"replication_queue",
"replication_queue_lock",
+ "node_status",
+ "shard_primaries",
)
}
diff --git a/internal/praefect/datastore/migrations/20200324001604_add_sql_election_tables.go b/internal/praefect/datastore/migrations/20200324001604_add_sql_election_tables.go
new file mode 100644
index 000000000..7a96769b2
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200324001604_add_sql_election_tables.go
@@ -0,0 +1,34 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20200324001604_add_sql_election_tables",
+ Up: []string{
+ `CREATE TABLE node_status (
+ id BIGSERIAL PRIMARY KEY,
+ praefect_name VARCHAR(511) NOT NULL,
+ shard_name VARCHAR(255) NOT NULL,
+ node_name VARCHAR(255) NOT NULL,
+ last_contact_attempt_at TIMESTAMP WITH TIME ZONE,
+ last_seen_active_at TIMESTAMP WITH TIME ZONE)`,
+ "CREATE UNIQUE INDEX shard_node_names_on_node_status_idx ON node_status (praefect_name, shard_name, node_name)",
+ "CREATE INDEX shard_name_on_node_status_idx ON node_status (shard_name, node_name)",
+
+ `CREATE TABLE shard_primaries (
+ id BIGSERIAL PRIMARY KEY,
+ shard_name VARCHAR(255) NOT NULL,
+ node_name VARCHAR(255) NOT NULL,
+ elected_by_praefect VARCHAR(255) NOT NULL,
+ elected_at TIMESTAMP WITH TIME ZONE NOT NULL)`,
+ "CREATE UNIQUE INDEX shard_name_on_shard_primaries_idx ON shard_primaries (shard_name)",
+ },
+ Down: []string{
+ "DROP TABLE shard_primaries",
+ "DROP TABLE node_status",
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
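As a hedged sketch of how a migration like this gets applied, rubenv/sql-migrate (the library imported above) exposes a `MemoryMigrationSource` and an `Exec` entry point. Gitaly's own migration runner may wire this up differently; the DSN below is a placeholder and the SQL is abbreviated:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
	migrate "github.com/rubenv/sql-migrate"
)

func main() {
	// Placeholder DSN; point this at a real Praefect database.
	db, err := sql.Open("postgres", "postgres://localhost/praefect?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	migrations := &migrate.MemoryMigrationSource{
		Migrations: []*migrate.Migration{
			{
				Id:   "20200324001604_add_sql_election_tables",
				Up:   []string{"CREATE TABLE node_status (id BIGSERIAL PRIMARY KEY)"}, // abbreviated
				Down: []string{"DROP TABLE node_status"},
			},
		},
	}

	n, err := migrate.Exec(db, "postgres", migrations, migrate.Up)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("applied %d migrations", n)
}
```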
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 1d1353cde..080308fb8 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -117,7 +117,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
conf.VirtualStorages[0].Nodes[i] = node
}
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -181,7 +181,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
}
logEntry := log.Default()
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
diff --git a/internal/praefect/nodes/init_test.go b/internal/praefect/nodes/init_test.go
new file mode 100644
index 000000000..70c50dd30
--- /dev/null
+++ b/internal/praefect/nodes/init_test.go
@@ -0,0 +1,22 @@
+// +build postgres
+
+package nodes
+
+import (
+ "log"
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+func TestMain(m *testing.M) {
+ code := m.Run()
+ // Clean closes the connection to the database once all tests are done
+ if err := glsql.Clean(); err != nil {
+ log.Fatalln(err, "database disconnection failure")
+ }
+ os.Exit(code)
+}
+
+func getDB(t testing.TB) glsql.DB { return glsql.GetDB(t, "nodes") }
diff --git a/internal/praefect/nodes/local_elector.go b/internal/praefect/nodes/local_elector.go
index aa9b332de..659a794ad 100644
--- a/internal/praefect/nodes/local_elector.go
+++ b/internal/praefect/nodes/local_elector.go
@@ -5,6 +5,7 @@ import (
"sync"
"time"
+ "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
)
@@ -24,6 +25,7 @@ type localElector struct {
shardName string
nodes []*nodeCandidate
primaryNode *nodeCandidate
+ log logrus.FieldLogger
}
// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
@@ -44,39 +46,29 @@ func (n *nodeCandidate) isHealthy() bool {
return true
}
-func newLocalElector(name string, failoverEnabled bool) *localElector {
+func newLocalElector(name string, failoverEnabled bool, log logrus.FieldLogger, ns []*nodeStatus) *localElector {
+ nodes := make([]*nodeCandidate, len(ns))
+ for i, n := range ns {
+ nodes[i] = &nodeCandidate{
+ node: n,
+ primary: i == 0,
+ }
+ }
+
return &localElector{
shardName: name,
failoverEnabled: failoverEnabled,
+ log: log,
+ nodes: nodes,
+ primaryNode: nodes[0],
}
}
-// addNode registers a primary or secondary in the internal
-// datastore.
-func (s *localElector) addNode(node Node, primary bool) {
- localNode := nodeCandidate{
- node: node,
- primary: primary,
- statuses: make([]bool, 0),
- }
-
- s.m.Lock()
- defer s.m.Unlock()
-
- if primary {
- s.primaryNode = &localNode
- }
-
- s.nodes = append(s.nodes, &localNode)
-}
-
// Start launches a Goroutine to check the state of the nodes and
// continuously monitor their health via gRPC health checks.
-func (s *localElector) start(bootstrapInterval, monitorInterval time.Duration) error {
+func (s *localElector) start(bootstrapInterval, monitorInterval time.Duration) {
s.bootstrap(bootstrapInterval)
go s.monitor(monitorInterval)
-
- return nil
}
func (s *localElector) bootstrap(d time.Duration) {
@@ -96,17 +88,22 @@ func (s *localElector) monitor(d time.Duration) {
ticker := time.NewTicker(d)
defer ticker.Stop()
+ ctx := context.Background()
+
for {
<-ticker.C
- ctx := context.Background()
- s.checkNodes(ctx)
+ err := s.checkNodes(ctx)
+
+ if err != nil {
+ s.log.WithError(err).Warn("error checking nodes")
+ }
}
}
// checkNodes issues a gRPC health check for each node managed by the
// shard.
-func (s *localElector) checkNodes(ctx context.Context) {
+func (s *localElector) checkNodes(ctx context.Context) error {
s.m.Lock()
defer s.updateMetrics()
defer s.m.Unlock()
@@ -121,7 +118,7 @@ func (s *localElector) checkNodes(ctx context.Context) {
}
if s.primaryNode != nil && s.primaryNode.isHealthy() {
- return
+ return nil
}
var newPrimary *nodeCandidate
@@ -134,12 +131,14 @@ func (s *localElector) checkNodes(ctx context.Context) {
}
if newPrimary == nil {
- return
+ return ErrPrimaryNotHealthy
}
s.primaryNode.primary = false
s.primaryNode = newPrimary
newPrimary.primary = true
+
+ return nil
}
// GetPrimary gets the primary of a shard. If no primary exists, it will
diff --git a/internal/praefect/nodes/local_elector_test.go b/internal/praefect/nodes/local_elector_test.go
index 16f3a3875..5bf02fca4 100644
--- a/internal/praefect/nodes/local_elector_test.go
+++ b/internal/praefect/nodes/local_elector_test.go
@@ -24,12 +24,14 @@ func TestPrimaryAndSecondaries(t *testing.T) {
require.NoError(t, err)
storageName := "default"
- mockHistogramVec := promtest.NewMockHistogramVec()
+ mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec()
- cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec)
- strategy := newLocalElector(storageName, true)
+ cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec0)
+ secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec1)
+ ns := []*nodeStatus{cs, secondary}
+ logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
+ strategy := newLocalElector(storageName, true, logger, ns)
- strategy.addNode(cs, true)
strategy.bootstrap(time.Second)
primary, err := strategy.GetPrimary()
@@ -40,14 +42,6 @@ func TestPrimaryAndSecondaries(t *testing.T) {
secondaries, err := strategy.GetSecondaries()
require.NoError(t, err)
- require.Equal(t, 0, len(secondaries))
-
- secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), nil)
- strategy.addNode(secondary, false)
-
- secondaries, err = strategy.GetSecondaries()
-
- require.NoError(t, err)
require.Equal(t, 1, len(secondaries))
require.Equal(t, secondary, secondaries[0])
}
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index ea2e57feb..a90a0b080 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -2,6 +2,7 @@ package nodes
import (
"context"
+ "database/sql"
"errors"
"time"
@@ -46,14 +47,14 @@ type Mgr struct {
log *logrus.Entry
// strategies is a map of strategies keyed on virtual storage name
strategies map[string]leaderElectionStrategy
+ db *sql.DB
}
// leaderElectionStrategy defines the interface by which primary and
// secondaries are managed.
type leaderElectionStrategy interface {
- start(bootstrapInterval, monitorInterval time.Duration) error
- addNode(node Node, primary bool)
- checkNodes(context.Context)
+ start(bootstrapInterval, monitorInterval time.Duration)
+ checkNodes(context.Context) error
Shard
}
@@ -63,17 +64,11 @@ type leaderElectionStrategy interface {
var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
// NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
- mgr := Mgr{
- log: log,
- failoverEnabled: c.FailoverEnabled}
-
- mgr.strategies = make(map[string]leaderElectionStrategy)
+func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+ strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
for _, virtualStorage := range c.VirtualStorages {
- strategy := newLocalElector(virtualStorage.Name, c.FailoverEnabled)
- mgr.strategies[virtualStorage.Name] = strategy
-
+ ns := make([]*nodeStatus, 1, len(virtualStorage.Nodes))
for _, node := range virtualStorage.Nodes {
conn, err := client.Dial(node.Address,
append(
@@ -95,22 +90,40 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram prommetrics
if err != nil {
return nil, err
}
- ns := newConnectionStatus(*node, conn, log, latencyHistogram)
+ cs := newConnectionStatus(*node, conn, log, latencyHistogram)
+ if node.DefaultPrimary {
+ ns[0] = cs
+ } else {
+ ns = append(ns, cs)
+ }
+ }
- strategy.addNode(ns, node.DefaultPrimary)
+ if c.Failover.ElectionStrategy == "sql" {
+ strategies[virtualStorage.Name] = newSQLElector(virtualStorage.Name, c, defaultFailoverTimeoutSeconds, defaultActivePraefectSeconds, db, log, ns)
+ } else {
+ strategies[virtualStorage.Name] = newLocalElector(virtualStorage.Name, c.Failover.Enabled, log, ns)
}
}
- return &mgr, nil
+ return &Mgr{
+ log: log,
+ db: db,
+ failoverEnabled: c.Failover.Enabled,
+ strategies: strategies,
+ }, nil
}
// Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
// the monitoring process. Start must be called before NodeMgr can be used.
func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
if n.failoverEnabled {
+ n.log.Info("Starting failover checks")
+
for _, strategy := range n.strategies {
strategy.start(bootstrapInterval, monitorInterval)
}
+ } else {
+ n.log.Info("Failover checks are disabled")
}
}
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 185c640d6..a4f95661b 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -52,6 +52,52 @@ func TestNodeStatus(t *testing.T) {
require.False(t, status)
}
+func TestPrimaryIsSecond(t *testing.T) {
+ virtualStorages := []*config.VirtualStorage{
+ {
+ Name: "virtual-storage-0",
+ Nodes: []*models.Node{
+ {
+ Storage: "praefect-internal-0",
+ Address: "unix://socket0",
+ DefaultPrimary: false,
+ },
+ {
+ Storage: "praefect-internal-1",
+ Address: "unix://socket1",
+ DefaultPrimary: true,
+ },
+ },
+ },
+ }
+
+ conf := config.Config{
+ VirtualStorages: virtualStorages,
+ Failover: config.Failover{Enabled: false},
+ }
+
+ mockHistogram := promtest.NewMockHistogramVec()
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, mockHistogram)
+ require.NoError(t, err)
+
+ shard, err := nm.GetShard("virtual-storage-0")
+ require.NoError(t, err)
+
+ primary, err := shard.GetPrimary()
+ require.NoError(t, err)
+
+ secondaries, err := shard.GetSecondaries()
+ require.NoError(t, err)
+
+ require.Equal(t, virtualStorages[0].Nodes[1].Storage, primary.GetStorage())
+ require.Equal(t, virtualStorages[0].Nodes[1].Address, primary.GetAddress())
+
+ require.Len(t, secondaries, 1)
+ require.Equal(t, virtualStorages[0].Nodes[0].Storage, secondaries[0].GetStorage())
+ require.Equal(t, virtualStorages[0].Nodes[0].Address, secondaries[0].GetAddress())
+}
+
func TestNodeManager(t *testing.T) {
internalSocket0, internalSocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
srv0, healthSrv0 := testhelper.NewServerWithHealth(t, internalSocket0)
@@ -79,18 +125,18 @@ func TestNodeManager(t *testing.T) {
confWithFailover := config.Config{
VirtualStorages: virtualStorages,
- FailoverEnabled: true,
+ Failover: config.Failover{Enabled: true},
}
confWithoutFailover := config.Config{
VirtualStorages: virtualStorages,
- FailoverEnabled: false,
+ Failover: config.Failover{Enabled: false},
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, mockHistogram)
require.NoError(t, err)
- nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, mockHistogram)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, mockHistogram)
require.NoError(t, err)
nm.Start(1*time.Millisecond, 5*time.Second)
diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go
new file mode 100644
index 000000000..babab116c
--- /dev/null
+++ b/internal/praefect/nodes/sql_elector.go
@@ -0,0 +1,456 @@
+package nodes
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "math"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+)
+
+const (
+ defaultFailoverTimeoutSeconds = 10
+ defaultActivePraefectSeconds = 60
+)
+
+type sqlCandidate struct {
+ Node
+}
+
+// sqlElector manages the primary election for one virtual storage (aka
+// shard). It enables multiple, redundant Praefect processes to run,
+// which is needed to eliminate a single point of failure in Gitaly High
+// Availability.
+//
+// The sqlElector is responsible for:
+//
+// 1. Monitoring and updating the status of all nodes within the shard.
+// 2. Electing a new primary of the shard based on the health.
+//
+// Every Praefect node periodically (every second) performs a health check RPC with each Gitaly node.
+// 1. For each node, Praefect updates a row in a new table
+// (`node_status`) with the following information:
+//
+// a. The name of the Praefect instance (`praefect_name`)
+// b. The name of the virtual storage (`shard_name`)
+// c. The name of the Gitaly storage (`node_name`)
+// d. The timestamp of the last time Praefect tried to reach that node (`last_contact_attempt_at`)
+// e. The timestamp of the last successful health check (`last_seen_active_at`)
+//
+// 2. Once the health checks are complete, each Praefect node does a `SELECT` from
+// `node_status` to determine healthy nodes. A healthy node is
+// defined by:
+// a. A node that has a recent successful health check (e.g. one in
+// the last 10 s).
+// b. A majority of the active Praefect nodes have entries that
+// match the above.
+//
+// To determine the majority, we use a lightweight service discovery
+// protocol: a Praefect node is deemed a voting member if the
+// `praefect_name` has a recent `last_contact_attempt_at` in the
+// `node_status` table. The name is derived from a combination
+// of the hostname and listening port/socket.
+//
+// The primary of each shard is listed in the
+// `shard_primaries`. If the current primary is in the healthy
+// node list, then sqlElector updates its internal state to match.
+//
+// Otherwise, if there is no primary or it is unhealthy, any Praefect node
+// can elect a new primary by choosing a candidate from the healthy node
+// list.
+type sqlElector struct {
+ m sync.RWMutex
+ praefectName string
+ shardName string
+ nodes []*sqlCandidate
+ primaryNode *sqlCandidate
+ db *sql.DB
+ log logrus.FieldLogger
+ failoverSeconds int
+ activePraefectSeconds int
+}
+
+func newSQLElector(name string, c config.Config, failoverTimeoutSeconds int, activePraefectSeconds int, db *sql.DB, log logrus.FieldLogger, ns []*nodeStatus) *sqlElector {
+ praefectName := getPraefectName(c, log)
+
+ log = log.WithField("praefectName", praefectName)
+ log.Info("Using SQL election strategy")
+
+ nodes := make([]*sqlCandidate, len(ns))
+ for i, n := range ns {
+ nodes[i] = &sqlCandidate{Node: n}
+ }
+
+ return &sqlElector{
+ praefectName: praefectName,
+ shardName: name,
+ db: db,
+ log: log,
+ failoverSeconds: failoverTimeoutSeconds,
+ activePraefectSeconds: activePraefectSeconds,
+ nodes: nodes,
+ primaryNode: nodes[0],
+ }
+}
+
+// Generate a Praefect name so that each Praefect process can report
+// node statuses independently. This will enable us to do a SQL
+// election to determine which nodes are active. Ideally this name
+// doesn't change across restarts, since a change may temporarily make it
+// look like there are more Praefect processes active than there really
+// are when determining a quorum.
+func getPraefectName(c config.Config, log logrus.FieldLogger) string {
+ name, err := os.Hostname()
+
+ if err != nil {
+ name = uuid.New().String()
+ log.WithError(err).WithFields(logrus.Fields{
+ "praefectName": name,
+ }).Warn("unable to determine Praefect hostname, using randomly generated UUID")
+ }
+
+ if c.ListenAddr != "" {
+ return fmt.Sprintf("%s:%s", name, c.ListenAddr)
+ }
+
+ return fmt.Sprintf("%s:%s", name, c.SocketPath)
+}
+
+// start launches a Goroutine to check the state of the nodes and
+// continuously monitor their health via gRPC health checks.
+func (s *sqlElector) start(bootstrapInterval, monitorInterval time.Duration) {
+ s.bootstrap(bootstrapInterval)
+ go s.monitor(monitorInterval)
+}
+
+func (s *sqlElector) bootstrap(d time.Duration) {
+ ctx := context.Background()
+ s.checkNodes(ctx)
+}
+
+func (s *sqlElector) monitor(d time.Duration) {
+ ticker := time.NewTicker(d)
+ defer ticker.Stop()
+
+ ctx := context.Background()
+
+ for {
+ <-ticker.C
+ s.checkNodes(ctx)
+ }
+}
+
+func (s *sqlElector) checkNodes(ctx context.Context) error {
+ var wg sync.WaitGroup
+
+ defer s.updateMetrics()
+
+ for _, n := range s.nodes {
+ wg.Add(1)
+
+ go func(n Node) {
+ defer wg.Done()
+ result, _ := n.check(ctx)
+ if err := s.updateNode(n, result); err != nil {
+ s.log.WithError(err).WithFields(logrus.Fields{
+ "shard": s.shardName,
+ "storage": n.GetStorage(),
+ "address": n.GetAddress(),
+ }).Error("error checking node")
+ }
+ }(n)
+ }
+
+ wg.Wait()
+
+ err := s.validateAndUpdatePrimary()
+
+ if err != nil {
+ s.log.WithError(err).Error("unable to validate primary")
+ return err
+ }
+
+ // The attempt to elect a primary may have conflicted with another
+ // node attempting to elect a primary. We check the database again
+ // to see the current state.
+ candidate, err := s.lookupPrimary()
+
+ if err != nil {
+ s.log.WithError(err).Error("error looking up primary")
+ return err
+ }
+
+ s.setPrimary(candidate)
+ return nil
+}
+
+func (s *sqlElector) setPrimary(candidate *sqlCandidate) {
+ s.m.Lock()
+ defer s.m.Unlock()
+
+ if candidate != s.primaryNode {
+ var oldPrimary string
+ var newPrimary string
+
+ if s.primaryNode != nil {
+ oldPrimary = s.primaryNode.GetStorage()
+ }
+
+ if candidate != nil {
+ newPrimary = candidate.GetStorage()
+ }
+
+ s.log.WithFields(logrus.Fields{
+ "oldPrimary": oldPrimary,
+ "newPrimary": newPrimary,
+ "shard": s.shardName}).Info("primary node changed")
+
+ s.primaryNode = candidate
+ }
+}
+
+func (s *sqlElector) updateNode(node Node, result bool) error {
+ var q string
+
+ if result {
+ q = `INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
+VALUES ($1, $2, $3, NOW(), NOW())
+ON CONFLICT (praefect_name, shard_name, node_name)
+DO UPDATE SET
+last_contact_attempt_at = NOW(),
+last_seen_active_at = NOW()`
+ } else {
+ // Omit the last_seen_active_at since we weren't successful at contacting this node
+ q = `INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at)
+VALUES ($1, $2, $3, NOW())
+ON CONFLICT (praefect_name, shard_name, node_name)
+DO UPDATE SET
+last_contact_attempt_at = NOW()`
+ }
+
+ _, err := s.db.Exec(q, s.praefectName, s.shardName, node.GetStorage())
+
+ if err != nil {
+ s.log.Errorf("Error updating node: %s", err)
+ }
+
+ return err
+}
+
+// GetPrimary gets the primary of a shard by checking the state of the
+// database and updating the internal state. If no primary exists, it
+// will be nil. If a primary has been elected but is down, err will be
+// ErrPrimaryNotHealthy.
+func (s *sqlElector) GetPrimary() (Node, error) {
+ primary, err := s.lookupPrimary()
+
+ if err != nil {
+ return nil, err
+ }
+
+ // Update the internal state so that calls to GetSecondaries() will be
+ // consistent with GetPrimary()
+ s.setPrimary(primary)
+
+ if primary == nil {
+ return nil, ErrPrimaryNotHealthy
+ }
+
+ return primary, nil
+}
+
+// GetSecondaries gets the secondaries of a shard. It uses the internal
+// state to determine the primary so that calls to GetSecondaries() will
+// be consistent with the first call to GetPrimary().
+func (s *sqlElector) GetSecondaries() ([]Node, error) {
+ s.m.RLock()
+ primaryNode := s.primaryNode
+ s.m.RUnlock()
+
+ var secondaries []Node
+ for _, n := range s.nodes {
+ if primaryNode != n {
+ secondaries = append(secondaries, n)
+ }
+ }
+
+ return secondaries, nil
+}
+
+func (s *sqlElector) updateMetrics() {
+ s.m.RLock()
+ primary := s.primaryNode
+ s.m.RUnlock()
+
+ for _, node := range s.nodes {
+ var val float64
+
+ if primary == node {
+ val = 1
+ }
+
+ metrics.PrimaryGauge.WithLabelValues(s.shardName, node.GetStorage()).Set(val)
+ }
+}
+
+func (s *sqlElector) getQuorumCount() (int, error) {
+ // This is a crude form of service discovery: count how many Praefect
+ // nodes are active based on whether they recently attempted to update entries.
+ q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= NOW() - $2::INTERVAL SECOND`
+
+ var totalCount int
+
+ if err := s.db.QueryRow(q, s.shardName, s.activePraefectSeconds).Scan(&totalCount); err != nil {
+ return 0, fmt.Errorf("error retrieving quorum count: %v", err)
+ }
+
+ if totalCount <= 1 {
+ return 1, nil
+ }
+
+ quorumCount := int(math.Ceil(float64(totalCount) / 2))
+
+ return quorumCount, nil
+}
+
+func (s *sqlElector) lookupNodeByName(name string) *sqlCandidate {
+ for _, n := range s.nodes {
+ if n.GetStorage() == name {
+ return n
+ }
+ }
+
+ return nil
+}
+
+func nodeInSlice(candidates []*sqlCandidate, node *sqlCandidate) bool {
+ for _, n := range candidates {
+ if n == node {
+ return true
+ }
+ }
+
+ return false
+}
+
+func (s *sqlElector) demotePrimary() error {
+ s.setPrimary(nil)
+
+ q := "DELETE FROM shard_primaries WHERE shard_name = $1"
+ _, err := s.db.Exec(q, s.shardName)
+
+ return err
+}
+
+func (s *sqlElector) electNewPrimary(candidates []*sqlCandidate) error {
+ // Pick the first candidate from the list. Note that the candidate
+ // list has already been ranked by the number of Praefect nodes that
+ // recently saw the node as healthy (descending), with the node name
+ // as a tiebreaker.
+ newPrimary := candidates[0]
+
+ q := `INSERT INTO shard_primaries (elected_by_praefect, shard_name, node_name, elected_at)
+ SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, NOW()
+ WHERE $3 != COALESCE((SELECT node_name FROM shard_primaries WHERE shard_name = $2::VARCHAR), '')
+ ON CONFLICT (shard_name)
+ DO UPDATE SET elected_by_praefect = EXCLUDED.elected_by_praefect
+ , node_name = EXCLUDED.node_name
+ , elected_at = EXCLUDED.elected_at
+ `
+ _, err := s.db.Exec(q, s.praefectName, s.shardName, newPrimary.GetStorage())
+
+ if err != nil {
+ s.log.Errorf("error updating new primary: %s", err)
+ return err
+ }
+
+ return nil
+}
+
+func (s *sqlElector) validateAndUpdatePrimary() error {
+ quorumCount, err := s.getQuorumCount()
+
+ if err != nil {
+ return err
+ }
+
+ // Retrieves candidates, ranked by the ones that are the most active
+ q := `SELECT node_name FROM node_status
+ WHERE shard_name = $1 AND last_seen_active_at >= NOW() - $2::INTERVAL SECOND
+ GROUP BY node_name
+ HAVING COUNT(praefect_name) >= $3
+ ORDER BY COUNT(node_name) DESC, node_name ASC`
+
+ rows, err := s.db.Query(q, s.shardName, s.failoverSeconds, quorumCount)
+
+ if err != nil {
+ return fmt.Errorf("error retrieving candidates: %v", err)
+ }
+ defer rows.Close()
+
+ var candidates []*sqlCandidate
+
+ for rows.Next() {
+ var name string
+ if err := rows.Scan(&name); err != nil {
+ return fmt.Errorf("error retrieving candidate rows: %v", err)
+ }
+
+ node := s.lookupNodeByName(name)
+
+ if node != nil {
+ candidates = append(candidates, node)
+ } else {
+ s.log.Errorf("unknown candidate node name found: %s", name)
+ }
+ }
+
+ if err = rows.Err(); err != nil {
+ return err
+ }
+
+ // Check if primary is in this list
+ primaryNode, err := s.lookupPrimary()
+
+ if err != nil {
+ s.log.WithError(err).Error("error looking up primary")
+ return err
+ }
+
+ if len(candidates) == 0 {
+ return s.demotePrimary()
+ }
+
+ if primaryNode == nil || !nodeInSlice(candidates, primaryNode) {
+ return s.electNewPrimary(candidates)
+ }
+
+ return nil
+}
+
+func (s *sqlElector) lookupPrimary() (*sqlCandidate, error) {
+ var primaryName string
+
+ q := `SELECT node_name FROM shard_primaries WHERE shard_name = $1`
+
+ if err := s.db.QueryRow(q, s.shardName).Scan(&primaryName); err != nil {
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+
+ return nil, fmt.Errorf("error looking up primary: %v", err)
+ }
+
+ var primaryNode *sqlCandidate
+ if primaryName != "" {
+ primaryNode = s.lookupNodeByName(primaryName)
+ }
+
+ return primaryNode, nil
+}
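The quorum arithmetic in getQuorumCount is worth spelling out. This standalone reproduction (no database involved, helper name is illustrative) prints the quorum for a few cluster sizes; note that a lone Praefect is its own quorum and a two-Praefect cluster needs only a single vote:

```go
package main

import (
	"fmt"
	"math"
)

// quorum mirrors getQuorumCount's arithmetic: one active Praefect elects
// on its own; otherwise a node needs ceil(total/2) recent votes to be
// considered healthy.
func quorum(total int) int {
	if total <= 1 {
		return 1
	}
	return int(math.Ceil(float64(total) / 2))
}

func main() {
	for _, n := range []int{1, 2, 3, 4, 5} {
		fmt.Printf("active praefects: %d -> quorum: %d\n", n, quorum(n))
	}
	// 1 -> 1, 2 -> 1, 3 -> 2, 4 -> 2, 5 -> 3
}
```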
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
new file mode 100644
index 000000000..faf3d99a3
--- /dev/null
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -0,0 +1,163 @@
+// +build postgres
+
+package nodes
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var shardName = "test-shard-0"
+
+func TestGetPrimaryAndSecondaries(t *testing.T) {
+ db := getDB(t)
+
+ logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
+ praefectSocket := testhelper.GetTemporaryGitalySocketFileName()
+ socketName := "unix://" + praefectSocket
+
+ conf := config.Config{
+ SocketPath: socketName,
+ }
+
+ internalSocket0 := testhelper.GetTemporaryGitalySocketFileName()
+ srv0, _ := testhelper.NewServerWithHealth(t, internalSocket0)
+ defer srv0.Stop()
+
+ cc0, err := grpc.Dial(
+ "unix://"+internalSocket0,
+ grpc.WithInsecure(),
+ )
+ require.NoError(t, err)
+
+ storageName := "default"
+ mockHistogramVec0 := promtest.NewMockHistogramVec()
+ cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0)
+
+ ns := []*nodeStatus{cs0}
+ elector := newSQLElector(shardName, conf, 1, defaultActivePraefectSeconds, db.DB, logger, ns)
+ require.Contains(t, elector.praefectName, ":"+socketName)
+ require.Equal(t, elector.shardName, shardName)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ err = elector.checkNodes(ctx)
+ require.NoError(t, err)
+ db.RequireRowsInTable(t, "shard_primaries", 1)
+
+ require.NoError(t, elector.demotePrimary())
+ db.RequireRowsInTable(t, "shard_primaries", 0)
+
+ // Ensure the primary state is reflected immediately
+ primary, err := elector.GetPrimary()
+ require.Error(t, err)
+ require.Equal(t, nil, primary)
+
+ secondaries, err := elector.GetSecondaries()
+ require.NoError(t, err)
+ require.Equal(t, 1, len(secondaries))
+}
+
+func TestBasicFailover(t *testing.T) {
+ db := getDB(t)
+
+ logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
+ praefectSocket := testhelper.GetTemporaryGitalySocketFileName()
+ socketName := "unix://" + praefectSocket
+
+ conf := config.Config{
+ SocketPath: socketName,
+ }
+
+ internalSocket0, internalSocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
+ srv0, healthSrv0 := testhelper.NewServerWithHealth(t, internalSocket0)
+ defer srv0.Stop()
+
+ srv1, healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1)
+ defer srv1.Stop()
+
+ cc0, err := grpc.Dial(
+ "unix://"+internalSocket0,
+ grpc.WithInsecure(),
+ )
+ require.NoError(t, err)
+
+ cc1, err := grpc.Dial(
+ "unix://"+internalSocket1,
+ grpc.WithInsecure(),
+ )
+
+ require.NoError(t, err)
+
+ storageName := "default"
+ mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec()
+ cs0 := newConnectionStatus(models.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0)
+ cs1 := newConnectionStatus(models.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), mockHistogramVec1)
+
+ ns := []*nodeStatus{cs0, cs1}
+ elector := newSQLElector(shardName, conf, 1, defaultActivePraefectSeconds, db.DB, logger, ns)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ err = elector.checkNodes(ctx)
+
+ require.NoError(t, err)
+ db.RequireRowsInTable(t, "node_status", 2)
+ db.RequireRowsInTable(t, "shard_primaries", 1)
+
+ require.Equal(t, cs0, elector.primaryNode.Node)
+ primary, err := elector.GetPrimary()
+ require.NoError(t, err)
+ require.Equal(t, cs0.GetStorage(), primary.GetStorage())
+
+ secondaries, err := elector.GetSecondaries()
+ require.NoError(t, err)
+ require.Equal(t, 1, len(secondaries))
+ require.Equal(t, cs1.GetStorage(), secondaries[0].GetStorage())
+
+ // Bring first node down
+ healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+
+ // Primary should remain even after the first check
+ err = elector.checkNodes(ctx)
+ require.NoError(t, err)
+ primary, err = elector.GetPrimary()
+ require.NoError(t, err)
+ require.Equal(t, cs0.GetStorage(), primary.GetStorage())
+
+ // Wait for stale timeout to expire
+ time.Sleep(1 * time.Second)
+
+ // Expect that the other node is promoted
+ err = elector.checkNodes(ctx)
+ require.NoError(t, err)
+
+ db.RequireRowsInTable(t, "node_status", 2)
+ db.RequireRowsInTable(t, "shard_primaries", 1)
+ primary, err = elector.GetPrimary()
+ require.NoError(t, err)
+ require.Equal(t, cs1.GetStorage(), primary.GetStorage())
+
+ // Bring second node down
+ healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+
+ // Wait for stale timeout to expire
+ time.Sleep(1 * time.Second)
+ err = elector.checkNodes(ctx)
+ require.NoError(t, err)
+
+ db.RequireRowsInTable(t, "node_status", 2)
+ // No new candidates
+ db.RequireRowsInTable(t, "shard_primaries", 0)
+ primary, err = elector.GetPrimary()
+ require.Equal(t, ErrPrimaryNotHealthy, err)
+ secondaries, err = elector.GetSecondaries()
+ require.NoError(t, err)
+ require.Equal(t, 2, len(secondaries))
+}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 3b45dad7b..3a03440de 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -148,7 +148,7 @@ func TestProcessReplicationJob(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
replicator.log = entry
- nodeMgr, err := nodes.NewManager(entry, conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -225,7 +225,7 @@ func TestPropagateReplicationJob(t *testing.T) {
}
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -526,7 +526,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
replMgr := NewReplMgr("default", logEntry, ds, nodeMgr)
@@ -664,7 +664,7 @@ func TestProcessBacklog_Success(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
replMgr := NewReplMgr(conf.VirtualStorages[0].Name, logEntry, ds, nodeMgr)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index ade82de87..5dc429c80 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -134,7 +134,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
}
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
registry := protoregistry.New()