diff options
Diffstat (limited to 'internal/praefect/replicator_test.go')
-rw-r--r-- | internal/praefect/replicator_test.go | 38 |
1 files changed, 28 insertions, 10 deletions
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 79e4135d7..6862b2f3f 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -2,6 +2,7 @@ package praefect import ( "context" + "fmt" "path/filepath" "strings" "sync" @@ -283,6 +284,10 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { } func TestReplicator_PropagateReplicationJob(t *testing.T) { + testhelper.NewFeatureSets(featureflag.MaintenanceOperationRouting).Run(t, testReplicatorPropagateReplicationJob) +} + +func testReplicatorPropagateReplicationJob(t *testing.T, ctx context.Context) { t.Parallel() primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1" @@ -318,16 +323,29 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { // By using WaitGroup we are sure the test cleanup will be started after all replication // requests are completed, so no running cache IO operations happen. queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) + var wg sync.WaitGroup - queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { - wg.Add(1) - return queue.Enqueue(ctx, event) - }) - queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, eventIDs []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { - acknowledged, err := queue.Acknowledge(ctx, state, eventIDs) - wg.Add(-len(eventIDs)) - return acknowledged, err - }) + if featureflag.MaintenanceOperationRouting.IsEnabled(ctx) { + // When maintenance operation routing is enabled we don't expect to see any + // replication events. The observed behaviour should still be the same though: we + // expect to observe the RPC calls on both the primary and secondary node because we + // route them to both at the same time. + queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { + require.FailNow(t, "no replication jobs should have been created") + return datastore.ReplicationEvent{}, fmt.Errorf("unexpected enqueue") + }) + } else { + queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { + wg.Add(1) + return queue.Enqueue(ctx, event) + }) + queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, eventIDs []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { + acknowledged, err := queue.Acknowledge(ctx, state, eventIDs) + wg.Add(-len(eventIDs)) + return acknowledged, err + }) + } + logEntry := testhelper.NewDiscardingLogEntry(t) nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) @@ -362,7 +380,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) { prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, txMgr, rs, nil, nil, nil, nil) listener, port := listenAvailPort(t) - ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx, cancel := context.WithCancel(ctx) go prf.Serve(listener) defer prf.Stop() |