diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-20 16:59:17 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-21 16:27:49 +0300 |
commit | 8c4da135fa282fe1a4f2dc5c4523aef23fb4dc11 (patch) | |
tree | 9849c9c41e028bf43afdda7b52b97342eafdfcc6 | |
parent | e54b12c07e81c79c5e8044b02bb904d139a81ee4 (diff) |
replication: Remove 'dead' stale jobs.
When replication job processing is completed, we acknowledge
the event. If processing was successful the state of the
event should be changed to 'completed', for failed processing
attempts the state becomes 'failed' and if event processing
failed max amount of attempts it should be changed to 'dead'.
To reduce amount of rows in the replication_queue table the
events that are going to be moved into 'completed' or 'dead'
state not updated but removed from the table. The change
aligns this behavior to AcknowledgeStale method that was not
removing 'dead' events from the table, but only updated their
status.
-rw-r--r-- | internal/praefect/datastore/queue.go | 8 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 15 |
2 files changed, 14 insertions, 9 deletions
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 72d88f970..e00affd9d 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -484,11 +484,15 @@ func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, st ) , update_job AS ( UPDATE replication_queue AS queue - SET state = (CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END)::REPLICATION_JOB_STATE + SET state = 'failed'::REPLICATION_JOB_STATE FROM stale_job_lock - WHERE stale_job_lock.job_id = queue.id + WHERE stale_job_lock.job_id = queue.id AND attempt >= 1 RETURNING queue.id, queue.lock_id ) + , delete_job AS ( + DELETE FROM replication_queue AS queue + WHERE attempt = 0 AND id IN (SELECT job_id FROM stale_job_lock) + ) UPDATE replication_queue_lock SET acquired = FALSE WHERE id IN ( diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index aeee21694..65b939b1f 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -1048,16 +1048,16 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) - var exp []ReplicationEvent + var events []ReplicationEvent for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} { event, err := source.Enqueue(ctx, eventType) require.NoError(t, err) devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) require.NoError(t, err) - exp = append(exp, devents...) + events = append(events, devents...) } - for event, i := exp[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead' + for event, i := events[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead' _, err := source.Acknowledge(ctx, JobStateFailed, []uint64{event.ID}) require.NoError(t, err) _, err = source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) @@ -1066,10 +1066,11 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond)) - exp[0].State = JobStateDead - exp[0].Attempt = 0 - for i := range exp[1:] { - exp[1+i].State = JobStateFailed + var exp []ReplicationEvent + // The first event should be removed from table as its state changed to 'dead'. + for _, e := range events[1:] { + e.State = JobStateFailed + exp = append(exp, e) } requireEvents(t, ctx, db, exp) |