Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-05-22 10:51:16 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-05-22 11:18:09 +0300
commitf93ff6df6a90c0d3c4931e2b908325a7554cfd53 (patch)
tree2e6cb5f3daeadd4009f96298835e2aed5505f293
parent0eab0013ae1659fc44cf4665aefe7018ac1691bb (diff)
Fix flaky TestPropagateReplicationJob test
Ensure all RPCs are completed before running test cleanup. Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/3622
-rw-r--r--internal/praefect/replicator_test.go21
1 files changed, 20 insertions, 1 deletions
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 4a5081c1a..c4707ce7a 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"strings"
+ "sync"
"sync/atomic"
"testing"
"time"
@@ -294,7 +295,24 @@ func TestPropagateReplicationJob(t *testing.T) {
},
}
- queue := datastore.NewMemoryReplicationEventQueue(conf)
+ // We need to await for the replication event to make a complete roundtrip to the remote.
+ // Because send to the channel happens during in-flight request there are ongoing filesystem
+ // operations related to caching. The cleanup happens before all IO cache operations finished
+ // those resulting to:
+ // unlinkat /tmp/gitaly-222007427/381349228/storages.d/internal-gitaly-1/+gitaly/state/path/to/repo: directory not empty
+ // By using WaitGroup we are sure the test cleanup will be started after all replication
+ // requests are completed, so no running cache IO operations happen.
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
+ var wg sync.WaitGroup
+ queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ wg.Add(1)
+ return queue.Enqueue(ctx, event)
+ })
+ queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, eventIDs []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
+ acknowledged, err := queue.Acknowledge(ctx, state, eventIDs)
+ wg.Add(-len(eventIDs))
+ return acknowledged, err
+ })
logEntry := testhelper.DiscardTestEntry(t)
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
@@ -406,6 +424,7 @@ func TestPropagateReplicationJob(t *testing.T) {
waitForRequest(t, secondaryServer.repackFullChan, expectedSecondaryRepackFullReq, 5*time.Second)
waitForRequest(t, secondaryServer.cleanupChan, expectedSecondaryCleanup, 5*time.Second)
waitForRequest(t, secondaryServer.packRefsChan, expectedSecondaryPackRefs, 5*time.Second)
+ wg.Wait()
}
type mockServer struct {