diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2022-02-16 19:09:18 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2022-02-23 13:36:05 +0300 |
commit | a269dcdac7caf7282f2f10a21eea33a1d8258dd0 (patch) | |
tree | 4b283f515e557a948c8260a3a08b44a6b00d35bb | |
parent | 3cb0c82f8f75aefc5dffef98f8b86404d9de3db7 (diff) |
datastore: Use new columns instead of JBONB job column
As we flattened job column of JSONB type to a set of
simple columns we deprecate usage of the job column
and replaces it with references on the corresponding
columns of the replication_queue table for
PostgresReplicationEventQueue.
The data is retrieved from the columns on the scan
operation. The tests are fixed to include repository_id
into replication jobs.
-rw-r--r-- | internal/praefect/datastore/queue.go | 88 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 13 |
2 files changed, 48 insertions, 53 deletions
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 986564b18..65948426a 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -101,50 +101,8 @@ type ReplicationEvent struct { Meta Params } -// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases. -func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) { - var mapping []interface{} - for _, column := range columns { - switch column { - case "id": - mapping = append(mapping, &event.ID) - case "state": - mapping = append(mapping, &event.State) - case "created_at": - mapping = append(mapping, &event.CreatedAt) - case "updated_at": - mapping = append(mapping, &event.UpdatedAt) - case "attempt": - mapping = append(mapping, &event.Attempt) - case "lock_id": - mapping = append(mapping, &event.LockID) - case "job": - mapping = append(mapping, &event.Job) - case "meta": - mapping = append(mapping, &event.Meta) - default: - return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column) - } - } - return mapping, nil -} - -// Scan fills receive fields with values fetched from database based on the set of columns/column aliases. -func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error { - mappings, err := event.Mapping(columns) - if err != nil { - return err - } - return rows.Scan(mappings...) -} - // scanReplicationEvents reads all rows and convert them into structs filling all the fields according to fetched columns/column aliases. func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) { - columns, err := rows.Columns() - if err != nil { - return events, err - } - defer func() { if cErr := rows.Close(); cErr != nil && err == nil { err = cErr @@ -153,9 +111,29 @@ func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error for rows.Next() { var event ReplicationEvent - if err = event.Scan(columns, rows); err != nil { + var srcNodeStorage sql.NullString + if err = rows.Scan( + &event.ID, + &event.State, + &event.CreatedAt, + &event.UpdatedAt, + &event.LockID, + &event.Attempt, + &event.Meta, + &event.Job.Change, + &event.Job.RepositoryID, + &event.Job.ReplicaPath, + &event.Job.RelativePath, + &event.Job.TargetNodeStorage, + &srcNodeStorage, + &event.Job.VirtualStorage, + &event.Job.Params, + ); err != nil { return events, err } + if srcNodeStorage.Valid { + event.Job.SourceNodeStorage = srcNodeStorage.String + } events = append(events, event) } @@ -226,7 +204,9 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli INSERT INTO replication_queue(lock_id, job, meta) SELECT insert_lock.id, $4, $5 FROM insert_lock - RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta` + RETURNING id, state, created_at, updated_at, lock_id, attempt, meta, change, + repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, virtual_storage, params` // this will always return a single row result (because of lock uniqueness) or an error rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) if err != nil { @@ -275,7 +255,7 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor SELECT id FROM replication_queue WHERE id IN ( - SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change' ORDER BY queue.created_at) + SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, change ORDER BY queue.created_at) FROM replication_queue AS queue JOIN lock ON queue.lock_id = lock.id WHERE queue.state IN ('ready', 'failed' ) @@ -287,12 +267,16 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor ) , job AS ( UPDATE replication_queue AS queue - SET attempt = CASE WHEN job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END + SET attempt = CASE WHEN queue.change = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END , state = 'in_progress' , updated_at = NOW() AT TIME ZONE 'UTC' FROM candidate WHERE queue.id = candidate.id - RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta + RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, + queue.lock_id, queue.attempt, queue.meta, queue.change, + queue.repository_id, queue.replica_path, queue.relative_path, + queue.target_node_storage, queue.source_node_storage, queue.virtual_storage, + queue.params ) , track_job_lock AS ( INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at) @@ -306,7 +290,9 @@ func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStor FROM track_job_lock AS tracked WHERE lock.id = tracked.lock_id ) - SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta + SELECT id, state, created_at, updated_at, lock_id, attempt, meta, change, + repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, virtual_storage, params FROM job ORDER BY id` rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count) @@ -353,7 +339,7 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J query := ` WITH existing AS ( - SELECT id, lock_id, updated_at, job + SELECT id, lock_id, updated_at, change, source_node_storage FROM replication_queue WHERE id = ANY($1) AND state = 'in_progress' @@ -374,9 +360,9 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J -- they are for the exact same repository AND queue.lock_id = existing.lock_id -- and created to apply exact same replication operation (gc, update, ...) - AND queue.job->>'change' = existing.job->>'change' + AND queue.change = existing.change -- from the same source storage (if applicable, as 'gc' has no source) - AND COALESCE(queue.job->>'source_node_storage', '') = COALESCE(existing.job->>'source_node_storage', '')) + AND COALESCE(queue.source_node_storage, '') = COALESCE(existing.source_node_storage, '')) ) ) RETURNING queue.id, queue.lock_id diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 96407f125..45052fae5 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -159,7 +159,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { Params: nil, }, } - insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage) + repositoryID := insertRepository(t, db, ctx, eventType.Job.VirtualStorage, eventType.Job.RelativePath, eventType.Job.SourceNodeStorage) actualEvent, err := queue.Enqueue(ctx, eventType) // initial event require.NoError(t, err) @@ -173,6 +173,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { Attempt: 3, LockID: "praefect|gitaly-1|/project/path-1", Job: ReplicationJob{ + RepositoryID: repositoryID, Change: UpdateRepo, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-1", @@ -431,6 +432,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { LockID: "praefect-0|gitaly-2|/project/path-1", Job: ReplicationJob{ Change: RenameRepo, + RepositoryID: eventType1.Job.RepositoryID, RelativePath: "/project/path-1", TargetNodeStorage: "gitaly-2", SourceNodeStorage: "", @@ -1244,12 +1246,19 @@ func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []R exp[i].UpdatedAt = nil } - sqlStmt := `SELECT id, state, attempt, lock_id, job FROM replication_queue ORDER BY id` + sqlStmt := `SELECT id, state, created_at, updated_at, lock_id, attempt, meta, change, + repository_id, replica_path, relative_path, target_node_storage, + source_node_storage, virtual_storage, params + FROM replication_queue ORDER BY id` rows, err := db.QueryContext(ctx, sqlStmt) require.NoError(t, err) actual, err := scanReplicationEvents(rows) require.NoError(t, err) + for i := 0; i < len(actual); i++ { + actual[i].CreatedAt = time.Time{} + actual[i].UpdatedAt = nil + } require.Equal(t, exp, actual) } |