diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-05-22 10:51:16 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-05-22 11:18:09 +0300 |
commit | f93ff6df6a90c0d3c4931e2b908325a7554cfd53 (patch) | |
tree | 2e6cb5f3daeadd4009f96298835e2aed5505f293 | |
parent | 0eab0013ae1659fc44cf4665aefe7018ac1691bb (diff) |
Fix flaky TestPropagateReplicationJob test
Ensure all RPCs are completed before running test cleanup.
Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/3622
-rw-r--r-- | internal/praefect/replicator_test.go | 21 |
1 files changed, 20 insertions, 1 deletions
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 4a5081c1a..c4707ce7a 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "testing" "time" @@ -294,7 +295,24 @@ func TestPropagateReplicationJob(t *testing.T) { }, } - queue := datastore.NewMemoryReplicationEventQueue(conf) + // We need to await for the replication event to make a complete roundtrip to the remote. + // Because send to the channel happens during in-flight request there are ongoing filesystem + // operations related to caching. The cleanup happens before all IO cache operations finished + // those resulting to: + // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty + // By using WaitGroup we are sure the test cleanup will be started after all replication + // requests are completed, so no running cache IO operations happen. + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) + var wg sync.WaitGroup + queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { + wg.Add(1) + return queue.Enqueue(ctx, event) + }) + queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, eventIDs []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { + acknowledged, err := queue.Acknowledge(ctx, state, eventIDs) + wg.Add(-len(eventIDs)) + return acknowledged, err + }) logEntry := testhelper.DiscardTestEntry(t) nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil) @@ -406,6 +424,7 @@ func TestPropagateReplicationJob(t *testing.T) { waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second) waitForRequest(t, secondaryServer.cleanupChan, expectedSecondaryCleanup, 5*time.Second) waitForRequest(t, secondaryServer.packRefsChan, expectedSecondaryPackRefs, 5*time.Second) + wg.Wait() } type mockServer struct { |