Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-06-04 22:55:49 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-06-04 22:55:49 +0300
commit54195f2378d8c38502a88f0717ad6cd0100d57a5 (patch)
treea03ef8205ade7527d28238b8b343e06629ae18b7
parent5955867b762504dccfa26166c1f41cf8a7648074 (diff)
parent74672163a36ec3b361e2e9f008071152a172e861 (diff)
Merge branch 'smh-sql-elector-test-time' into 'master'
Remove time dependency from sql elector tests. See merge request gitlab-org/gitaly!2249
-rw-r--r--internal/praefect/nodes/manager.go2
-rw-r--r--internal/praefect/nodes/sql_elector.go22
-rw-r--r--internal/praefect/nodes/sql_elector_test.go54
3 files changed, 37 insertions, 41 deletions
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index defc1cd90..4b57a8868 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -136,7 +136,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.
}
if c.Failover.Enabled && c.Failover.ElectionStrategy == "sql" {
- strategies[virtualStorage.Name] = newSQLElector(virtualStorage.Name, c, defaultFailoverTimeoutSeconds, defaultActivePraefectSeconds, db, log, ns)
+ strategies[virtualStorage.Name] = newSQLElector(virtualStorage.Name, c, db, log, ns)
} else {
strategies[virtualStorage.Name] = newLocalElector(virtualStorage.Name, c.Failover.Enabled, c.Failover.ReadOnlyAfterFailover, log, ns)
}
diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go
index e747b5265..07b933e1f 100644
--- a/internal/praefect/nodes/sql_elector.go
+++ b/internal/praefect/nodes/sql_elector.go
@@ -18,8 +18,8 @@ import (
)
const (
- defaultFailoverTimeoutSeconds = 10
- defaultActivePraefectSeconds = 60
+ failoverTimeout = 10 * time.Second
+ activePraefectTimeout = 60 * time.Second
)
type sqlCandidate struct {
@@ -81,12 +81,10 @@ type sqlElector struct {
primaryNode *sqlCandidate
db *sql.DB
log logrus.FieldLogger
- failoverSeconds int
- activePraefectSeconds int
readOnlyAfterFailover bool
}
-func newSQLElector(name string, c config.Config, failoverTimeoutSeconds int, activePraefectSeconds int, db *sql.DB, log logrus.FieldLogger, ns []*nodeStatus) *sqlElector {
+func newSQLElector(name string, c config.Config, db *sql.DB, log logrus.FieldLogger, ns []*nodeStatus) *sqlElector {
praefectName := getPraefectName(c, log)
log = log.WithField("praefectName", praefectName)
@@ -102,8 +100,6 @@ func newSQLElector(name string, c config.Config, failoverTimeoutSeconds int, act
shardName: name,
db: db,
log: log,
- failoverSeconds: failoverTimeoutSeconds,
- activePraefectSeconds: activePraefectSeconds,
nodes: nodes,
primaryNode: nodes[0],
readOnlyAfterFailover: c.Failover.ReadOnlyAfterFailover,
@@ -299,11 +295,11 @@ func (s *sqlElector) updateMetrics() {
func (s *sqlElector) getQuorumCount() (int, error) {
// This is crude form of service discovery. Find how many active
// Praefect nodes based on whether they attempted to update entries.
- q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= NOW() - $2::INTERVAL SECOND`
+ q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= NOW() - INTERVAL '1 MICROSECOND' * $2`
var totalCount int
- if err := s.db.QueryRow(q, s.shardName, s.activePraefectSeconds).Scan(&totalCount); err != nil {
+ if err := s.db.QueryRow(q, s.shardName, activePraefectTimeout.Microseconds()).Scan(&totalCount); err != nil {
return 0, fmt.Errorf("error retrieving quorum count: %w", err)
}
@@ -427,9 +423,9 @@ func (s *sqlElector) electNewPrimary(candidates []*sqlCandidate) error {
, elected_at = EXCLUDED.elected_at
, read_only = $5
, demoted = false
- WHERE shard_primaries.elected_at < now() - $4::INTERVAL SECOND
+ WHERE shard_primaries.elected_at < now() - INTERVAL '1 MICROSECOND' * $4
`
- _, err = s.db.Exec(q, s.praefectName, s.shardName, newPrimaryStorage, s.failoverSeconds, s.readOnlyAfterFailover)
+ _, err = s.db.Exec(q, s.praefectName, s.shardName, newPrimaryStorage, failoverTimeout.Microseconds(), s.readOnlyAfterFailover)
if err != nil {
s.log.Errorf("error updating new primary: %s", err)
@@ -461,12 +457,12 @@ func (s *sqlElector) validateAndUpdatePrimary() error {
// Retrieves candidates, ranked by the ones that are the most active
q := `SELECT node_name FROM node_status
- WHERE shard_name = $1 AND last_seen_active_at >= NOW() - $2::INTERVAL SECOND
+ WHERE shard_name = $1 AND last_seen_active_at >= NOW() - INTERVAL '1 MICROSECOND' * $2
GROUP BY node_name
HAVING COUNT(praefect_name) >= $3
ORDER BY COUNT(node_name) DESC, node_name ASC`
- rows, err := s.db.Query(q, s.shardName, s.failoverSeconds, quorumCount)
+ rows, err := s.db.Query(q, s.shardName, failoverTimeout.Microseconds(), quorumCount)
if err != nil {
return fmt.Errorf("error retrieving candidates: %w", err)
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index 6d4be1bd5..f6a459334 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -45,7 +45,7 @@ func TestGetPrimaryAndSecondaries(t *testing.T) {
cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0)
ns := []*nodeStatus{cs0}
- elector := newSQLElector(shardName, conf, 1, defaultActivePraefectSeconds, db.DB, logger, ns)
+ elector := newSQLElector(shardName, conf, db.DB, logger, ns)
require.Contains(t, elector.praefectName, ":"+socketName)
require.Equal(t, elector.shardName, shardName)
@@ -101,8 +101,7 @@ func TestBasicFailover(t *testing.T) {
cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), mockHistogramVec1)
ns := []*nodeStatus{cs0, cs1}
- failoverTimeSeconds := 1
- elector := newSQLElector(shardName, conf, failoverTimeSeconds, defaultActivePraefectSeconds, db.DB, logger, ns)
+ elector := newSQLElector(shardName, conf, db.DB, logger, ns)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -122,23 +121,17 @@ func TestBasicFailover(t *testing.T) {
// Bring first node down
healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+ predateElection(t, db, shardName, failoverTimeout)
- // pretend like the last election happened in the past because the query in electNewPrimary to update the primary will not
- // overwrite a previous that's within 10 seconds
- _, err = db.Exec(`UPDATE shard_primaries SET elected_at = now() - $1::INTERVAL SECOND WHERE shard_name = $2`,
- 2*failoverTimeSeconds,
- shardName)
- require.NoError(t, err)
-
- // Primary should remain even after the first check
+ // Primary should remain before the failover timeout is exceeded
err = elector.checkNodes(ctx)
require.NoError(t, err)
shard, err = elector.GetShard()
require.NoError(t, err)
require.False(t, shard.IsReadOnly)
- // Wait for stale timeout to expire
- time.Sleep(1 * time.Second)
+ // Predate the timeout to exceed it
+ predateLastSeenActiveAt(t, db, shardName, cs0.GetStorage(), failoverTimeout)
// Expect that the other node is promoted
err = elector.checkNodes(ctx)
@@ -160,11 +153,8 @@ func TestBasicFailover(t *testing.T) {
// Bring second node down
healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
- // Wait for stale timeout to expire
- time.Sleep(1 * time.Second)
err = elector.checkNodes(ctx)
require.NoError(t, err)
-
db.RequireRowsInTable(t, "node_status", 2)
// No new candidates
_, err = elector.GetShard()
@@ -180,8 +170,6 @@ func TestElectDemotedPrimary(t *testing.T) {
elector := newSQLElector(
shardName,
config.Config{},
- defaultFailoverTimeoutSeconds,
- defaultActivePraefectSeconds,
db.DB,
testhelper.DiscardTestLogger(t),
[]*nodeStatus{{Node: node}},
@@ -200,7 +188,7 @@ func TestElectDemotedPrimary(t *testing.T) {
require.NoError(t, err)
require.Nil(t, primary)
- shiftElection(t, db, shardName, -1*defaultFailoverTimeoutSeconds)
+ predateElection(t, db, shardName, failoverTimeout)
require.NoError(t, err)
require.NoError(t, elector.electNewPrimary(candidates))
@@ -209,14 +197,27 @@ func TestElectDemotedPrimary(t *testing.T) {
require.Equal(t, node.Storage, primary.GetStorage())
}
-// pretend like the last election happened in the past because the query in electNewPrimary
-// to update the primary will not overwrite a previous that's within 10 seconds
-func shiftElection(t testing.TB, db glsql.DB, shardName string, seconds int) {
+// predateLastSeenActiveAt shifts the last_seen_active_at column to an earlier time. This avoids
+// waiting for the node's status to become unhealthy.
+func predateLastSeenActiveAt(t testing.TB, db glsql.DB, shardName, nodeName string, amount time.Duration) {
+ t.Helper()
+
+ _, err := db.Exec(`
+UPDATE node_status SET last_seen_active_at = last_seen_active_at - INTERVAL '1 MICROSECOND' * $1
+WHERE shard_name = $2 AND node_name = $3`, amount.Microseconds(), shardName, nodeName,
+ )
+
+ require.NoError(t, err)
+}
+
+// predateElection shifts the election to an earlier time. This avoids waiting for the failover timeout to trigger
+// a new election.
+func predateElection(t testing.TB, db glsql.DB, shardName string, amount time.Duration) {
t.Helper()
_, err := db.Exec(
- "UPDATE shard_primaries SET elected_at = elected_at + $1::INTERVAL SECOND WHERE shard_name = $2",
- seconds,
+ "UPDATE shard_primaries SET elected_at = elected_at - INTERVAL '1 MICROSECOND' * $1 WHERE shard_name = $2",
+ amount.Microseconds(),
shardName,
)
require.NoError(t, err)
@@ -242,7 +243,6 @@ func TestElectNewPrimary(t *testing.T) {
},
}}
- failoverTimeSeconds := 1
candidates := []*sqlCandidate{
{
&nodeStatus{
@@ -341,7 +341,7 @@ func TestElectNewPrimary(t *testing.T) {
db.TruncateAll(t)
conf := config.Config{Failover: config.Failover{ReadOnlyAfterFailover: true}}
- elector := newSQLElector(shardName, conf, failoverTimeSeconds, defaultActivePraefectSeconds, db.DB, testhelper.DiscardTestLogger(t), ns)
+ elector := newSQLElector(shardName, conf, db.DB, testhelper.DiscardTestLogger(t), ns)
require.NoError(t, elector.electNewPrimary(candidates))
primary, readOnly, err := elector.lookupPrimary()
@@ -349,7 +349,7 @@ func TestElectNewPrimary(t *testing.T) {
require.Equal(t, "gitaly-1", primary.GetStorage(), "since replication queue is empty the first candidate should be chosen")
require.False(t, readOnly)
- shiftElection(t, db, shardName, -1*failoverTimeSeconds)
+ predateElection(t, db, shardName, failoverTimeout)
_, err = db.Exec(testCase.initialReplQueueInsert)
require.NoError(t, err)