diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-25 16:59:11 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-25 16:59:11 +0300 |
commit | 54837cd5cb3b5ab8f35dfa447bd488fc78bbd76b (patch) | |
tree | 6aca5e98e15491bf969b86765273b1c285ef2f35 | |
parent | 8e3a280d7c26d122d720eddc426cd166735c036e (diff) | |
parent | 8c4da135fa282fe1a4f2dc5c4523aef23fb4dc11 (diff) |
Merge branch 'ps-replication-job-state' into 'master'
replication: Remove 'dead' stale jobs.
See merge request gitlab-org/gitaly!3996
-rw-r--r-- | internal/praefect/datastore/queue.go | 8 |
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 15 |
2 files changed, 14 insertions, 9 deletions
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 72d88f970..e00affd9d 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -484,11 +484,15 @@ func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, st ) , update_job AS ( UPDATE replication_queue AS queue - SET state = (CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END)::REPLICATION_JOB_STATE + SET state = 'failed'::REPLICATION_JOB_STATE FROM stale_job_lock - WHERE stale_job_lock.job_id = queue.id + WHERE stale_job_lock.job_id = queue.id AND attempt >= 1 RETURNING queue.id, queue.lock_id ) + , delete_job AS ( + DELETE FROM replication_queue AS queue + WHERE attempt = 0 AND id IN (SELECT job_id FROM stale_job_lock) + ) UPDATE replication_queue_lock SET acquired = FALSE WHERE id IN ( diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index aeee21694..65b939b1f 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -1048,16 +1048,16 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) - var exp []ReplicationEvent + var events []ReplicationEvent for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} { event, err := source.Enqueue(ctx, eventType) require.NoError(t, err) devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) require.NoError(t, err) - exp = append(exp, devents...) + events = append(events, devents...) 
} - for event, i := exp[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead' + for event, i := events[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead' _, err := source.Acknowledge(ctx, JobStateFailed, []uint64{event.ID}) require.NoError(t, err) _, err = source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) @@ -1066,10 +1066,11 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond)) - exp[0].State = JobStateDead - exp[0].Attempt = 0 - for i := range exp[1:] { - exp[1+i].State = JobStateFailed + var exp []ReplicationEvent + // The first event should be removed from table as its state changed to 'dead'. + for _, e := range events[1:] { + e.State = JobStateFailed + exp = append(exp, e) } requireEvents(t, ctx, db, exp) |