Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/replicator_test.go')
-rw-r--r--internal/praefect/replicator_test.go38
1 files changed, 28 insertions, 10 deletions
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 79e4135d7..6862b2f3f 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -2,6 +2,7 @@ package praefect
import (
"context"
+ "fmt"
"path/filepath"
"strings"
"sync"
@@ -283,6 +284,10 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
}
func TestReplicator_PropagateReplicationJob(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.MaintenanceOperationRouting).Run(t, testReplicatorPropagateReplicationJob)
+}
+
+func testReplicatorPropagateReplicationJob(t *testing.T, ctx context.Context) {
t.Parallel()
primaryStorage, secondaryStorage := "internal-gitaly-0", "internal-gitaly-1"
@@ -318,16 +323,29 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
// By using WaitGroup we are sure the test cleanup will be started after all replication
// requests are completed, so no running cache IO operations happen.
queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
+
var wg sync.WaitGroup
- queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
- wg.Add(1)
- return queue.Enqueue(ctx, event)
- })
- queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, eventIDs []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
- acknowledged, err := queue.Acknowledge(ctx, state, eventIDs)
- wg.Add(-len(eventIDs))
- return acknowledged, err
- })
+ if featureflag.MaintenanceOperationRouting.IsEnabled(ctx) {
+ // When maintenance operation routing is enabled we don't expect to see any
+ // replication events. The observed behaviour should still be the same though: we
+ // expect to observe the RPC calls on both the primary and secondary node because we
+ // route them to both at the same time.
+ queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ require.FailNow(t, "no replication jobs should have been created")
+ return datastore.ReplicationEvent{}, fmt.Errorf("unexpected enqueue")
+ })
+ } else {
+ queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ wg.Add(1)
+ return queue.Enqueue(ctx, event)
+ })
+ queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, eventIDs []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
+ acknowledged, err := queue.Acknowledge(ctx, state, eventIDs)
+ wg.Add(-len(eventIDs))
+ return acknowledged, err
+ })
+ }
+
logEntry := testhelper.NewDiscardingLogEntry(t)
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
@@ -362,7 +380,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, txMgr, rs, nil, nil, nil, nil)
listener, port := listenAvailPort(t)
- ctx, cancel := context.WithCancel(testhelper.Context(t))
+ ctx, cancel := context.WithCancel(ctx)
go prf.Serve(listener)
defer prf.Stop()