Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-10-25 16:59:11 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-10-25 16:59:11 +0300
commit54837cd5cb3b5ab8f35dfa447bd488fc78bbd76b (patch)
tree6aca5e98e15491bf969b86765273b1c285ef2f35
parent8e3a280d7c26d122d720eddc426cd166735c036e (diff)
parent8c4da135fa282fe1a4f2dc5c4523aef23fb4dc11 (diff)
Merge branch 'ps-replication-job-state' into 'master'
replication: Remove 'dead' stale jobs. See merge request gitlab-org/gitaly!3996
-rw-r--r--internal/praefect/datastore/queue.go8
-rw-r--r--internal/praefect/datastore/queue_test.go15
2 files changed, 14 insertions, 9 deletions
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 72d88f970..e00affd9d 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -484,11 +484,15 @@ func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, st
)
, update_job AS (
UPDATE replication_queue AS queue
- SET state = (CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END)::REPLICATION_JOB_STATE
+ SET state = 'failed'::REPLICATION_JOB_STATE
FROM stale_job_lock
- WHERE stale_job_lock.job_id = queue.id
+ WHERE stale_job_lock.job_id = queue.id AND attempt >= 1
RETURNING queue.id, queue.lock_id
)
+ , delete_job AS (
+ DELETE FROM replication_queue AS queue
+ WHERE attempt = 0 AND id IN (SELECT job_id FROM stale_job_lock)
+ )
UPDATE replication_queue_lock
SET acquired = FALSE
WHERE id IN (
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index aeee21694..65b939b1f 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -1048,16 +1048,16 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
db.TruncateAll(t)
source := NewPostgresReplicationEventQueue(db)
- var exp []ReplicationEvent
+ var events []ReplicationEvent
for _, eventType := range []ReplicationEvent{eventType1, eventType2, eventType3} {
event, err := source.Enqueue(ctx, eventType)
require.NoError(t, err)
devents, err := source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
require.NoError(t, err)
- exp = append(exp, devents...)
+ events = append(events, devents...)
}
- for event, i := exp[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead'
+ for event, i := events[0], 0; i < 2; i++ { // consume all processing attempts to verify that state will be changed to 'dead'
_, err := source.Acknowledge(ctx, JobStateFailed, []uint64{event.ID})
require.NoError(t, err)
_, err = source.Dequeue(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, 1)
@@ -1066,10 +1066,11 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
require.NoError(t, source.AcknowledgeStale(ctx, time.Microsecond))
- exp[0].State = JobStateDead
- exp[0].Attempt = 0
- for i := range exp[1:] {
- exp[1+i].State = JobStateFailed
+ var exp []ReplicationEvent
+ // The first event should be removed from table as its state changed to 'dead'.
+ for _, e := range events[1:] {
+ e.State = JobStateFailed
+ exp = append(exp, e)
}
requireEvents(t, ctx, db, exp)