Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Pavlo Strokov <pstrokov@gitlab.com> 2021-10-20 16:59:17 +0300
committer: Pavlo Strokov <pstrokov@gitlab.com> 2021-10-21 16:27:49 +0300
commit: 8c4da135fa282fe1a4f2dc5c4523aef23fb4dc11 (patch)
tree: 9849c9c41e028bf43afdda7b52b97342eafdfcc6
parent: e54b12c07e81c79c5e8044b02bb904d139a81ee4 (diff)
replication: Remove 'dead' stale jobs.
When replication job processing is completed, we acknowledge the event. If processing was successful, the state of the event should be changed to 'completed'; for failed processing attempts the state becomes 'failed'; and if event processing has failed the maximum number of attempts, it should be changed to 'dead'. To reduce the number of rows in the replication_queue table, events that are going to be moved into the 'completed' or 'dead' state are not updated but removed from the table. This change applies the same behavior to the AcknowledgeStale method, which previously did not remove 'dead' events from the table but only updated their status.
-rw-r--r-- internal/praefect/datastore/queue.go 8
-rw-r--r-- internal/praefect/datastore/queue_test.go 15
2 files changed, 14 insertions, 9 deletions
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 72d88f970..e00affd9d 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -484,11 +484,15 @@ func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, st
)
, update_job AS (
UPDATE replication_queue AS queue
- SET state = (CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END)::REPLICATION_JOB_STATE
+ SET state = 'failed'::REPLICATION_JOB_STATE
FROM stale_job_lock
- WHERE stale_job_lock.job_id = queue.id
+ WHERE stale_job_lock.job_id = queue.id AND attempt >= 1
RETURNING queue.id, queue.lock_id
)
+ , delete_job AS (
+ DELETE FROM replication_queue AS queue
+ WHERE attempt = 0 AND id IN (SELECT job_id FROM stale_job_lock)
+ )
UPDATE replication_queue_lock
SET acquired = FALSE
WHERE id IN (
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index aeee21694..65b939b1f 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -1048,16 +1048,16 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
db.TruncateAll(t)
source := NewPostgresReplicationEventQueue(db)
- var exp []ReplicationEvent
+ var events []ReplicationEvent
for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} {
event, err := source.Enqueue(ctx, eventType)
require.NoError(t, err)
devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
require.NoError(t, err)
- exp = append(exp, devents...)
+ events = append(events, devents...)
}
- for event, i := exp[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead'
+ for event, i := events[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead'
_, err := source.Acknowledge(ctx, JobStateFailed, []uint64{event.ID})
require.NoError(t, err)
_, err = source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
@@ -1066,10 +1066,11 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond))
- exp[0].State = JobStateDead
- exp[0].Attempt = 0
- for i := range exp[1:] {
- exp[1+i].State = JobStateFailed
+ var exp []ReplicationEvent
+ // The first event should be removed from table as its state changed to 'dead'.
+ for _, e := range events[1:] {
+ e.State = JobStateFailed
+ exp = append(exp, e)
}
requireEvents(t, ctx, db, exp)