Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Pavlo Strokov <pstrokov@gitlab.com> 2022-02-16 19:09:18 +0300
committer: Pavlo Strokov <pstrokov@gitlab.com> 2022-02-23 13:36:05 +0300
commit: a269dcdac7caf7282f2f10a21eea33a1d8258dd0 (patch)
tree: 4b283f515e557a948c8260a3a08b44a6b00d35bb
parent: 3cb0c82f8f75aefc5dffef98f8b86404d9de3db7 (diff)
datastore: Use new columns instead of JSONB job column
Since we flattened the job column of JSONB type into a set of simple columns, we deprecate usage of the job column and replace it with references to the corresponding columns of the replication_queue table for PostgresReplicationEventQueue. The data is retrieved from those columns during the scan operation. The tests are fixed to include repository_id in replication jobs.
-rw-r--r-- internal/praefect/datastore/queue.go 88
-rw-r--r-- internal/praefect/datastore/queue_test.go 13
2 files changed, 48 insertions, 53 deletions
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 986564b18..65948426a 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -101,50 +101,8 @@ type ReplicationEvent struct {
Meta Params
}
-// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases.
-func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) {
- var mapping []interface{}
- for _, column := range columns {
- switch column {
- case "id":
- mapping = append(mapping, &event.ID)
- case "state":
- mapping = append(mapping, &event.State)
- case "created_at":
- mapping = append(mapping, &event.CreatedAt)
- case "updated_at":
- mapping = append(mapping, &event.UpdatedAt)
- case "attempt":
- mapping = append(mapping, &event.Attempt)
- case "lock_id":
- mapping = append(mapping, &event.LockID)
- case "job":
- mapping = append(mapping, &event.Job)
- case "meta":
- mapping = append(mapping, &event.Meta)
- default:
- return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column)
- }
- }
- return mapping, nil
-}
-
-// Scan fills receive fields with values fetched from database based on the set of columns/column aliases.
-func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error {
- mappings, err := event.Mapping(columns)
- if err != nil {
- return err
- }
- return rows.Scan(mappings...)
-}
-
// scanReplicationEvents reads all rows and convert them into structs filling all the fields according to fetched columns/column aliases.
func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) {
- columns, err := rows.Columns()
- if err != nil {
- return events, err
- }
-
defer func() {
if cErr := rows.Close(); cErr != nil && err == nil {
err = cErr
@@ -153,9 +111,29 @@ func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error
for rows.Next() {
var event ReplicationEvent
- if err = event.Scan(columns, rows); err != nil {
+ var srcNodeStorage sql.NullString
+ if err = rows.Scan(
+ &event.ID,
+ &event.State,
+ &event.CreatedAt,
+ &event.UpdatedAt,
+ &event.LockID,
+ &event.Attempt,
+ &event.Meta,
+ &event.Job.Change,
+ &event.Job.RepositoryID,
+ &event.Job.ReplicaPath,
+ &event.Job.RelativePath,
+ &event.Job.TargetNodeStorage,
+ &srcNodeStorage,
+ &event.Job.VirtualStorage,
+ &event.Job.Params,
+ ); err != nil {
return events, err
}
+ if srcNodeStorage.Valid {
+ event.Job.SourceNodeStorage = srcNodeStorage.String
+ }
events = append(events, event)
}
@@ -226,7 +204,9 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli
INSERT INTO replication_queue(lock_id, job, meta)
SELECT insert_lock.id, $4, $5
FROM insert_lock
- RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta`
+ RETURNING id, state, created_at, updated_at, lock_id, attempt, meta, change,
+ repository_id, replica_path, relative_path, target_node_storage,
+ source_node_storage, virtual_storage, params`
// this will always return a single row result (because of lock uniqueness) or an error
rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta)
if err != nil {
@@ -275,7 +255,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
SELECT id
FROM replication_queue
WHERE id IN (
- SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change' ORDER BY queue.created_at)
+ SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, change ORDER BY queue.created_at)
FROM replication_queue AS queue
JOIN lock ON queue.lock_id = lock.id
WHERE queue.state IN ('ready', 'failed' )
@@ -287,12 +267,16 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
)
, job AS (
UPDATE replication_queue AS queue
- SET attempt = CASE WHEN job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END
+ SET attempt = CASE WHEN queue.change = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END
, state = 'in_progress'
, updated_at = NOW() AT TIME ZONE 'UTC'
FROM candidate
WHERE queue.id = candidate.id
- RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta
+ RETURNING queue.id, queue.state, queue.created_at, queue.updated_at,
+ queue.lock_id, queue.attempt, queue.meta, queue.change,
+ queue.repository_id, queue.replica_path, queue.relative_path,
+ queue.target_node_storage, queue.source_node_storage, queue.virtual_storage,
+ queue.params
)
, track_job_lock AS (
INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at)
@@ -306,7 +290,9 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor
FROM track_job_lock AS tracked
WHERE lock.id = tracked.lock_id
)
- SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta
+ SELECT id, state, created_at, updated_at, lock_id, attempt, meta, change,
+ repository_id, replica_path, relative_path, target_node_storage,
+ source_node_storage, virtual_storage, params
FROM job
ORDER BY id`
rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count)
@@ -353,7 +339,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
query := `
WITH existing AS (
- SELECT id, lock_id, updated_at, job
+ SELECT id, lock_id, updated_at, change, source_node_storage
FROM replication_queue
WHERE id = ANY($1)
AND state = 'in_progress'
@@ -374,9 +360,9 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
-- they are for the exact same repository
AND queue.lock_id = existing.lock_id
-- and created to apply exact same replication operation (gc, update, ...)
- AND queue.job->>'change' = existing.job->>'change'
+ AND queue.change = existing.change
-- from the same source storage (if applicable, as 'gc' has no source)
- AND COALESCE(queue.job->>'source_node_storage', '') = COALESCE(existing.job->>'source_node_storage', ''))
+ AND COALESCE(queue.source_node_storage, '') = COALESCE(existing.source_node_storage, ''))
)
)
RETURNING queue.id, queue.lock_id
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 96407f125..45052fae5 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -159,7 +159,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
Params: nil,
},
}
- insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage)
+ repositoryID := insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage)
actualEvent, err := queue.Enqueue(ctx, eventType) // initial event
require.NoError(t, err)
@@ -173,6 +173,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
Attempt: 3,
LockID: "praefect|gitaly-1|/project/path-1",
Job: ReplicationJob{
+ RepositoryID: repositoryID,
Change: UpdateRepo,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-1",
@@ -431,6 +432,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
LockID: "praefect-0|gitaly-2|/project/path-1",
Job: ReplicationJob{
Change: RenameRepo,
+ RepositoryID: eventType1.Job.RepositoryID,
RelativePath: "/project/path-1",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "",
@@ -1244,12 +1246,19 @@ func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []R
exp[i].UpdatedAt = nil
}
- sqlStmt := `SELECT id, state, attempt, lock_id, job FROM replication_queue ORDER BY id`
+ sqlStmt := `SELECT id, state, created_at, updated_at, lock_id, attempt, meta, change,
+ repository_id, replica_path, relative_path, target_node_storage,
+ source_node_storage, virtual_storage, params
+ FROM replication_queue ORDER BY id`
rows, err := db.QueryContext(ctx, sqlStmt)
require.NoError(t, err)
actual, err := scanReplicationEvents(rows)
require.NoError(t, err)
+ for i := 0; i < len(actual); i++ {
+ actual[i].CreatedAt = time.Time{}
+ actual[i].UpdatedAt = nil
+ }
require.Equal(t, exp, actual)
}