diff options
-rw-r--r-- | _support/praefect-schema.sql | 7 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 4 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 53 | ||||
-rw-r--r-- | internal/praefect/datastore/collector_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go | 27 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 19 | ||||
-rw-r--r-- | internal/praefect/datastore/queue_test.go | 254 | ||||
-rw-r--r-- | internal/praefect/reconciler/reconciler.go | 1 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 5 |
10 files changed, 289 insertions, 88 deletions
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql index c0874080c..970f0e2ed 100644 --- a/_support/praefect-schema.sql +++ b/_support/praefect-schema.sql @@ -502,6 +502,13 @@ CREATE UNIQUE INDEX delete_replica_unique_index ON public.replication_queue USIN -- +-- Name: replication_queue_constraint; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX replication_queue_constraint ON public.replication_queue USING btree (job, state); + + +-- -- Name: replication_queue_target_index; Type: INDEX; Schema: public; Owner: - -- diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go index e32fb6fd6..53e2c760b 100644 --- a/cmd/praefect/subcmd_track_repository.go +++ b/cmd/praefect/subcmd_track_repository.go @@ -10,6 +10,7 @@ import ( "math/rand" "time" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" @@ -238,6 +239,10 @@ func (req *trackRepositoryRequest) execRequest(ctx context.Context, } if _, err := queue.Enqueue(ctx, event); err != nil { + if errors.As(err, &datastore.ReplicationEventExistsError{}) { + ctxlogrus.Extract(ctx).WithError(err).Info("replication event queue already has similar entry") + return nil + } return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err) } fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 51ddd0b45..b2f639f35 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -1098,6 +1098,10 @@ func (c *Coordinator) newRequestFinalizer( g.Go(func() error { if _, err := c.queue.Enqueue(ctx, event); err != nil { + if errors.As(err, &datastore.ReplicationEventExistsError{}) { + ctxlogrus.Extract(ctx).WithError(err).Info("replication event queue already has similar entry") + return nil + } return fmt.Errorf("enqueue replication event: %w", err) } return nil diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index de7ccf360..d8a024e1b 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -2893,6 +2893,59 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { } } +func TestNewRequestFinalizer_enqueueErrorPropagation(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + enqueueErr error + expectedErr error + }{ + { + desc: "most errors are propagated back", + enqueueErr: errors.New("enqueue failed"), + expectedErr: fmt.Errorf("enqueue replication event: %w", errors.New("enqueue failed")), + }, + { + desc: "replication event exists errors are ignored ", + enqueueErr: datastore.ReplicationEventExistsError{}, + expectedErr: nil, + }, + } { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + err := NewCoordinator( + &datastore.MockReplicationEventQueue{ + EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { + return datastore.ReplicationEvent{}, tc.enqueueErr + }, + }, + datastore.MockRepositoryStore{}, + nil, + nil, + config.Config{}, + nil, + ).newRequestFinalizer(ctx, + 1, + "praefect", + &gitalypb.Repository{}, + "replic-path", + "primary", + []string{}, + []string{"secondary"}, + datastore.UpdateRepo, + datastore.Params{"RelativePath": "relative-path"}, + "rpc-name", + )() + require.Equal(t, tc.expectedErr, err) + }) + } +} + func TestStreamParametersContext(t *testing.T) { // Because we're using NewFeatureFlag, they'll end up in the All array. enabledFF := featureflag.NewFeatureFlag("default_enabled", "", "", true) diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go index e01f6cd53..9c995a58f 100644 --- a/internal/praefect/datastore/collector_test.go +++ b/internal/praefect/datastore/collector_test.go @@ -270,7 +270,7 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) { _, err := queue.Enqueue(ctx, ReplicationEvent{ Job: ReplicationJob{ Change: UpdateRepo, - RelativePath: "/project/path-1", + RelativePath: fmt.Sprintf("/project/path-%d", i), TargetNodeStorage: nodes[1], SourceNodeStorage: nodes[0], VirtualStorage: virtualStorage, diff --git a/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go b/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go new file mode 100644 index 000000000..8d15753f2 --- /dev/null +++ b/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go @@ -0,0 +1,27 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20230118161145_replication_queue_unique_constraint", + Up: []string{` + --- Delete existing duplicates from the replication_queue table + DELETE FROM replication_queue a + USING replication_queue b + WHERE + a.id > b.id + AND a.job = b.job + AND a.state = b.state; +`, ` + --- Create the unique index which will prevent duplicates + CREATE UNIQUE INDEX replication_queue_constraint on replication_queue ( + job, + state + ); + `}, + Down: []string{"DROP INDEX replication_queue_constraint;"}, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 2fc9c0b85..a65191eb1 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -12,6 +12,18 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" ) +// ReplicationEventExistsError is returned when trying to add an already existing +// replication job into the queue +type ReplicationEventExistsError struct { + state string + job ReplicationJob +} + +// Error returns the errors message. +func (err ReplicationEventExistsError) Error() string { + return fmt.Sprintf("replication event %q -> %q already exists", err.state, err.job) +} + // ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back. type ReplicationEventQueue interface { // Enqueue puts provided event into the persistent queue. @@ -229,6 +241,13 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli // this will always return a single row result (because of lock uniqueness) or an error rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) if err != nil { + if glsql.IsUniqueViolation(err, "replication_queue_constraint") { + return ReplicationEvent{}, ReplicationEventExistsError{ + state: event.State.String(), + job: event.Job, + } + } + return ReplicationEvent{}, fmt.Errorf("query: %w", err) } diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 6aac7ca4d..18ffd6659 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -300,32 +300,19 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event db.RequireRowsInTable(t, "replication_queue_job_lock", 0) - event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event - require.NoError(t, err) - - expEvent2 := ReplicationEvent{ - ID: event2.ID, - State: "ready", - Attempt: 3, - LockID: "praefect-0|gitaly-1|/project/path-1", - Job: ReplicationJob{ - Change: UpdateRepo, - RelativePath: "/project/path-1", - TargetNodeStorage: "gitaly-1", - SourceNodeStorage: "gitaly-0", - VirtualStorage: "praefect-0", - Params: nil, - }, - } + _, err = queue.Enqueue(ctx, eventType1) // repeat of the same event + require.ErrorAs(t, err, &ReplicationEventExistsError{}) - requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2}) - requireLocks(t, ctx, db, []LockRow{expLock1}) // expected still one the same lock for repeated event + // doesn't insert the same event again + requireEvents(t, ctx, db, []ReplicationEvent{expEvent1}) + // expected still one the same lock for repeated event + requireLocks(t, ctx, db, []LockRow{expLock1}) - event3, err := queue.Enqueue(ctx, eventType2) // event for another target + event2, err := queue.Enqueue(ctx, eventType2) // event for another target require.NoError(t, err) - expEvent3 := ReplicationEvent{ - ID: event3.ID, + expEvent2 := ReplicationEvent{ + ID: event2.ID, State: JobStateReady, Attempt: 3, LockID: "praefect-0|gitaly-2|/project/path-1", @@ -339,14 +326,14 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { }, } - requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3}) + requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2}) requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // the new lock for another target repeated event - event4, err := queue.Enqueue(ctx, eventType3) // event for another repo + event3, err := queue.Enqueue(ctx, eventType3) // event for another repo require.NoError(t, err) - expEvent4 := ReplicationEvent{ - ID: event4.ID, + expEvent3 := ReplicationEvent{ + ID: event3.ID, State: JobStateReady, Attempt: 3, LockID: "praefect-1|gitaly-1|/project/path-2", @@ -360,7 +347,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { }, } - requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4}) + requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3}) requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // the new lock for same target but for another repo db.RequireRowsInTable(t, "replication_queue_job_lock", 0) // there is no fetches it must be empty @@ -464,7 +451,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { } // events to fill in the queue - events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType4} + events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4} for i := range events { var err error events[i], err = queue.Enqueue(ctx, events[i]) @@ -472,11 +459,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { } // first request to deque - expectedEvents1 := []ReplicationEvent{events[0], events[2], events[4]} + expectedEvents1 := []ReplicationEvent{events[0], events[1], events[2]} expectedJobLocks1 := []JobLockRow{ {JobID: events[0].ID, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: events[4].ID, LockID: "praefect|gitaly-1|/project/path-2"}, + {JobID: events[1].ID, LockID: "praefect|gitaly-1|/project/path-1"}, + {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-2"}, } // we expect only first two types of events by limiting count to 3 @@ -500,11 +487,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) { // second request to deque // there must be only last event fetched from the queue - expectedEvents2 := []ReplicationEvent{events[5]} + expectedEvents2 := []ReplicationEvent{events[3]} expectedEvents2[0].State = JobStateInProgress expectedEvents2[0].Attempt = 2 - expectedJobLocks2 := []JobLockRow{{JobID: 6, LockID: "backup|gitaly-1|/project/path-1"}} + expectedJobLocks2 := []JobLockRow{{JobID: 4, LockID: "backup|gitaly-1|/project/path-1"}} dequeuedEvents2, err := queue.Dequeue(ctx, "backup", "gitaly-1", 100500) require.NoError(t, err) @@ -550,10 +537,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test }, } - for i := 0; i < 2; i++ { - _, err := queue.Enqueue(ctx, eventType1) - require.NoError(t, err, "failed to fill in event queue") - } + _, err := queue.Enqueue(ctx, eventType1) + require.NoError(t, err, "failed to fill in event queue") dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1) require.NoError(t, err) @@ -564,10 +549,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test }) requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}}) - for i := 0; i < 2; i++ { - _, err := queue.Enqueue(ctx, eventType2) - require.NoError(t, err, "failed to fill in event queue") - } + _, err = queue.Enqueue(ctx, eventType2) + require.NoError(t, err, "failed to fill in event queue") dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1) require.NoError(t, err) @@ -578,7 +561,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}, - {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"}, + {JobID: 2, LockID: "praefect|gitaly-1|/project/path-2"}, }) } @@ -675,121 +658,116 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { }, } - events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue + events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4} for i := range events { var err error events[i], err = queue.Enqueue(ctx, events[i]) require.NoError(t, err, "failed to fill in event queue") } - // we expect only first three types of events by limiting count to 3 - dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3) + // we expect only first two types events by limiting count to 2 + dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 2) require.NoError(t, err) - require.Len(t, dequeuedEvents1, 3) + require.Len(t, dequeuedEvents1, 2) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: true}, - {ID: events[2].LockID, Acquired: true}, - {ID: events[4].LockID, Acquired: true}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[1].LockID, Acquired: true}, + {ID: events[2].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: events[0].ID, LockID: events[0].LockID}, - {JobID: events[2].ID, LockID: events[2].LockID}, - {JobID: events[4].ID, LockID: events[4].LockID}, + {JobID: events[1].ID, LockID: events[1].LockID}, }) // release lock for events of second type - acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[2].ID}) + acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[1].ID}) require.NoError(t, err) - require.Equal(t, []uint64{3}, acknowledge1) + require.Equal(t, []uint64{2}, acknowledge1) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: true}, + {ID: events[1].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: false}, - {ID: events[4].LockID, Acquired: true}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: events[0].ID, LockID: events[0].LockID}, - {JobID: events[4].ID, LockID: events[4].LockID}, }) dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3) require.NoError(t, err) - require.Len(t, dequeuedEvents2, 1, "expected: events of type 2 ('failed' will be fetched for retry)") + require.Len(t, dequeuedEvents2, 2, "expected: events of type 2 ('failed' will be fetched for retry)") requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: true}, + {ID: events[1].LockID, Acquired: true}, {ID: events[2].LockID, Acquired: true}, - {ID: events[4].LockID, Acquired: true}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ {JobID: events[0].ID, LockID: events[0].LockID}, + {JobID: events[1].ID, LockID: events[1].LockID}, {JobID: events[2].ID, LockID: events[2].LockID}, - {JobID: events[4].ID, LockID: events[4].LockID}, }) // creation of the new event that is equal to those already dequeue and processed // it is used to verify that the event created after consuming events from queue won't be marked // with previously created events as it may cause delay in replication - _, err = queue.Enqueue(ctx, eventType1) + newEvent, err := queue.Enqueue(ctx, eventType1) require.NoError(t, err) - acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[4].ID}) + acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[2].ID}) require.NoError(t, err) - require.Equal(t, []uint64{events[0].ID, events[4].ID}, acknowledge2) + require.Equal(t, []uint64{events[0].ID, events[2].ID}, acknowledge2) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, - {ID: events[2].LockID, Acquired: true}, - {ID: events[4].LockID, Acquired: false}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[1].LockID, Acquired: true}, + {ID: events[2].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[2].ID, LockID: events[2].LockID}, + {JobID: events[1].ID, LockID: events[1].LockID}, }) - db.RequireRowsInTable(t, "replication_queue", 4) + db.RequireRowsInTable(t, "replication_queue", 3) dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3) require.NoError(t, err) require.Len(t, dequeuedEvents3, 1, "expected: event of type 4") requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, - {ID: events[2].LockID, Acquired: true}, - {ID: events[4].LockID, Acquired: false}, - {ID: events[6].LockID, Acquired: true}, + {ID: events[1].LockID, Acquired: true}, + {ID: events[2].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: true}, }) requireJobLocks(t, ctx, db, []JobLockRow{ - {JobID: events[2].ID, LockID: events[2].LockID}, - {JobID: events[6].ID, LockID: events[6].LockID}, + {JobID: events[1].ID, LockID: events[1].LockID}, + {JobID: events[3].ID, LockID: events[3].LockID}, }) - acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[2].ID, events[6].ID}) + acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[1].ID, events[3].ID}) require.NoError(t, err) - require.Equal(t, []uint64{events[2].ID, events[6].ID}, acknowledged3) + require.Equal(t, []uint64{events[1].ID, events[3].ID}, acknowledged3) requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, + {ID: events[1].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: false}, - {ID: events[4].LockID, Acquired: false}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, nil) db.RequireRowsInTable(t, "replication_queue", 1) - newEvent, err := queue.Enqueue(ctx, eventType1) - require.NoError(t, err) - acknowledge4, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{newEvent.ID}) require.NoError(t, err) require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged - db.RequireRowsInTable(t, "replication_queue", 2) + db.RequireRowsInTable(t, "replication_queue", 1) var newEventState string require.NoError(t, db.QueryRow("SELECT state FROM replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState)) require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)") requireLocks(t, ctx, db, []LockRow{ {ID: events[0].LockID, Acquired: false}, + {ID: events[1].LockID, Acquired: false}, {ID: events[2].LockID, Acquired: false}, - {ID: events[4].LockID, Acquired: false}, - {ID: events[6].LockID, Acquired: false}, + {ID: events[3].LockID, Acquired: false}, }) requireJobLocks(t, ctx, db, nil) } @@ -1047,6 +1025,116 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) { }) } +// Check if the queue returns the expected error when adding a duplicate +// replication event. +func TestPostgresReplicationEventQueue_UniqueConstraint(t *testing.T) { + t.Parallel() + + initialEvent := ReplicationEvent{ + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: nil, + }, + } + + for _, tc := range []struct { + desc string + event ReplicationEvent + expectedErr error + expectedEvent *ReplicationEvent + expectedLock *LockRow + }{ + { + desc: "throws error when same event added again", + event: initialEvent, + expectedErr: ReplicationEventExistsError{ + // empty state defaults to `ready` + state: "", + job: initialEvent.Job, + }, + }, + { + desc: "Adds new event to the queue", + event: ReplicationEvent{ + LockID: "praefect|gitaly-2|/project/path-2", + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-2", + TargetNodeStorage: "gitaly-2", + SourceNodeStorage: "gitaly-1", + VirtualStorage: "praefect", + Params: nil, + }, + }, + expectedEvent: &ReplicationEvent{ + ID: 2, + State: JobStateReady, + Attempt: 3, + LockID: "praefect|gitaly-2|/project/path-2", + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-2", + TargetNodeStorage: "gitaly-2", + SourceNodeStorage: "gitaly-1", + VirtualStorage: "praefect", + Params: nil, + }, + }, + expectedLock: &LockRow{ID: "praefect|gitaly-2|/project/path-2", Acquired: false}, + }, + } { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + db := testdb.New(t) + ctx := testhelper.Context(t) + + queue := PostgresReplicationEventQueue{db.DB} + + actualEvent, err := queue.Enqueue(ctx, initialEvent) + require.NoError(t, err) + // we need to setup it to default because it is not possible to get it beforehand for expected + actualEvent.CreatedAt = time.Time{} + + initialExpectedLock := LockRow{ID: "praefect|gitaly-1|/project/path-1", Acquired: false} + initialExpectedEvent := ReplicationEvent{ + ID: 1, + State: JobStateReady, + Attempt: 3, + LockID: "praefect|gitaly-1|/project/path-1", + Job: ReplicationJob{ + Change: UpdateRepo, + RelativePath: "/project/path-1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "praefect", + Params: nil, + }, + } + + require.Equal(t, initialExpectedEvent, actualEvent) + requireEvents(t, ctx, db, []ReplicationEvent{initialExpectedEvent}) + // expected a new lock for new event + requireLocks(t, ctx, db, []LockRow{initialExpectedLock}) + db.RequireRowsInTable(t, "replication_queue_job_lock", 0) + + _, err = queue.Enqueue(ctx, tc.event) + require.Equal(t, tc.expectedErr, err) + + if tc.expectedEvent != nil { + requireEvents(t, ctx, db, []ReplicationEvent{initialExpectedEvent, *tc.expectedEvent}) + requireLocks(t, ctx, db, []LockRow{initialExpectedLock, *tc.expectedLock}) + } + }) + } +} + func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) { t.Helper() diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go index 680323d06..dfdce5adf 100644 --- a/internal/praefect/reconciler/reconciler.go +++ b/internal/praefect/reconciler/reconciler.go @@ -275,6 +275,7 @@ reconciliation_jobs AS ( -- only perform inserts if we managed to acquire the lock as otherwise -- we'd schedule duplicate jobs WHERE ( SELECT acquired FROM reconciliation_lock ) + ON CONFLICT DO NOTHING RETURNING lock_id, meta, job ), diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 86754130d..16604c057 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -509,7 +509,7 @@ func TestProcessBacklog_Success(t *testing.T) { ackIDs, err := queue.Acknowledge(ctx, state, ids) if len(ids) > 0 { assert.Equal(t, datastore.JobStateCompleted, state, "no fails expected") - assert.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once") + assert.Equal(t, []uint64{1, 2, 3}, ids, "all jobs must be processed at once") } return ackIDs, err }) @@ -536,9 +536,6 @@ func TestProcessBacklog_Success(t *testing.T) { _, err := queueInterceptor.Enqueue(ctx, eventType1) require.NoError(t, err) - _, err = queueInterceptor.Enqueue(ctx, eventType1) - require.NoError(t, err) - renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1") fullNewPath1 := filepath.Join(backupCfg.Storages[0].Path, renameTo1) |