Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--_support/praefect-schema.sql7
-rw-r--r--cmd/praefect/subcmd_track_repository.go5
-rw-r--r--internal/praefect/coordinator.go4
-rw-r--r--internal/praefect/coordinator_test.go53
-rw-r--r--internal/praefect/datastore/collector_test.go2
-rw-r--r--internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go27
-rw-r--r--internal/praefect/datastore/queue.go19
-rw-r--r--internal/praefect/datastore/queue_test.go254
-rw-r--r--internal/praefect/reconciler/reconciler.go1
-rw-r--r--internal/praefect/replicator_test.go5
10 files changed, 289 insertions, 88 deletions
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql
index c0874080c..970f0e2ed 100644
--- a/_support/praefect-schema.sql
+++ b/_support/praefect-schema.sql
@@ -502,6 +502,13 @@ CREATE UNIQUE INDEX delete_replica_unique_index ON public.replication_queue USIN
--
+-- Name: replication_queue_constraint; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE UNIQUE INDEX replication_queue_constraint ON public.replication_queue USING btree (job, state);
+
+
+--
-- Name: replication_queue_target_index; Type: INDEX; Schema: public; Owner: -
--
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index e32fb6fd6..53e2c760b 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -10,6 +10,7 @@ import (
"math/rand"
"time"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v15/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect"
@@ -238,6 +239,10 @@ func (req *trackRepositoryRequest) execRequest(ctx context.Context,
}
if _, err := queue.Enqueue(ctx, event); err != nil {
+ if errors.As(err, &datastore.ReplicationEventExistsError{}) {
+ ctxlogrus.Extract(ctx).WithError(err).Info("replication event queue already has similar entry")
+ return nil
+ }
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 51ddd0b45..b2f639f35 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -1098,6 +1098,10 @@ func (c *Coordinator) newRequestFinalizer(
g.Go(func() error {
if _, err := c.queue.Enqueue(ctx, event); err != nil {
+ if errors.As(err, &datastore.ReplicationEventExistsError{}) {
+ ctxlogrus.Extract(ctx).WithError(err).Info("replication event queue already has similar entry")
+ return nil
+ }
return fmt.Errorf("enqueue replication event: %w", err)
}
return nil
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index de7ccf360..d8a024e1b 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -2893,6 +2893,59 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
}
}
+func TestNewRequestFinalizer_enqueueErrorPropagation(t *testing.T) {
+ t.Parallel()
+ ctx := testhelper.Context(t)
+
+ for _, tc := range []struct {
+ desc string
+ enqueueErr error
+ expectedErr error
+ }{
+ {
+ desc: "most errors are propagated back",
+ enqueueErr: errors.New("enqueue failed"),
+ expectedErr: fmt.Errorf("enqueue replication event: %w", errors.New("enqueue failed")),
+ },
+ {
+ desc: "replication event exists errors are ignored ",
+ enqueueErr: datastore.ReplicationEventExistsError{},
+ expectedErr: nil,
+ },
+ } {
+ tc := tc
+
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ err := NewCoordinator(
+ &datastore.MockReplicationEventQueue{
+ EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) {
+ return datastore.ReplicationEvent{}, tc.enqueueErr
+ },
+ },
+ datastore.MockRepositoryStore{},
+ nil,
+ nil,
+ config.Config{},
+ nil,
+ ).newRequestFinalizer(ctx,
+ 1,
+ "praefect",
+ &gitalypb.Repository{},
+ "replic-path",
+ "primary",
+ []string{},
+ []string{"secondary"},
+ datastore.UpdateRepo,
+ datastore.Params{"RelativePath": "relative-path"},
+ "rpc-name",
+ )()
+ require.Equal(t, tc.expectedErr, err)
+ })
+ }
+}
+
func TestStreamParametersContext(t *testing.T) {
// Because we're using NewFeatureFlag, they'll end up in the All array.
enabledFF := featureflag.NewFeatureFlag("default_enabled", "", "", true)
diff --git a/internal/praefect/datastore/collector_test.go b/internal/praefect/datastore/collector_test.go
index e01f6cd53..9c995a58f 100644
--- a/internal/praefect/datastore/collector_test.go
+++ b/internal/praefect/datastore/collector_test.go
@@ -270,7 +270,7 @@ func TestRepositoryStoreCollector_ReplicationQueueDepth(t *testing.T) {
_, err := queue.Enqueue(ctx, ReplicationEvent{
Job: ReplicationJob{
Change: UpdateRepo,
- RelativePath: "/project/path-1",
+ RelativePath: fmt.Sprintf("/project/path-%d", i),
TargetNodeStorage: nodes[1],
SourceNodeStorage: nodes[0],
VirtualStorage: virtualStorage,
diff --git a/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go b/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go
new file mode 100644
index 000000000..8d15753f2
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20230118161145_replication_queue_unique_constraint.go
@@ -0,0 +1,27 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20230118161145_replication_queue_unique_constraint",
+ Up: []string{`
+ --- Delete existing duplicates from the replication_queue table
+ DELETE FROM replication_queue a
+ USING replication_queue b
+ WHERE
+ a.id > b.id
+ AND a.job = b.job
+ AND a.state = b.state;
+`, `
+ --- Create the unique index which will prevent duplicates
+ CREATE UNIQUE INDEX replication_queue_constraint on replication_queue (
+ job,
+ state
+ );
+ `},
+ Down: []string{"DROP INDEX replication_queue_constraint;"},
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 2fc9c0b85..a65191eb1 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -12,6 +12,18 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
)
+// ReplicationEventExistsError is returned when trying to enqueue a replication
+// job that already exists in the queue in the same state.
+type ReplicationEventExistsError struct {
+ state string
+ job ReplicationJob
+}
+
+// Error returns the error message.
+func (err ReplicationEventExistsError) Error() string {
+ return fmt.Sprintf("replication event %q -> %q already exists", err.state, err.job)
+}
+
// ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back.
type ReplicationEventQueue interface {
// Enqueue puts provided event into the persistent queue.
@@ -229,6 +241,13 @@ func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event Repli
// this will always return a single row result (because of lock uniqueness) or an error
rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta)
if err != nil {
+ if glsql.IsUniqueViolation(err, "replication_queue_constraint") {
+ return ReplicationEvent{}, ReplicationEventExistsError{
+ state: event.State.String(),
+ job: event.Job,
+ }
+ }
+
return ReplicationEvent{}, fmt.Errorf("query: %w", err)
}
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 6aac7ca4d..18ffd6659 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -300,32 +300,19 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event
db.RequireRowsInTable(t, "replication_queue_job_lock", 0)
- event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event
- require.NoError(t, err)
-
- expEvent2 := ReplicationEvent{
- ID: event2.ID,
- State: "ready",
- Attempt: 3,
- LockID: "praefect-0|gitaly-1|/project/path-1",
- Job: ReplicationJob{
- Change: UpdateRepo,
- RelativePath: "/project/path-1",
- TargetNodeStorage: "gitaly-1",
- SourceNodeStorage: "gitaly-0",
- VirtualStorage: "praefect-0",
- Params: nil,
- },
- }
+ _, err = queue.Enqueue(ctx, eventType1) // repeat of the same event
+ require.ErrorAs(t, err, &ReplicationEventExistsError{})
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2})
- requireLocks(t, ctx, db, []LockRow{expLock1}) // expected still one the same lock for repeated event
+ // doesn't insert the same event again
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1})
+ // still expect one and the same lock for the repeated event
+ requireLocks(t, ctx, db, []LockRow{expLock1})
- event3, err := queue.Enqueue(ctx, eventType2) // event for another target
+ event2, err := queue.Enqueue(ctx, eventType2) // event for another target
require.NoError(t, err)
- expEvent3 := ReplicationEvent{
- ID: event3.ID,
+ expEvent2 := ReplicationEvent{
+ ID: event2.ID,
State: JobStateReady,
Attempt: 3,
LockID: "praefect-0|gitaly-2|/project/path-1",
@@ -339,14 +326,14 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
},
}
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2})
requireLocks(t, ctx, db, []LockRow{expLock1, expLock2}) // the new lock for another target repeated event
- event4, err := queue.Enqueue(ctx, eventType3) // event for another repo
+ event3, err := queue.Enqueue(ctx, eventType3) // event for another repo
require.NoError(t, err)
- expEvent4 := ReplicationEvent{
- ID: event4.ID,
+ expEvent3 := ReplicationEvent{
+ ID: event3.ID,
State: JobStateReady,
Attempt: 3,
LockID: "praefect-1|gitaly-1|/project/path-2",
@@ -360,7 +347,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
},
}
- requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4})
+ requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3})
requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // the new lock for same target but for another repo
db.RequireRowsInTable(t, "replication_queue_job_lock", 0) // there is no fetches it must be empty
@@ -464,7 +451,7 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
}
// events to fill in the queue
- events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType4}
+ events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
for i := range events {
var err error
events[i], err = queue.Enqueue(ctx, events[i])
@@ -472,11 +459,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
}
// first request to deque
- expectedEvents1 := []ReplicationEvent{events[0], events[2], events[4]}
+ expectedEvents1 := []ReplicationEvent{events[0], events[1], events[2]}
expectedJobLocks1 := []JobLockRow{
{JobID: events[0].ID, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: events[4].ID, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: events[1].ID, LockID: "praefect|gitaly-1|/project/path-1"},
+ {JobID: events[2].ID, LockID: "praefect|gitaly-1|/project/path-2"},
}
// we expect only first two types of events by limiting count to 3
@@ -500,11 +487,11 @@ func TestPostgresReplicationEventQueue_DequeueMultiple(t *testing.T) {
// second request to deque
// there must be only last event fetched from the queue
- expectedEvents2 := []ReplicationEvent{events[5]}
+ expectedEvents2 := []ReplicationEvent{events[3]}
expectedEvents2[0].State = JobStateInProgress
expectedEvents2[0].Attempt = 2
- expectedJobLocks2 := []JobLockRow{{JobID: 6, LockID: "backup|gitaly-1|/project/path-1"}}
+ expectedJobLocks2 := []JobLockRow{{JobID: 4, LockID: "backup|gitaly-1|/project/path-1"}}
dequeuedEvents2, err := queue.Dequeue(ctx, "backup", "gitaly-1", 100500)
require.NoError(t, err)
@@ -550,10 +537,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
},
}
- for i := 0; i < 2; i++ {
- _, err := queue.Enqueue(ctx, eventType1)
- require.NoError(t, err, "failed to fill in event queue")
- }
+ _, err := queue.Enqueue(ctx, eventType1)
+ require.NoError(t, err, "failed to fill in event queue")
dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
require.NoError(t, err)
@@ -564,10 +549,8 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
})
requireJobLocks(t, ctx, db, []JobLockRow{{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"}})
- for i := 0; i < 2; i++ {
- _, err := queue.Enqueue(ctx, eventType2)
- require.NoError(t, err, "failed to fill in event queue")
- }
+ _, err = queue.Enqueue(ctx, eventType2)
+ require.NoError(t, err, "failed to fill in event queue")
dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 1)
require.NoError(t, err)
@@ -578,7 +561,7 @@ func TestPostgresReplicationEventQueue_DequeueSameStorageOtherRepository(t *test
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: 1, LockID: "praefect|gitaly-1|/project/path-1"},
- {JobID: 3, LockID: "praefect|gitaly-1|/project/path-2"},
+ {JobID: 2, LockID: "praefect|gitaly-1|/project/path-2"},
})
}
@@ -675,121 +658,116 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
},
}
- events := []ReplicationEvent{eventType1, eventType1, eventType2, eventType1, eventType3, eventType2, eventType4} // events to fill in the queue
+ events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
for i := range events {
var err error
events[i], err = queue.Enqueue(ctx, events[i])
require.NoError(t, err, "failed to fill in event queue")
}
- // we expect only first three types of events by limiting count to 3
- dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3)
+ // we expect only the first two types of events by limiting count to 2
+ dequeuedEvents1, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 2)
require.NoError(t, err)
- require.Len(t, dequeuedEvents1, 3)
+ require.Len(t, dequeuedEvents1, 2)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
- {ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: true},
+ {ID: events[2].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
- {JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
})
// release lock for events of second type
- acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[2].ID})
+ acknowledge1, err := queue.Acknowledge(ctx, JobStateFailed, []uint64{events[1].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{3}, acknowledge1)
+ require.Equal(t, []uint64{2}, acknowledge1)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
})
dequeuedEvents2, err := queue.Dequeue(ctx, "praefect", "gitaly-1", 3)
require.NoError(t, err)
- require.Len(t, dequeuedEvents2, 1, "expected: events of type 2 ('failed' will be fetched for retry)")
+ require.Len(t, dequeuedEvents2, 2, "expected: events of type 2 ('failed' will be fetched for retry)")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: true},
{ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: true},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
{JobID: events[0].ID, LockID: events[0].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
{JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[4].ID, LockID: events[4].LockID},
})
// creation of the new event that is equal to those already dequeue and processed
// it is used to verify that the event created after consuming events from queue won't be marked
// with previously created events as it may cause delay in replication
- _, err = queue.Enqueue(ctx, eventType1)
+ newEvent, err := queue.Enqueue(ctx, eventType1)
require.NoError(t, err)
- acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[4].ID})
+ acknowledge2, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[0].ID, events[2].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{events[0].ID, events[4].ID}, acknowledge2)
+ require.Equal(t, []uint64{events[0].ID, events[2].ID}, acknowledge2)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
- {ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: true},
+ {ID: events[2].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[2].ID, LockID: events[2].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
})
- db.RequireRowsInTable(t, "replication_queue", 4)
+ db.RequireRowsInTable(t, "replication_queue", 3)
dequeuedEvents3, err := queue.Dequeue(ctx, "praefect", "gitaly-2", 3)
require.NoError(t, err)
require.Len(t, dequeuedEvents3, 1, "expected: event of type 4")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
- {ID: events[2].LockID, Acquired: true},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: true},
+ {ID: events[1].LockID, Acquired: true},
+ {ID: events[2].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: true},
})
requireJobLocks(t, ctx, db, []JobLockRow{
- {JobID: events[2].ID, LockID: events[2].LockID},
- {JobID: events[6].ID, LockID: events[6].LockID},
+ {JobID: events[1].ID, LockID: events[1].LockID},
+ {JobID: events[3].ID, LockID: events[3].LockID},
})
- acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[2].ID, events[6].ID})
+ acknowledged3, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{events[1].ID, events[3].ID})
require.NoError(t, err)
- require.Equal(t, []uint64{events[2].ID, events[6].ID}, acknowledged3)
+ require.Equal(t, []uint64{events[1].ID, events[3].ID}, acknowledged3)
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, nil)
db.RequireRowsInTable(t, "replication_queue", 1)
- newEvent, err := queue.Enqueue(ctx, eventType1)
- require.NoError(t, err)
-
acknowledge4, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{newEvent.ID})
require.NoError(t, err)
require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged
- db.RequireRowsInTable(t, "replication_queue", 2)
+ db.RequireRowsInTable(t, "replication_queue", 1)
var newEventState string
require.NoError(t, db.QueryRow("SELECT state FROM replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState))
require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)")
requireLocks(t, ctx, db, []LockRow{
{ID: events[0].LockID, Acquired: false},
+ {ID: events[1].LockID, Acquired: false},
{ID: events[2].LockID, Acquired: false},
- {ID: events[4].LockID, Acquired: false},
- {ID: events[6].LockID, Acquired: false},
+ {ID: events[3].LockID, Acquired: false},
})
requireJobLocks(t, ctx, db, nil)
}
@@ -1047,6 +1025,116 @@ func TestPostgresReplicationEventQueue_AcknowledgeStale(t *testing.T) {
})
}
+// Check if the queue returns the expected error when adding a duplicate
+// replication event.
+func TestPostgresReplicationEventQueue_UniqueConstraint(t *testing.T) {
+ t.Parallel()
+
+ initialEvent := ReplicationEvent{
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ }
+
+ for _, tc := range []struct {
+ desc string
+ event ReplicationEvent
+ expectedErr error
+ expectedEvent *ReplicationEvent
+ expectedLock *LockRow
+ }{
+ {
+ desc: "throws error when same event added again",
+ event: initialEvent,
+ expectedErr: ReplicationEventExistsError{
+ // empty state defaults to `ready`
+ state: "",
+ job: initialEvent.Job,
+ },
+ },
+ {
+ desc: "Adds new event to the queue",
+ event: ReplicationEvent{
+ LockID: "praefect|gitaly-2|/project/path-2",
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-1",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ },
+ expectedEvent: &ReplicationEvent{
+ ID: 2,
+ State: JobStateReady,
+ Attempt: 3,
+ LockID: "praefect|gitaly-2|/project/path-2",
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-2",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-1",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ },
+ expectedLock: &LockRow{ID: "praefect|gitaly-2|/project/path-2", Acquired: false},
+ },
+ } {
+ tc := tc
+
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ db := testdb.New(t)
+ ctx := testhelper.Context(t)
+
+ queue := PostgresReplicationEventQueue{db.DB}
+
+ actualEvent, err := queue.Enqueue(ctx, initialEvent)
+ require.NoError(t, err)
+ // reset CreatedAt to its zero value because the real timestamp cannot be known beforehand for the expected event
+ actualEvent.CreatedAt = time.Time{}
+
+ initialExpectedLock := LockRow{ID: "praefect|gitaly-1|/project/path-1", Acquired: false}
+ initialExpectedEvent := ReplicationEvent{
+ ID: 1,
+ State: JobStateReady,
+ Attempt: 3,
+ LockID: "praefect|gitaly-1|/project/path-1",
+ Job: ReplicationJob{
+ Change: UpdateRepo,
+ RelativePath: "/project/path-1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "praefect",
+ Params: nil,
+ },
+ }
+
+ require.Equal(t, initialExpectedEvent, actualEvent)
+ requireEvents(t, ctx, db, []ReplicationEvent{initialExpectedEvent})
+ // expected a new lock for new event
+ requireLocks(t, ctx, db, []LockRow{initialExpectedLock})
+ db.RequireRowsInTable(t, "replication_queue_job_lock", 0)
+
+ _, err = queue.Enqueue(ctx, tc.event)
+ require.Equal(t, tc.expectedErr, err)
+
+ if tc.expectedEvent != nil {
+ requireEvents(t, ctx, db, []ReplicationEvent{initialExpectedEvent, *tc.expectedEvent})
+ requireLocks(t, ctx, db, []LockRow{initialExpectedLock, *tc.expectedLock})
+ }
+ })
+ }
+}
+
func requireEvents(t *testing.T, ctx context.Context, db testdb.DB, expected []ReplicationEvent) {
t.Helper()
diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go
index 680323d06..dfdce5adf 100644
--- a/internal/praefect/reconciler/reconciler.go
+++ b/internal/praefect/reconciler/reconciler.go
@@ -275,6 +275,7 @@ reconciliation_jobs AS (
-- only perform inserts if we managed to acquire the lock as otherwise
-- we'd schedule duplicate jobs
WHERE ( SELECT acquired FROM reconciliation_lock )
+ ON CONFLICT DO NOTHING
RETURNING lock_id, meta, job
),
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 86754130d..16604c057 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -509,7 +509,7 @@ func TestProcessBacklog_Success(t *testing.T) {
ackIDs, err := queue.Acknowledge(ctx, state, ids)
if len(ids) > 0 {
assert.Equal(t, datastore.JobStateCompleted, state, "no fails expected")
- assert.Equal(t, []uint64{1, 3, 4}, ids, "all jobs must be processed at once")
+ assert.Equal(t, []uint64{1, 2, 3}, ids, "all jobs must be processed at once")
}
return ackIDs, err
})
@@ -536,9 +536,6 @@ func TestProcessBacklog_Success(t *testing.T) {
_, err := queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
- _, err = queueInterceptor.Enqueue(ctx, eventType1)
- require.NoError(t, err)
-
renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
fullNewPath1 := filepath.Join(backupCfg.Storages[0].Path, renameTo1)