diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-06-04 22:55:49 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-06-04 22:55:49 +0300 |
commit | 54195f2378d8c38502a88f0717ad6cd0100d57a5 (patch) | |
tree | a03ef8205ade7527d28238b8b343e06629ae18b7 | |
parent | 5955867b762504dccfa26166c1f41cf8a7648074 (diff) | |
parent | 74672163a36ec3b361e2e9f008071152a172e861 (diff) | |
Merge branch 'smh-sql-elector-test-time' into 'master'
Remove time dependency from sql elector tests
See merge request gitlab-org/gitaly!2249
-rw-r--r-- | internal/praefect/nodes/manager.go | 2 | ||||
-rw-r--r-- | internal/praefect/nodes/sql_elector.go | 22 | ||||
-rw-r--r-- | internal/praefect/nodes/sql_elector_test.go | 54 |
3 files changed, 37 insertions(+), 41 deletions(-)
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index defc1cd90..4b57a8868 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -136,7 +136,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore. } if c.Failover.Enabled && c.Failover.ElectionStrategy == "sql" { - strategies[virtualStorage.Name] = newSQLElector(virtualStorage.Name, c, defaultFailoverTimeoutSeconds, defaultActivePraefectSeconds, db, log, ns) + strategies[virtualStorage.Name] = newSQLElector(virtualStorage.Name, c, db, log, ns) } else { strategies[virtualStorage.Name] = newLocalElector(virtualStorage.Name, c.Failover.Enabled, c.Failover.ReadOnlyAfterFailover, log, ns) } diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go index e747b5265..07b933e1f 100644 --- a/internal/praefect/nodes/sql_elector.go +++ b/internal/praefect/nodes/sql_elector.go @@ -18,8 +18,8 @@ import ( ) const ( - defaultFailoverTimeoutSeconds = 10 - defaultActivePraefectSeconds = 60 + failoverTimeout = 10 * time.Second + activePraefectTimeout = 60 * time.Second ) type sqlCandidate struct { @@ -81,12 +81,10 @@ type sqlElector struct { primaryNode *sqlCandidate db *sql.DB log logrus.FieldLogger - failoverSeconds int - activePraefectSeconds int readOnlyAfterFailover bool } -func newSQLElector(name string, c config.Config, failoverTimeoutSeconds int, activePraefectSeconds int, db *sql.DB, log logrus.FieldLogger, ns []*nodeStatus) *sqlElector { +func newSQLElector(name string, c config.Config, db *sql.DB, log logrus.FieldLogger, ns []*nodeStatus) *sqlElector { praefectName := getPraefectName(c, log) log = log.WithField("praefectName", praefectName) @@ -102,8 +100,6 @@ func newSQLElector(name string, c config.Config, failoverTimeoutSeconds int, act shardName: name, db: db, log: log, - failoverSeconds: failoverTimeoutSeconds, - activePraefectSeconds: activePraefectSeconds, nodes: nodes, 
primaryNode: nodes[0], readOnlyAfterFailover: c.Failover.ReadOnlyAfterFailover, @@ -299,11 +295,11 @@ func (s *sqlElector) updateMetrics() { func (s *sqlElector) getQuorumCount() (int, error) { // This is crude form of service discovery. Find how many active // Praefect nodes based on whether they attempted to update entries. - q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= NOW() - $2::INTERVAL SECOND` + q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= NOW() - INTERVAL '1 MICROSECOND' * $2` var totalCount int - if err := s.db.QueryRow(q, s.shardName, s.activePraefectSeconds).Scan(&totalCount); err != nil { + if err := s.db.QueryRow(q, s.shardName, activePraefectTimeout.Microseconds()).Scan(&totalCount); err != nil { return 0, fmt.Errorf("error retrieving quorum count: %w", err) } @@ -427,9 +423,9 @@ func (s *sqlElector) electNewPrimary(candidates []*sqlCandidate) error { , elected_at = EXCLUDED.elected_at , read_only = $5 , demoted = false - WHERE shard_primaries.elected_at < now() - $4::INTERVAL SECOND + WHERE shard_primaries.elected_at < now() - INTERVAL '1 MICROSECOND' * $4 ` - _, err = s.db.Exec(q, s.praefectName, s.shardName, newPrimaryStorage, s.failoverSeconds, s.readOnlyAfterFailover) + _, err = s.db.Exec(q, s.praefectName, s.shardName, newPrimaryStorage, failoverTimeout.Microseconds(), s.readOnlyAfterFailover) if err != nil { s.log.Errorf("error updating new primary: %s", err) @@ -461,12 +457,12 @@ func (s *sqlElector) validateAndUpdatePrimary() error { // Retrieves candidates, ranked by the ones that are the most active q := `SELECT node_name FROM node_status - WHERE shard_name = $1 AND last_seen_active_at >= NOW() - $2::INTERVAL SECOND + WHERE shard_name = $1 AND last_seen_active_at >= NOW() - INTERVAL '1 MICROSECOND' * $2 GROUP BY node_name HAVING COUNT(praefect_name) >= $3 ORDER BY COUNT(node_name) DESC, node_name ASC` - 
rows, err := s.db.Query(q, s.shardName, s.failoverSeconds, quorumCount) + rows, err := s.db.Query(q, s.shardName, failoverTimeout.Microseconds(), quorumCount) if err != nil { return fmt.Errorf("error retrieving candidates: %w", err) diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go index 6d4be1bd5..f6a459334 100644 --- a/internal/praefect/nodes/sql_elector_test.go +++ b/internal/praefect/nodes/sql_elector_test.go @@ -45,7 +45,7 @@ func TestGetPrimaryAndSecondaries(t *testing.T) { cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0) ns := []*nodeStatus{cs0} - elector := newSQLElector(shardName, conf, 1, defaultActivePraefectSeconds, db.DB, logger, ns) + elector := newSQLElector(shardName, conf, db.DB, logger, ns) require.Contains(t, elector.praefectName, ":"+socketName) require.Equal(t, elector.shardName, shardName) @@ -101,8 +101,7 @@ func TestBasicFailover(t *testing.T) { cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), mockHistogramVec1) ns := []*nodeStatus{cs0, cs1} - failoverTimeSeconds := 1 - elector := newSQLElector(shardName, conf, failoverTimeSeconds, defaultActivePraefectSeconds, db.DB, logger, ns) + elector := newSQLElector(shardName, conf, db.DB, logger, ns) ctx, cancel := testhelper.Context() defer cancel() @@ -122,23 +121,17 @@ func TestBasicFailover(t *testing.T) { // Bring first node down healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) + predateElection(t, db, shardName, failoverTimeout) - // pretend like the last election happened in the past because the query in electNewPrimary to update the primary will not - // overwrite a previous that's within 10 seconds - _, err = db.Exec(`UPDATE shard_primaries SET elected_at = now() - $1::INTERVAL SECOND WHERE shard_name = $2`, - 2*failoverTimeSeconds, - shardName) - require.NoError(t, err) - - // 
Primary should remain even after the first check + // Primary should remain before the failover timeout is exceeded err = elector.checkNodes(ctx) require.NoError(t, err) shard, err = elector.GetShard() require.NoError(t, err) require.False(t, shard.IsReadOnly) - // Wait for stale timeout to expire - time.Sleep(1 * time.Second) + // Predate the timeout to exceed it + predateLastSeenActiveAt(t, db, shardName, cs0.GetStorage(), failoverTimeout) // Expect that the other node is promoted err = elector.checkNodes(ctx) @@ -160,11 +153,8 @@ func TestBasicFailover(t *testing.T) { // Bring second node down healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) - // Wait for stale timeout to expire - time.Sleep(1 * time.Second) err = elector.checkNodes(ctx) require.NoError(t, err) - db.RequireRowsInTable(t, "node_status", 2) // No new candidates _, err = elector.GetShard() @@ -180,8 +170,6 @@ func TestElectDemotedPrimary(t *testing.T) { elector := newSQLElector( shardName, config.Config{}, - defaultFailoverTimeoutSeconds, - defaultActivePraefectSeconds, db.DB, testhelper.DiscardTestLogger(t), []*nodeStatus{{Node: node}}, @@ -200,7 +188,7 @@ func TestElectDemotedPrimary(t *testing.T) { require.NoError(t, err) require.Nil(t, primary) - shiftElection(t, db, shardName, -1*defaultFailoverTimeoutSeconds) + predateElection(t, db, shardName, failoverTimeout) require.NoError(t, err) require.NoError(t, elector.electNewPrimary(candidates)) @@ -209,14 +197,27 @@ func TestElectDemotedPrimary(t *testing.T) { require.Equal(t, node.Storage, primary.GetStorage()) } -// pretend like the last election happened in the past because the query in electNewPrimary -// to update the primary will not overwrite a previous that's within 10 seconds -func shiftElection(t testing.TB, db glsql.DB, shardName string, seconds int) { +// predateLastSeenActiveAt shifts the last_seen_active_at column to an earlier time. This avoids +// waiting for the node's status to become unhealthy. 
+func predateLastSeenActiveAt(t testing.TB, db glsql.DB, shardName, nodeName string, amount time.Duration) { + t.Helper() + + _, err := db.Exec(` +UPDATE node_status SET last_seen_active_at = last_seen_active_at - INTERVAL '1 MICROSECOND' * $1 +WHERE shard_name = $2 AND node_name = $3`, amount.Microseconds(), shardName, nodeName, + ) + + require.NoError(t, err) +} + +// predateElection shifts the election to an earlier time. This avoids waiting for the failover timeout to trigger +// a new election. +func predateElection(t testing.TB, db glsql.DB, shardName string, amount time.Duration) { t.Helper() _, err := db.Exec( - "UPDATE shard_primaries SET elected_at = elected_at + $1::INTERVAL SECOND WHERE shard_name = $2", - seconds, + "UPDATE shard_primaries SET elected_at = elected_at - INTERVAL '1 MICROSECOND' * $1 WHERE shard_name = $2", + amount.Microseconds(), shardName, ) require.NoError(t, err) @@ -242,7 +243,6 @@ func TestElectNewPrimary(t *testing.T) { }, }} - failoverTimeSeconds := 1 candidates := []*sqlCandidate{ { &nodeStatus{ @@ -341,7 +341,7 @@ func TestElectNewPrimary(t *testing.T) { db.TruncateAll(t) conf := config.Config{Failover: config.Failover{ReadOnlyAfterFailover: true}} - elector := newSQLElector(shardName, conf, failoverTimeSeconds, defaultActivePraefectSeconds, db.DB, testhelper.DiscardTestLogger(t), ns) + elector := newSQLElector(shardName, conf, db.DB, testhelper.DiscardTestLogger(t), ns) require.NoError(t, elector.electNewPrimary(candidates)) primary, readOnly, err := elector.lookupPrimary() @@ -349,7 +349,7 @@ func TestElectNewPrimary(t *testing.T) { require.Equal(t, "gitaly-1", primary.GetStorage(), "since replication queue is empty the first candidate should be chosen") require.False(t, readOnly) - shiftElection(t, db, shardName, -1*failoverTimeSeconds) + predateElection(t, db, shardName, failoverTimeout) _, err = db.Exec(testCase.initialReplQueueInsert) require.NoError(t, err) |