gitlab.com/gitlab-org/gitaly.git
author    Karthik Nayak <knayak@gitlab.com>  2023-01-18 19:13:51 +0300
committer Karthik Nayak <knayak@gitlab.com>  2023-01-30 17:51:54 +0300
commit    89b58c1141cd0360f4a141e2948905e31f4ad3c6 (patch)
tree      87d6a241634bed989ea2823df0394d5b47b28b09
parent    11e837787173417c2dc1f8beefce34d1d07ec1cf (diff)
migrations: Add unique index `replication_queue_constraint`
(branch: 3940-avoid-duplicate-jobs-in-the-replication-queue)
To ensure we don't duplicate replication events in the `replication_queue`
table, let's create a new unique index [0] which adds a constraint on the
following columns:

- state
- job

This is safer than using a `WHERE NOT` clause during the insert, as there is
no chance of a race condition. It is also faster, because now we only hit the
index and don't scan all rows of the table. The migration first deletes any
existing duplicates before adding the new constraint.

With this in place, we can now capture and propagate this error. In
`datastore.queue` we create a new error type: `ReplicationEventExistsError`.
This error is captured in the `datastore.Enqueue` function by checking for a
violation of the `replication_queue_constraint` unique constraint, and we
propagate it back. In `praefect.newRequestFinalizer` we capture this error,
log it and simply move on, because this is an expected scenario and nothing
to stop execution for.

[0]: https://www.postgresql.org/docs/current/indexes-unique.html

Changelog: added
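Condensed from the changes below (this is the pattern the diff introduces,
not separate code; it assumes the imports already present in coordinator.go:
errors, fmt, ctxlogrus and the datastore package), callers now distinguish
the duplicate case from real failures via errors.As:

    if _, err := queue.Enqueue(ctx, event); err != nil {
        // A duplicate (job, state) pair now surfaces as a typed error
        // instead of silently inserting a second row.
        if errors.As(err, &datastore.ReplicationEventExistsError{}) {
            ctxlogrus.Extract(ctx).WithError(err).Info("replication event queue already has similar entry")
            return nil
        }
        return fmt.Errorf("enqueue replication event: %w", err)
    }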
-rw-r--r--  _support/praefect-schema.sql                                                        7
-rw-r--r--  cmd/praefect/subcmd_track_repository.go                                             5
-rw-r--r--  internal/praefect/coordinator.go                                                    4
-rw-r--r--  internal/praefect/coordinator_test.go                                              53
-rw-r--r--  internal/praefect/datastore/collector_test.go                                       2
-rw-r--r--  internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go  27
-rw-r--r--  internal/praefect/datastore/queue.go                                               19
-rw-r--r--  internal/praefect/datastore/queue_test.go                                         254
-rw-r--r--  internal/praefect/reconciler/reconciler.go                                          1
-rw-r--r--  internal/praefect/replicator_test.go                                                5
10 files changed, 289 insertions, 88 deletions
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql
index c0874080c..970f0e2ed 100644
--- a/_support/praefect-schema.sql
+++ b/_support/praefect-schema.sql
@@ -502,6 +502,13 @@ CREATE UNIQUE INDEX delete_replica_unique_index ON public.replication_queue USIN
--
+-- Name: replication_queue_constraint; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE UNIQUE INDEX replication_queue_constraint ON public.replication_queue USING btree (job, state);
+
+
+--
-- Name: replication_queue_target_index; Type: INDEX; Schema: public; Owner: -
--
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index e32fb6fd6..53e2c760b 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -10,6 +10,7 @@ import (
"math/rand"
"time"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v15/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect"
@@ -238,6 +239,10 @@ func (req *trackRepositoryRequest) execRequest(ctx context.Context,
}
if _, err := queue.Enqueue(ctx, event); err != nil {
+ if errors.As(err, &datastore.ReplicationEventExistsError{}) {
+ ctxlogrus.Extract(ctx).WithError(err).Info("replication event queue already has similar entry")
+ return nil
+ }
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 51ddd0b45..b2f639f35 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -1098,6 +1098,10 @@ func (c *Coordinator) newRequestFinalizer(
g.Go(func() error {
if _, err := c.queue.Enqueue(ctx, event); err != nil {
+ if errors.As(err, &datastore.ReplicationEventExistsError{}) {
+ ctxlogrus.Extract(ctx).WithError(err).Info("replication event queue already has similar entry")
+ return nil
+ }
return fmt.Errorf("enqueue replication event: %w", err)
}
return nil
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index de7ccf360..d8a024e1b 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -2893,6 +2893,59 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
}
}
+func TestNewRequestFinalizer_enqueueErrorPropagation(t *testing.T) {
+ t.Parallel()
+ ctx := testhelper.Context(t)
+
+ for _, tc := range []struct {
+ desc string
+ enqueueErr error
+ expectedErr error
+ }{
+ {
+ desc: "most errors are propagated back",
+ enqueueErr: errors.New("enqueue failed"),
+ expectedErr: fmt.Errorf("enqueue replication event: %w", errors.New("enqueue failed")),
+ },
+ {
+ desc: "replication event exists errors are ignored",
+ enqueueErr: datastore.ReplicationEventExistsError{},
+ expectedErr: nil,
+ },
+ } {
+ tc := tc
+
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ err := NewCoordinator(
+ &datastore.MockReplicationEventQueue{
+ EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) {
+ return datastore.ReplicationEvent{}, tc.enqueueErr
+ },
+ },
+ datastore.MockRepositoryStore{},
+ nil,
+ nil,
+ config.Config{},
+ nil,
+ ).newRequestFinalizer(ctx,
+ 1,
+ "praefect",
+ &gitalypb.Repository{},
+ "replic-path",
+ "primary",
+ []string{},
+ []string{"secondary"},
+ datastore.UpdateRepo,
+ datastore.Params{"RelativePath": "relative-path"},
+ "rpc-name",
+ )()
+ require.Equal(t, tc.expectedErr, err)
+ })
+ }
+}
+
func TestStreamParametersContext(t *testing.T) {
// Because we're using NewFeatureFlag, they'll end up in the All array.
enabledFF := featureflag.NewFeatureFlag("default_enabled", "", "", true)
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index e01f6cd53..9c995a58f 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -270,7 +270,7 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) {
_, err := queue.Enqueue(ctx, ReplicationEvent{
Job: ReplicationJob{
Change: UpdateRepo,
- RelativePath: "/project/path-1",
+ RelativePath: fmt.Sprintf("/project/path-%d", i),
TargetNodeStorage: nodes[1],
SourceNodeStorage: nodes[0],
VirtualStorage: virtualStorage,
diff --git a/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go b/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go
new file mode 100644
index 000000000..8d15753f2
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go
@@ -0,0 +1,27 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20230118161145_replication_queue_unique_constraint",
+ Up: []string{`
+ --- Delete existing duplicates from the replication_queue table
+ DELETE FROM replication_queue a
+ USING replication_queue b
+ WHERE
+ a.id > b.id
+ AND a.job = b.job
+ AND a.state = b.state;
+`, `
+ --- Create the unique index which will prevent duplicates
+ CREATE UNIQUE INDEX replication_queue_constraint on replication_queue (
+ job,
+ state
+ );
+ `},
+ Down: []string{"DROP INDEX replication_queue_constraint;"},
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 2fc9c0b85..a65191eb1 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -12,6 +12,18 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
)
+// ReplicationEventExistsError is returned when trying to add an already existing
+// replication job into the queue.
+type ReplicationEventExistsError struct {
+ state string
+ job ReplicationJob
+}
+
+// Error returns the error's message.
+func (err ReplicationEventExistsError) Error() string {
+ return fmt.Sprintf("replication event %q -> %q already exists", err.state, err.job)
+}
+
// ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back.
type ReplicationEventQueue interface {
// Enqueue puts provided event into the persistent queue.
@@ -229,6 +241,13 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli
// this will always return a single row result (because of lock uniqueness) or an error
rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta)
if err != nil {
+ if glsql.IsUniqueViolation(err, "replication_queue_constraint") {
+ return ReplicationEvent{}, ReplicationEventExistsError{
+ state: event.State.String(),
+ job: event.Job,
+ }
+ }
+
return ReplicationEvent{}, fmt.Errorf("query: %w", err)
}
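The helper used above, glsql.IsUniqueViolation, is not part of this diff.
A minimal sketch of how such a check can be written, assuming errors are
surfaced through the github.com/jackc/pgconn driver (the actual helper in
the glsql package may differ):

    package glsql

    import (
        "errors"

        "github.com/jackc/pgconn"
    )

    // IsUniqueViolation reports whether err wraps a PostgreSQL unique-constraint
    // violation (SQLSTATE 23505) raised by the named constraint.
    func IsUniqueViolation(err error, constraint string) bool {
        var pgErr *pgconn.PgError
        if errors.As(err, &pgErr) {
            return pgErr.Code == "23505" && pgErr.ConstraintName == constraint
        }
        return false
    }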
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 6aac7ca4d..18ffd6659 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -300,32 +300,19 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event
db.RequireRowsInTable(t, "replication_queue_job_lock", 0)
- event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event
- require.NoError(t, err)
-
- expEvent2 := ReplicationEvent{
- ID: event2.ID,
- State: "ready",
- Attempt: 3,
- LockID: "praefect-0|gitaly-1|/project/path-1",
- Job: ReplicationJob{
- Change: UpdateRepo,
- RelativePath: "/project/path-1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "praefect-0",
- Params: nil,
- },
- }
+ _, err = queue.Enqueue(ctx, eventType1) // repeat of the same event
+ require.ErrorAs(t, err, &ReplicationEventExistsError{})
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2})
- requireLocks(t, ctx, db, []LockRow{expLock1}) // expected still one the same lock for repeated event
+ // doesn't insert the same event again
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1})
+ // expected still one the same lock for repeated event
+ requireLocks(t, ctx, db, []LockRow{expLock1})
- event3, err := queue.Enqueue(ctx, eventType2) // event for another target
+ event2, err := queue.Enqueue(ctx, eventType2) // event for another target
require.NoError(t, err)
- expEvent3 := ReplicationEvent{
- ID: event3.ID,
+ expEvent2 := ReplicationEvent{
+ ID: event2.ID,
State: JobStateReady,
Attempt: 3,
LockID: "praefect-0|gitaly-2|/project/path-1",
@@ -339,14 +326,14 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
},
}
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2})
requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // the new lock for another target repeated event
- event4, err := queue.Enqueue(ctx, eventType3) // event for another repo
+ event3, err := queue.Enqueue(ctx, eventType3) // event for another repo
require.NoError(t, err)
- expEvent4 := ReplicationEvent{
- ID: event4.ID,
+ expEvent3 := ReplicationEvent{
+ ID: event3.ID,
State: JobStateReady,
Attempt: 3,
LockID: "praefect-1|gitaly-1|/project/path-2",
@@ -360,7 +347,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
},
}
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4})
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // the new lock for same target but for another repo
db.RequireRowsInTable(t, "replication_queue_job_lock", 0) // there is no fetches it must be empty
@@ -464,7 +451,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
}
// events to fill in the queue
- events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType4}
+ events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
for i := range events {
var err error
events[i], err = queue.Enqueue(ctx, events[i])
@@ -472,11 +459,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
}
// first request to deque
- expectedEvents1 := []ReplicationEvent{events[0], events[2], events[4]}
+ expectedEvents1 := []ReplicationEvent{events[0], events[1], events[2]}
expectedJobLocks1 := []JobLockRow{
{JobID: events[0].ID, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: events[4].ID, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: events[1].ID, LockID: "praefect|gitaly-1|/project/path-1"},
+ {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-2"},
}
// we expect only first two types of events by limiting count to 3
@@ -500,11 +487,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
// second request to deque
// there must be only last event fetched from the queue
- expectedEvents2 := []ReplicationEvent{events[5]}
+ expectedEvents2 := []ReplicationEvent{events[3]}
expectedEvents2[0].State = JobStateInProgress
expectedEvents2[0].Attempt = 2
- expectedJobLocks2 := []JobLockRow{{JobID: 6, LockID: "backup|gitaly-1|/project/path-1"}}
+ expectedJobLocks2 := []JobLockRow{{JobID: 4, LockID: "backup|gitaly-1|/project/path-1"}}
dequeuedEvents2, err := queue.Dequeue(ctx, "backup", "gitaly-1", 100500)
require.NoError(t, err)
@@ -550,10 +537,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
},
}
- for i := 0; i < 2; i++ {
- _, err := queue.Enqueue(ctx, eventType1)
- require.NoError(t, err, "failed to fill in event queue")
- }
+ _, err := queue.Enqueue(ctx, eventType1)
+ require.NoError(t, err, "failed to fill in event queue")
dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
require.NoError(t, err)
@@ -564,10 +549,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
})
requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}})
- for i := 0; i < 2; i++ {
- _, err := queue.Enqueue(ctx, eventType2)
- require.NoError(t, err, "failed to fill in event queue")
- }
+ _, err = queue.Enqueue(ctx, eventType2)
+ require.NoError(t, err, "failed to fill in event queue")
dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
require.NoError(t, err)
@@ -578,7 +561,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: 2, LockID: "praefect|gitaly-1|/project/path-2"},
})
}
@@ -675,121 +658,116 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
},
}
- events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue
+ events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
for i := range events {
var err error
events[i], err = queue.Enqueue(ctx, events[i])
require.NoError(t, err, "failed to fill in event queue")
}
- // we expect only first three types of events by limiting count to 3
- dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3)
+ // we expect only the first two types of events by limiting count to 2
+ dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 2)
require.NoError(t, err)
- require.Len(t, dequeuedEvents1, 3)
+ require.Len(t, dequeuedEvents1, 2)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
- {ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: true},
+ {ID: events[2].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
- {JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
})
// release lock for events of second type
- acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[2].ID})
+ acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[1].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{3}, acknowledge1)
+ require.Equal(t, []uint64{2}, acknowledge1)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
})
dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3)
require.NoError(t, err)
- require.Len(t, dequeuedEvents2, 1, "expected: events of type 2 ('failed' will be fetched for retry)")
+ require.Len(t, dequeuedEvents2, 2, "expected: events of type 2 ('failed' will be fetched for retry)")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: true},
{ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
{JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
})
// creation of the new event that is equal to those already dequeue and processed
// it is used to verify that the event created after consuming events from queue won't be marked
// with previously created events as it may cause delay in replication
- _, err = queue.Enqueue(ctx, eventType1)
+ newEvent, err := queue.Enqueue(ctx, eventType1)
require.NoError(t, err)
- acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[4].ID})
+ acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[2].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{events[0].ID, events[4].ID}, acknowledge2)
+ require.Equal(t, []uint64{events[0].ID, events[2].ID}, acknowledge2)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
- {ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: true},
+ {ID: events[2].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[2].ID, LockID: events[2].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
})
- db.RequireRowsInTable(t, "replication_queue", 4)
+ db.RequireRowsInTable(t, "replication_queue", 3)
dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3)
require.NoError(t, err)
require.Len(t, dequeuedEvents3, 1, "expected: event of type 4")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
- {ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: true},
+ {ID: events[2].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: true},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[6].ID, LockID: events[6].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
+ {JobID: events[3].ID, LockID: events[3].LockID},
})
- acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[2].ID, events[6].ID})
+ acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[1].ID, events[3].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{events[2].ID, events[6].ID}, acknowledged3)
+ require.Equal(t, []uint64{events[1].ID, events[3].ID}, acknowledged3)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, nil)
db.RequireRowsInTable(t, "replication_queue", 1)
- newEvent, err := queue.Enqueue(ctx, eventType1)
- require.NoError(t, err)
-
acknowledge4, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{newEvent.ID})
require.NoError(t, err)
require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged
- db.RequireRowsInTable(t, "replication_queue", 2)
+ db.RequireRowsInTable(t, "replication_queue", 1)
var newEventState string
require.NoError(t, db.QueryRow("SELECT state FROM replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState))
require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, nil)
}
@@ -1047,6 +1025,116 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
})
}
+// Check if the queue returns the expected error when adding a duplicate
+// replication event.
+func TestPostgresReplicationEventQueue_UniqueConstraint(t *testing.T) {
+ t.Parallel()
+
+ initialEvent := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ }
+
+ for _, tc := range []struct {
+ desc string
+ event ReplicationEvent
+ expectedErr error
+ expectedEvent *ReplicationEvent
+ expectedLock *LockRow
+ }{
+ {
+ desc: "throws error when same event added again",
+ event: initialEvent,
+ expectedErr: ReplicationEventExistsError{
+ // empty state defaults to `ready`
+ state: "",
+ job: initialEvent.Job,
+ },
+ },
+ {
+ desc: "Adds new event to the queue",
+ event: ReplicationEvent{
+ LockID: "praefect|gitaly-2|/project/path-2",
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-1",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ },
+ expectedEvent: &ReplicationEvent{
+ ID: 2,
+ State: JobStateReady,
+ Attempt: 3,
+ LockID: "praefect|gitaly-2|/project/path-2",
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-1",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ },
+ expectedLock: &LockRow{ID: "praefect|gitaly-2|/project/path-2", Acquired: false},
+ },
+ } {
+ tc := tc
+
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ db := testdb.New(t)
+ ctx := testhelper.Context(t)
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ actualEvent, err := queue.Enqueue(ctx, initialEvent)
+ require.NoError(t, err)
+ // reset CreatedAt to its zero value because it cannot be known beforehand for the expected event
+ actualEvent.CreatedAt = time.Time{}
+
+ initialExpectedLock := LockRow{ID: "praefect|gitaly-1|/project/path-1", Acquired: false}
+ initialExpectedEvent := ReplicationEvent{
+ ID: 1,
+ State: JobStateReady,
+ Attempt: 3,
+ LockID: "praefect|gitaly-1|/project/path-1",
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ }
+
+ require.Equal(t, initialExpectedEvent, actualEvent)
+ requireEvents(t, ctx, db, []ReplicationEvent{initialExpectedEvent})
+ // expected a new lock for new event
+ requireLocks(t, ctx, db, []LockRow{initialExpectedLock})
+ db.RequireRowsInTable(t, "replication_queue_job_lock", 0)
+
+ _, err = queue.Enqueue(ctx, tc.event)
+ require.Equal(t, tc.expectedErr, err)
+
+ if tc.expectedEvent != nil {
+ requireEvents(t, ctx, db, []ReplicationEvent{initialExpectedEvent, *tc.expectedEvent})
+ requireLocks(t, ctx, db, []LockRow{initialExpectedLock, *tc.expectedLock})
+ }
+ })
+ }
+}
+
func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) {
t.Helper()
diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go
index 680323d06..dfdce5adf 100644
--- a/internal/praefect/reconciler/reconciler.go
+++ b/internal/praefect/reconciler/reconciler.go
@@ -275,6 +275,7 @@ reconciliation_jobs AS (
-- only perform inserts if we managed to acquire the lock as otherwise
-- we'd schedule duplicate jobs
WHERE ( SELECT acquired FROM reconciliation_lock )
+ ON CONFLICT DO NOTHING
RETURNING lock_id, meta, job
),
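For context on the ON CONFLICT DO NOTHING addition above: with the new
unique index in place, a conflicting insert is skipped rather than raising
an error, and a caller can tell whether a row was actually written from the
affected-row count. A minimal sketch using database/sql; the column list is
assumed from the (job, state) index and the RETURNING clause above, not
copied from the real schema:

    package sketch

    import (
        "context"
        "database/sql"
    )

    // enqueueIgnoringDuplicates inserts a replication job and reports whether a
    // new row was written. A duplicate (job, state) pair is silently skipped by
    // ON CONFLICT DO NOTHING instead of failing the statement.
    func enqueueIgnoringDuplicates(ctx context.Context, db *sql.DB, lockID string, job, meta []byte) (bool, error) {
        res, err := db.ExecContext(ctx, `
            INSERT INTO replication_queue (state, lock_id, job, meta)
            VALUES ('ready', $1, $2, $3)
            ON CONFLICT DO NOTHING`,
            lockID, job, meta,
        )
        if err != nil {
            return false, err
        }
        n, err := res.RowsAffected()
        if err != nil {
            return false, err
        }
        // n == 0 means a row with the same (job, state) already existed and
        // the insert was skipped.
        return n > 0, nil
    }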
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 86754130d..16604c057 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -509,7 +509,7 @@ func TestProcessBacklog_Success(t *testing.T) {
ackIDs, err := queue.Acknowledge(ctx, state, ids)
if len(ids) > 0 {
assert.Equal(t, datastore.JobStateCompleted, state, "no fails expected")
- assert.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once")
+ assert.Equal(t, []uint64{1, 2, 3}, ids, "all jobs must be processed at once")
}
return ackIDs, err
})
@@ -536,9 +536,6 @@ func TestProcessBacklog_Success(t *testing.T) {
_, err := queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
- _, err = queueInterceptor.Enqueue(ctx, eventType1)
- require.NoError(t, err)
-
renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
fullNewPath1 := filepath.Join(backupCfg.Storages[0].Path, renameTo1)