diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-20 16:59:17 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-10-21 16:27:49 +0300 |
commit | 8c4da135fa282fe1a4f2dc5c4523aef23fb4dc11 (patch) | |
tree | 9849c9c41e028bf43afdda7b52b97342eafdfcc6 /internal/praefect/datastore/queue_test.go | |
parent | e54b12c07e81c79c5e8044b02bb904d139a81ee4 (diff) |
replication: Remove 'dead' stale jobs.
When replication job processing is completed, we acknowledge
the event. If processing was successful the state of the
event should be changed to 'completed', for failed processing
attempts the state becomes 'failed' and if event processing
failed max amount of attempts it should be changed to 'dead'.
To reduce amount of rows in the replication_queue table the
events that are going to be moved into 'completed' or 'dead'
state not updated but removed from the table. The change
aligns this behavior to AcknowledgeStale method that was not
removing 'dead' events from the table, but only updated their
status.
Diffstat (limited to 'internal/praefect/datastore/queue_test.go')
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index aeee21694..65b939b1f 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -1048,16 +1048,16 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { db.TruncateAll(t) source := NewPostgresReplicationEventQueue(db) - var exp []ReplicationEvent + var events []ReplicationEvent for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} { event, err := source.Enqueue(ctx, eventType) require.NoError(t, err) devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) require.NoError(t, err) - exp = append(exp, devents...) + events = append(events, devents...) } - for event, i := exp[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead' + for event, i := events[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead' _, err := source.Acknowledge(ctx, JobStateFailed, []uint64{event.ID}) require.NoError(t, err) _, err = source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1) @@ -1066,10 +1066,11 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond)) - exp[0].State = JobStateDead - exp[0].Attempt = 0 - for i := range exp[1:] { - exp[1+i].State = JobStateFailed + var exp []ReplicationEvent + // The first event should be removed from table as its state changed to 'dead'. + for _, e := range events[1:] { + e.State = JobStateFailed + exp = append(exp, e) } requireEvents(t, ctx, db, exp) |