
gitlab.com/gitlab-org/gitaly.git
author     Pavlo Strokov <pstrokov@gitlab.com>  2021-10-17 19:56:03 +0300
committer  Pavlo Strokov <pstrokov@gitlab.com>  2021-10-26 12:45:55 +0300
commit     4dfeec9150239893f1de8a94d972af8624542e2c (patch)
tree       a6fd35e29edb31820d9dc7747ffd982c2a52cb50
parent     713bcd2cf4aad57750911cfee60db136a46e4b0c (diff)
sql: Provide timestamp values as argument to queries
We don't have a standard way of using timestamps in SQL statements, and the time-interval subtraction technique we rely on is pretty fragile. With this change we extract time manipulation from the SQL statements into Go code and pass already-prepared timestamps as parameters to the queries. This also solves the problem of being unable to obtain different timestamps when SQL statements execute within a single transaction, which will be needed in upcoming changes.
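
The pattern is easiest to see in isolation. Below is a minimal sketch of the approach this patch applies throughout Praefect; the touchEvent helper, the events table, and its columns are hypothetical, not taken from the diff:

package example

import (
	"context"
	"database/sql"
	"time"
)

// touchEvent computes the timestamp once in Go and hands it to the query as
// a bind parameter instead of calling NOW() inside the SQL text. Passing
// now.Add(-staleAfter) likewise replaces in-SQL interval arithmetic such as
// "NOW() - INTERVAL '1 MILLISECOND' * $1".
func touchEvent(ctx context.Context, db *sql.DB, id int64, staleAfter time.Duration) error {
	now := time.Now().UTC()
	_, err := db.ExecContext(ctx,
		`UPDATE events
		    SET updated_at = $2
		  WHERE id = $1 AND updated_at >= $3`,
		id, now, now.Add(-staleAfter))
	return err
}

Because the caller controls the value, several statements can share one timestamp or deliberately use different ones, something NOW(), which PostgreSQL fixes at transaction start, cannot offer.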
-rw-r--r--  internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go | 44
-rw-r--r--  internal/praefect/datastore/queue.go | 24
-rw-r--r--  internal/praefect/datastore/storage_cleanup.go | 17
-rw-r--r--  internal/praefect/nodes/health_manager.go | 6
-rw-r--r--  internal/praefect/nodes/sql_elector.go | 36
-rw-r--r--  internal/testhelper/db.go | 10
6 files changed, 94 insertions(+), 43 deletions(-)
diff --git a/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go b/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go
new file mode 100644
index 000000000..fe9fd65b5
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20211018081858_healthy_storages_view_time.go
@@ -0,0 +1,44 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20211018081858_healthy_storages_view_time",
+ Up: []string{
+ // Re-create view to update usage of the timestamps.
+ // We should always rely on the UTC time zone to make timestamps consistent
+ // throughout the schema.
+ `
+CREATE OR REPLACE VIEW healthy_storages AS
+ SELECT shard_name AS virtual_storage, node_name AS storage
+ FROM node_status AS ns
+ WHERE last_seen_active_at >= CLOCK_TIMESTAMP() AT TIME ZONE 'UTC' - INTERVAL '10 SECOND'
+ GROUP BY shard_name, node_name
+ HAVING COUNT(praefect_name) >= (
+ SELECT CEIL(COUNT(DISTINCT praefect_name) / 2.0) AS quorum_count
+ FROM node_status
+ WHERE shard_name = ns.shard_name
+ AND last_contact_attempt_at >= CLOCK_TIMESTAMP() AT TIME ZONE 'UTC' - INTERVAL '60 SECOND'
+ )
+ ORDER BY shard_name, node_name`,
+ },
+ Down: []string{
+ `
+CREATE OR REPLACE VIEW healthy_storages AS
+ SELECT shard_name AS virtual_storage, node_name AS storage
+ FROM node_status AS ns
+ WHERE last_seen_active_at >= NOW() - INTERVAL '10 SECOND'
+ GROUP BY shard_name, node_name
+ HAVING COUNT(praefect_name) >= (
+ SELECT CEIL(COUNT(DISTINCT praefect_name) / 2.0) AS quorum_count
+ FROM node_status
+ WHERE shard_name = ns.shard_name
+ AND last_contact_attempt_at >= NOW() - INTERVAL '60 SECOND'
+ )
+ ORDER BY shard_name, node_name`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
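
An aside on the migration above: it switches the view from NOW() to CLOCK_TIMESTAMP() because PostgreSQL freezes NOW() at transaction start, so repeated reads inside one transaction can never yield different timestamps, while CLOCK_TIMESTAMP() keeps advancing. A minimal sketch of that behavior (the nowIsFrozen helper is illustrative, not part of the patch):

package example

import (
	"context"
	"database/sql"
	"time"
)

// nowIsFrozen reports whether two NOW() reads inside one transaction are
// equal. In PostgreSQL this is always true, because NOW() returns the
// transaction start time; CLOCK_TIMESTAMP() would advance between reads.
func nowIsFrozen(ctx context.Context, db *sql.DB) (bool, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return false, err
	}
	defer tx.Rollback()

	var t1, t2 time.Time
	if err := tx.QueryRowContext(ctx, `SELECT NOW()`).Scan(&t1); err != nil {
		return false, err
	}
	time.Sleep(10 * time.Millisecond)
	if err := tx.QueryRowContext(ctx, `SELECT NOW()`).Scan(&t2); err != nil {
		return false, err
	}
	return t1.Equal(t2), nil
}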
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index e00affd9d..601fcaa50 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -215,12 +215,12 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli
ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
RETURNING id
)
- INSERT INTO replication_queue(lock_id, job, meta)
- SELECT insert_lock.id, $4, $5
+ INSERT INTO replication_queue(lock_id, job, meta, created_at)
+ SELECT insert_lock.id, $4, $5, $6
FROM insert_lock
RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta`
// this will always return a single row result (because of lock uniqueness) or an error
- rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta)
+ rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta, time.Now().UTC())
if err != nil {
return ReplicationEvent{}, fmt.Errorf("query: %w", err)
}
@@ -280,14 +280,14 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
UPDATE replication_queue AS queue
SET attempt = CASE WHEN job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END
, state = 'in_progress'
- , updated_at = NOW() AT TIME ZONE 'UTC'
+ , updated_at = $4
FROM candidate
WHERE queue.id = candidate.id
RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta
)
, track_job_lock AS (
INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at)
- SELECT job.id, job.lock_id, NOW() AT TIME ZONE 'UTC'
+ SELECT job.id, job.lock_id, $4
FROM job
RETURNING lock_id
)
@@ -300,7 +300,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta
FROM job
ORDER BY id`
- rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count)
+ rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count, time.Now().UTC())
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}
@@ -379,7 +379,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
, updated AS (
UPDATE replication_queue AS queue
SET state = $2::REPLICATION_JOB_STATE,
- updated_at = NOW() AT TIME ZONE 'UTC'
+ updated_at = $3
FROM existing
WHERE existing.id = queue.id
RETURNING queue.id, queue.lock_id
@@ -406,7 +406,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
)
SELECT id
FROM existing`
- rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state)
+ rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state, time.Now().UTC())
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}
@@ -438,7 +438,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t
query := `
UPDATE replication_queue_job_lock
- SET triggered_at = NOW() AT TIME ZONE 'UTC'
+ SET triggered_at = $3
WHERE (job_id, lock_id) IN (SELECT UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))`
for {
@@ -446,7 +446,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t
case <-ctx.Done():
return nil
case <-trigger:
- res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs)
+ res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs, time.Now().UTC())
if err != nil {
if pqError, ok := err.(*pq.Error); ok && pqError.Code.Name() == "query_canceled" {
return nil
@@ -479,7 +479,7 @@ func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, t
func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error {
query := `
WITH stale_job_lock AS (
- DELETE FROM replication_queue_job_lock WHERE triggered_at < NOW() AT TIME ZONE 'UTC' - INTERVAL '1 MILLISECOND' * $1
+ DELETE FROM replication_queue_job_lock WHERE triggered_at < $1
RETURNING job_id, lock_id
)
, update_job AS (
@@ -505,7 +505,7 @@ func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, st
GROUP BY lock_id
) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
)`
- _, err := rq.qc.ExecContext(ctx, query, staleAfter.Milliseconds())
+ _, err := rq.qc.ExecContext(ctx, query, time.Now().UTC().Add(-staleAfter))
if err != nil {
return fmt.Errorf("exec acknowledge stale: %w", err)
}
diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go
index adc43401b..2041828b1 100644
--- a/internal/praefect/datastore/storage_cleanup.go
+++ b/internal/praefect/datastore/storage_cleanup.go
@@ -68,22 +68,23 @@ func (ss *StorageCleanup) Populate(ctx context.Context, virtualStorage, storage
// acquired storage. It updates last_run column of the entry on execution.
func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error) {
var entry ClusterPath
+ now := time.Now().UTC()
if err := ss.db.QueryRowContext(
ctx,
`UPDATE storage_cleanups
- SET triggered_at = NOW()
+ SET triggered_at = $3
WHERE (virtual_storage, storage) IN (
SELECT virtual_storage, storage
FROM storage_cleanups
WHERE
- COALESCE(last_run, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $1)
- AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $2)
+ COALESCE(last_run, TO_TIMESTAMP(0)) <= $1
+ AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= $2
ORDER BY last_run NULLS FIRST, virtual_storage, storage
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING virtual_storage, storage`,
- inactive.Milliseconds(), updatePeriod.Milliseconds(),
+ now.Add(-inactive), now.Add(-updatePeriod), now,
).Scan(&entry.VirtualStorage, &entry.Storage); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, nil, fmt.Errorf("scan: %w", err)
@@ -111,9 +112,9 @@ func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, upda
if _, err := ss.db.ExecContext(
ctx,
`UPDATE storage_cleanups
- SET triggered_at = NOW()
+ SET triggered_at = $3
WHERE virtual_storage = $1 AND storage = $2`,
- entry.VirtualStorage, entry.Storage,
+ entry.VirtualStorage, entry.Storage, time.Now().UTC(),
); err != nil {
return
}
@@ -131,9 +132,9 @@ func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, upda
if _, err := ss.db.ExecContext(
ctx,
`UPDATE storage_cleanups
- SET last_run = NOW(), triggered_at = NULL
+ SET last_run = $3, triggered_at = NULL
WHERE virtual_storage = $1 AND storage = $2`,
- entry.VirtualStorage, entry.Storage,
+ entry.VirtualStorage, entry.Storage, time.Now().UTC(),
); err != nil {
return fmt.Errorf("update storage_cleanups: %w", err)
}
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go
index 502f8ce61..d05854158 100644
--- a/internal/praefect/nodes/health_manager.go
+++ b/internal/praefect/nodes/health_manager.go
@@ -140,9 +140,10 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context) error {
hm.locallyHealthy.Store(locallyHealthy)
+ now := time.Now().UTC()
if _, err := hm.db.ExecContext(ctx, `
INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
-SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END
+SELECT $1, shard_name, node_name, $5::TIMESTAMP, CASE WHEN is_healthy THEN $5::TIMESTAMP ELSE NULL END
FROM (
SELECT unnest($2::text[]) AS shard_name,
unnest($3::text[]) AS node_name,
@@ -150,13 +151,14 @@ FROM (
) AS results
ON CONFLICT (praefect_name, shard_name, node_name)
DO UPDATE SET
- last_contact_attempt_at = NOW(),
+ last_contact_attempt_at = $5,
last_seen_active_at = COALESCE(EXCLUDED.last_seen_active_at, node_status.last_seen_active_at)
`,
hm.praefectName,
pq.StringArray(virtualStorages),
pq.StringArray(physicalStorages),
pq.BoolArray(healthy),
+ now,
); err != nil {
return fmt.Errorf("update checks: %w", err)
}
diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go
index a0007c855..75df348fd 100644
--- a/internal/praefect/nodes/sql_elector.go
+++ b/internal/praefect/nodes/sql_elector.go
@@ -181,13 +181,14 @@ func (s *sqlElector) checkNodes(ctx context.Context) error {
defer s.updateMetrics()
+ timestamp := time.Now().UTC()
for _, n := range s.nodes {
wg.Add(1)
go func(n Node) {
defer wg.Done()
result, _ := n.CheckHealth(ctx)
- if err := s.updateNode(ctx, tx, n, result); err != nil {
+ if err := s.updateNode(ctx, tx, n, result, timestamp); err != nil {
s.log.WithError(err).WithFields(logrus.Fields{
"storage": n.GetStorage(),
"address": n.GetAddress(),
@@ -198,7 +199,7 @@ func (s *sqlElector) checkNodes(ctx context.Context) error {
wg.Wait()
- err = s.validateAndUpdatePrimary(ctx, tx)
+ err = s.validateAndUpdatePrimary(ctx, tx, timestamp)
if err != nil {
s.log.WithError(err).Error("unable to validate primary")
@@ -243,26 +244,26 @@ func (s *sqlElector) setPrimary(candidate *sqlCandidate) {
}
}
-func (s *sqlElector) updateNode(ctx context.Context, tx *sql.Tx, node Node, result bool) error {
+func (s *sqlElector) updateNode(ctx context.Context, tx *sql.Tx, node Node, result bool, timestamp time.Time) error {
var q string
if result {
q = `INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
-VALUES ($1, $2, $3, NOW(), NOW())
+VALUES ($1, $2, $3, $4, $4)
ON CONFLICT (praefect_name, shard_name, node_name)
DO UPDATE SET
-last_contact_attempt_at = NOW(),
-last_seen_active_at = NOW()`
+last_contact_attempt_at = $4,
+last_seen_active_at = $4`
} else {
// Omit the last_seen_active_at since we weren't successful at contacting this node
q = `INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at)
-VALUES ($1, $2, $3, NOW())
+VALUES ($1, $2, $3, $4)
ON CONFLICT (praefect_name, shard_name, node_name)
DO UPDATE SET
-last_contact_attempt_at = NOW()`
+last_contact_attempt_at = $4`
}
- _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, node.GetStorage())
+ _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, node.GetStorage(), timestamp)
if err != nil {
s.log.Errorf("Error updating node: %s", err)
}
@@ -315,11 +316,11 @@ func (s *sqlElector) updateMetrics() {
func (s *sqlElector) getQuorumCount(ctx context.Context, tx *sql.Tx) (int, error) {
// This is crude form of service discovery. Find how many active
// Praefect nodes based on whether they attempted to update entries.
- q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= NOW() - INTERVAL '1 MICROSECOND' * $2`
+ q := `SELECT COUNT (DISTINCT praefect_name) FROM node_status WHERE shard_name = $1 AND last_contact_attempt_at >= $2`
var totalCount int
- if err := tx.QueryRowContext(ctx, q, s.shardName, activePraefectTimeout.Microseconds()).Scan(&totalCount); err != nil {
+ if err := tx.QueryRowContext(ctx, q, s.shardName, time.Now().UTC().Add(-activePraefectTimeout)).Scan(&totalCount); err != nil {
return 0, fmt.Errorf("error retrieving quorum count: %w", err)
}
@@ -427,16 +428,17 @@ func (s *sqlElector) electNewPrimary(ctx context.Context, tx *sql.Tx, candidates
// N1 (RW) -> N2 (RO) -> N3 (RW): `previous_writable_primary` is N1 as N2 was not write-enabled before the second
// failover and thus any data missing from N3 must be on N1.
q = `INSERT INTO shard_primaries (elected_by_praefect, shard_name, node_name, elected_at)
- SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, NOW()
+ SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, $5
WHERE $3 != COALESCE((SELECT node_name FROM shard_primaries WHERE shard_name = $2::VARCHAR AND demoted = false), '')
ON CONFLICT (shard_name)
DO UPDATE SET elected_by_praefect = EXCLUDED.elected_by_praefect
, node_name = EXCLUDED.node_name
, elected_at = EXCLUDED.elected_at
, demoted = false
- WHERE shard_primaries.elected_at < now() - INTERVAL '1 MICROSECOND' * $4
+ WHERE shard_primaries.elected_at < $4
`
- _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, newPrimaryStorage, s.failoverTimeout.Microseconds())
+ now := time.Now().UTC()
+ _, err := tx.ExecContext(ctx, q, s.praefectName, s.shardName, newPrimaryStorage, now.Add(-s.failoverTimeout), now)
if err != nil {
s.log.Errorf("error updating new primary: %s", err)
return err
@@ -445,7 +447,7 @@ func (s *sqlElector) electNewPrimary(ctx context.Context, tx *sql.Tx, candidates
return nil
}
-func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx) error {
+func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx, timestamp time.Time) error {
quorumCount, err := s.getQuorumCount(ctx, tx)
if err != nil {
return err
@@ -453,12 +455,12 @@ func (s *sqlElector) validateAndUpdatePrimary(ctx context.Context, tx *sql.Tx) e
// Retrieves candidates, ranked by the ones that are the most active
q := `SELECT node_name FROM node_status
- WHERE shard_name = $1 AND last_seen_active_at >= NOW() - INTERVAL '1 MICROSECOND' * $2
+ WHERE shard_name = $1 AND last_seen_active_at >= $2
GROUP BY node_name
HAVING COUNT(praefect_name) >= $3
ORDER BY COUNT(node_name) DESC, node_name ASC`
- rows, err := tx.QueryContext(ctx, q, s.shardName, s.failoverTimeout.Microseconds(), quorumCount)
+ rows, err := tx.QueryContext(ctx, q, s.shardName, timestamp.Add(-s.failoverTimeout), quorumCount)
if err != nil {
return fmt.Errorf("error retrieving candidates: %w", err)
}
diff --git a/internal/testhelper/db.go b/internal/testhelper/db.go
index 130e39950..a49cfd49e 100644
--- a/internal/testhelper/db.go
+++ b/internal/testhelper/db.go
@@ -3,6 +3,7 @@ package testhelper
import (
"context"
"testing"
+ "time"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
@@ -37,15 +38,16 @@ SELECT
unnest($1::text[]) AS praefect_name,
unnest($2::text[]) AS shard_name,
unnest($3::text[]) AS node_name,
- NOW() AS last_contact_attempt_at,
- NOW() AS last_seen_active_at
+ $4 AS last_contact_attempt_at,
+ $4 AS last_seen_active_at
ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET
- last_contact_attempt_at = NOW(),
- last_seen_active_at = NOW()
+ last_contact_attempt_at = $4,
+ last_seen_active_at = $4
`,
pq.StringArray(praefects),
pq.StringArray(virtualStorages),
pq.StringArray(storages),
+ time.Now().UTC(),
)
require.NoError(t, err)
}