Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-11-06 22:37:03 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2019-11-06 22:37:03 +0300
commit8897e645005c9728c349978d17eb653fd62571d5 (patch)
tree27d8af20bd0912a06900c4540c6110402abf2ab8 /internal
parentbb0cef0a62f708e60a16ad4c2e9096865434f336 (diff)
Propagate repository removal to Praefect backends
Diffstat (limited to 'internal')
-rw-r--r--internal/praefect/coordinator.go19
-rw-r--r--internal/praefect/coordinator_test.go1
-rw-r--r--internal/praefect/datastore.go18
-rw-r--r--internal/praefect/datastore_test.go7
-rw-r--r--internal/praefect/helper_test.go54
-rw-r--r--internal/praefect/replicator.go70
-rw-r--r--internal/praefect/replicator_test.go6
-rw-r--r--internal/praefect/server_test.go143
-rw-r--r--internal/testhelper/testhelper.go4
9 files changed, 276 insertions, 46 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index e8706a101..e4950f203 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -24,6 +24,10 @@ import (
"google.golang.org/grpc/codes"
)
+func isDestructive(methodName string) bool {
+ return methodName == "/gitaly.RepositoryService/RemoveRepository"
+}
+
// Coordinator takes care of directing client requests to the appropriate
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
@@ -80,7 +84,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
var storage string
if mi.Scope == protoregistry.ScopeRepository {
- storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker)
+ storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
if err != nil {
return nil, nil, nil, err
}
@@ -116,7 +120,7 @@ func (c *Coordinator) getAnyStorageNode() (string, func(), error) {
return node[0].Storage, noopRequestFinalizer, nil
}
-func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) {
+func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, method string) (string, func(), error) {
targetRepo, err := mi.TargetRepo(m)
if err != nil {
return "", nil, err
@@ -154,7 +158,12 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo
requestFinalizer := noopRequestFinalizer
if mi.Operation == protoregistry.OpMutator {
- if requestFinalizer, err = c.createReplicaJobs(targetRepo); err != nil {
+ change := UpdateRepo
+ if isDestructive(method) {
+ change = DeleteRepo
+ }
+
+ if requestFinalizer, err = c.createReplicaJobs(targetRepo, change); err != nil {
return "", nil, err
}
}
@@ -222,8 +231,8 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
return m, nil
}
-func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func(), error) {
- jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath)
+func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, change ChangeType) (func(), error) {
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, change)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 28c581784..b220c7b6f 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -96,6 +96,7 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
expectedJob := ReplJob{
+ Change: UpdateRepo,
ID: 1,
TargetNode: targetNode,
SourceNode: sourceNode,
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 4d97abc55..d100d8bcc 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -38,10 +38,21 @@ const (
JobStateCancelled
)
+// ChangeType indicates what kind of change the replication is propagating
+type ChangeType int
+
+const (
+ // UpdateRepo is when a replication updates a repository in place
+ UpdateRepo ChangeType = iota + 1
+ // DeleteRepo is when a replication deletes a repo
+ DeleteRepo
+)
+
// ReplJob is an instance of a queued replication job. A replication job is
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
type ReplJob struct {
+ Change ChangeType
ID uint64 // autoincrement ID
TargetNode, SourceNode models.Node // which node to replicate to?
Repository models.Repository // source for replication
@@ -99,13 +110,14 @@ type ReplJobsDatastore interface {
// CreateReplicaJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateReplicaReplJobs(relativePath string) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string, change ChangeType) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
}
type jobRecord struct {
+ change ChangeType
relativePath string // project's relative path
targetNodeID, sourceNodeID int
state JobState
@@ -360,6 +372,7 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
}
return ReplJob{
+ Change: record.change,
ID: jobID,
Repository: *repository,
SourceNode: sourceNode,
@@ -374,7 +387,7 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi
// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, change ChangeType) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
@@ -396,6 +409,7 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64,
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.records[nextID] = jobRecord{
+ change: change,
targetNodeID: secondary.ID,
state: JobStatePending,
relativePath: relativePath,
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 3634aa969..4c8ce6dac 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -44,7 +44,7 @@ var operations = []struct {
{
desc: "insert first replication job before secondary mapped to primary",
opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath)
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, UpdateRepo)
require.Error(t, err, ErrInvalidReplTarget)
},
},
@@ -65,7 +65,7 @@ var operations = []struct {
{
desc: "insert first replication job after secondary mapped to primary",
opFn: func(t *testing.T, ds Datastore) {
- ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath)
+ ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, UpdateRepo)
require.NoError(t, err)
require.Equal(t, []uint64{1}, ids)
},
@@ -78,7 +78,8 @@ var operations = []struct {
require.Len(t, jobs, 1)
expectedJob := ReplJob{
- ID: 1,
+ Change: UpdateRepo,
+ ID: 1,
Repository: models.Repository{
RelativePath: repo1Repository.RelativePath,
Primary: stor1,
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index dd8064fd8..0637f40de 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -19,7 +19,9 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
+ "gitlab.com/gitlab-org/gitaly/internal/service/repository"
gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -102,13 +104,13 @@ func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descr
// Each mock server is keyed by the corresponding index of the node in the
// config.Nodes. There must be a 1-to-1 mapping between backend server and
// configured storage node.
-func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[int]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, func()) {
+func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[int]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) {
datastore, clientCC, prf := setupServer(t, conf, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)})
require.Equal(t, len(backends), len(conf.Nodes),
"mock server count doesn't match config nodes")
- var cleanups []func()
+ var cleanups []testhelper.Cleanup
for id, nodeStorage := range datastore.storageNodes.m {
backend, ok := backends[id]
@@ -151,13 +153,16 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[in
}
// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
-func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server) {
+func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
datastore := NewMemoryDatastore(conf)
logEntry := log.Default()
clientCC := conn.NewClientConnections()
+ var cleanups []testhelper.Cleanup
+
for id, nodeStorage := range datastore.storageNodes.m {
- _, backend := runInternalGitalyServer(t, nodeStorage.Token)
+ _, backend, cleanup := runInternalGitalyServer(t, nodeStorage.Token)
+ cleanups = append(cleanups, cleanup)
clientCC.RegisterNode(nodeStorage.Storage, backend, nodeStorage.Token)
nodeStorage.Address = backend
@@ -186,18 +191,31 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
t.Logf("proxy listening on port %d", port)
errQ := make(chan error)
+ ctx, cancel := testhelper.Context()
- go func() {
- errQ <- prf.Start(listener)
- }()
+ go func() { errQ <- prf.Start(listener) }()
+ go func() { errQ <- replmgr.ProcessBacklog(ctx) }()
// dial client to praefect
cc := dialLocalPort(t, port, false)
- return cc, prf
+ cleanup := func() {
+ for _, cu := range cleanups {
+ cu()
+ }
+
+ ctx, _ := context.WithTimeout(ctx, time.Second)
+ require.NoError(t, prf.Shutdown(ctx))
+ require.NoError(t, <-errQ)
+
+ cancel()
+ require.Error(t, context.Canceled, <-errQ)
+ }
+
+ return cc, prf, cleanup
}
-func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string) {
+func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string, func()) {
streamInt := []grpc.StreamServerInterceptor{auth.StreamServerInterceptor(internalauth.Config{Token: token})}
unaryInt := []grpc.UnaryServerInterceptor{auth.UnaryServerInterceptor(internalauth.Config{Token: token})}
@@ -209,11 +227,25 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string)
t.Fatal(err)
}
+ rubyServer := &rubyserver.Server{}
+ require.NoError(t, rubyServer.Start())
+
gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer())
+ gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer))
+
+ errQ := make(chan error)
- go server.Serve(listener)
+ go func() {
+ errQ <- server.Serve(listener)
+ }()
+
+ cleanup := func() {
+ rubyServer.Stop()
+ server.Stop()
+ require.NoError(t, <-errQ)
+ }
- return server, "unix://" + serverSocketPath
+ return server, "unix://" + serverSocketPath, cleanup
}
func mustLoadProtoReg(t testing.TB) *descriptor.FileDescriptorProto {
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 63511ae4c..57cbf6a30 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -54,7 +54,10 @@ func init() {
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
+ // Replicate propagates changes from the source to the target
Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error
+ // Destroy will remove the target repo on the specified target connection
+ Destroy(ctx context.Context, job ReplJob, target *grpc.ClientConn) error
}
type defaultReplicator struct {
@@ -111,6 +114,21 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, sourceCC
return nil
}
+func (dr defaultReplicator) Destroy(ctx context.Context, job ReplJob, targetCC *grpc.ClientConn) error {
+ targetRepo := &gitalypb.Repository{
+ StorageName: job.TargetNode.Storage,
+ RelativePath: job.Repository.RelativePath,
+ }
+
+ repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
+
+ _, err := repoSvcClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
+ Repository: targetRepo,
+ })
+
+ return err
+}
+
func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClient, repo *gitalypb.Repository, checksum *string) func() error {
return func() error {
primaryChecksumRes, err := client.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{
@@ -207,7 +225,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
return nil
}
- id, err := r.datastore.CreateReplicaReplJobs(repo.RelativePath)
+ id, err := r.datastore.CreateReplicaReplJobs(repo.RelativePath, UpdateRepo)
if err != nil {
return err
}
@@ -221,8 +239,10 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
}
const (
- jobFetchInterval = 10 * time.Millisecond
- logWithReplJobID = "replication_job_id"
+ jobFetchInterval = 10 * time.Millisecond
+ logWithReplJobID = "replication_job_id"
+ logWithReplSource = "replication_job_source"
+ logWithReplTarget = "replication_job_target"
)
// ProcessBacklog will process queued jobs. It will block while processing jobs.
@@ -262,48 +282,66 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
"to_storage": job.TargetNode.Storage,
"relative_path": job.Repository.RelativePath,
}).Info("processing replication job")
- if err := r.processReplJob(ctx, job); err != nil {
- return err
- }
+ r.processReplJob(ctx, job)
}
}
}
}
-func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error {
+// TODO: errors that occur during replication should be handled better. Logging
+// is a crutch in this situation. Ideally, we need to update state somewhere
+// with information regarding the replication failure. See follow up issue:
+// https://gitlab.com/gitlab-org/gitaly/issues/2138
+func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) {
+ l := r.log.
+ WithField(logWithReplJobID, job.ID).
+ WithField(logWithReplSource, job.SourceNode).
+ WithField(logWithReplTarget, job.TargetNode)
+
if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
- return err
+ l.WithError(err).Error("unable to update replication job to in progress")
+ return
}
targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
if err != nil {
- return err
+ l.WithError(err).Error("unable to obtain client connection for secondary node in replication job")
+ return
}
sourceCC, err := r.clientConnections.GetConnection(job.Repository.Primary.Storage)
if err != nil {
- return err
+ l.WithError(err).Error("unable to obtain client connection for primary node in replication job")
+ return
}
injectedCtx, err := helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, job.SourceNode.Token)
if err != nil {
- return err
+ l.WithError(err).Error("unable to inject Gitaly servers into context for replication job")
+ return
}
replStart := time.Now()
incReplicationJobsInFlight()
defer decReplicationJobsInFlight()
- if err := r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC); err != nil {
- r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
- return err
+ switch job.Change {
+ case UpdateRepo:
+ err = r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC)
+ case DeleteRepo:
+ err = r.replicator.Destroy(injectedCtx, job, targetCC)
+ default:
+ err = fmt.Errorf("unknown replication change type encountered: %d", job.Change)
+ }
+ if err != nil {
+ l.WithError(err).Error("unable to replicate")
+ return
}
replDuration := time.Since(replStart)
recordReplicationLatency(float64(replDuration / time.Millisecond))
if err := r.datastore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
- return err
+ l.WithError(err).Error("error when updating replication job status to complete")
}
- return nil
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index ac57a1fe7..44eb7873e 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -80,7 +80,9 @@ func TestProceessReplicationJob(t *testing.T) {
},
}
- replJob := ReplJob{ID: 1,
+ replJob := ReplJob{
+ Change: UpdateRepo,
+ ID: 1,
TargetNode: models.Node{Storage: backupStorageName, Address: srvSocketPath},
SourceNode: models.Node{Storage: "default", Address: srvSocketPath, Token: testhelper.RepositoryAuthToken},
Repository: models.Repository{Primary: models.Node{Storage: "default", Address: srvSocketPath}, RelativePath: testRepo.GetRelativePath()},
@@ -108,7 +110,7 @@ func TestProceessReplicationJob(t *testing.T) {
replicator: replicator,
}
- require.NoError(t, replMgr.processReplJob(ctx, replJob))
+ replMgr.processReplJob(ctx, replJob)
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index dfc28f04b..956d1158f 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -3,6 +3,8 @@ package praefect
import (
"context"
"fmt"
+ "io/ioutil"
+ "os"
"strings"
"testing"
"time"
@@ -12,7 +14,9 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ gconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
@@ -84,8 +88,8 @@ func TestGitalyServerInfo(t *testing.T) {
Token: "xyz",
}},
}
- cc, srv := runPraefectServerWithGitaly(t, conf)
- defer srv.s.Stop()
+ cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
+ defer cleanup()
client := gitalypb.NewServerServiceClient(cc)
@@ -107,10 +111,10 @@ func TestGitalyServerInfo(t *testing.T) {
}
func TestHealthCheck(t *testing.T) {
- cc, srv := runPraefectServerWithGitaly(t, config.Config{})
- defer srv.s.Stop()
+ cc, _, cleanup := runPraefectServerWithGitaly(t, testConfig(1))
+ defer cleanup()
- ctx, cancel := testhelper.Context()
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client := healthpb.NewHealthClient(cc)
@@ -130,8 +134,8 @@ func TestRejectBadStorage(t *testing.T) {
},
}
- cc, srv := runPraefectServerWithGitaly(t, conf)
- defer srv.s.Stop()
+ cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
+ defer cleanup()
badTargetRepo := gitalypb.Repository{
StorageName: "default",
@@ -175,3 +179,128 @@ func TestWarnDuplicateAddrs(t *testing.T) {
}
t.Fatal("could not find expected log message")
}
+
+func TestRepoRemoval(t *testing.T) {
+ conf := config.Config{
+ VirtualStorageName: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: gconfig.Config.Storages[0].Name,
+ Address: "tcp::/samesies",
+ },
+ &models.Node{
+ ID: 1,
+ Storage: "praefect-internal-1",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ &models.Node{
+ ID: 2,
+ Storage: "praefect-internal-2",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ },
+ }
+
+ oldStorages := gconfig.Config.Storages
+ defer func() { gconfig.Config.Storages = oldStorages }()
+
+ testStorages := []gconfig.Storage{
+ {
+ Name: conf.Nodes[1].Storage,
+ Path: tempStoragePath(t),
+ },
+ {
+ Name: conf.Nodes[2].Storage,
+ Path: tempStoragePath(t),
+ },
+ }
+ gconfig.Config.Storages = append(gconfig.Config.Storages, testStorages...)
+ defer func() {
+ for _, s := range testStorages {
+ require.NoError(t, os.RemoveAll(s.Path))
+ }
+ }()
+
+ tRepo, _, tCleanup := testhelper.NewTestRepo(t)
+ defer tCleanup()
+
+ _, path1, cleanup1 := cloneRepoAtStorage(t, tRepo, conf.Nodes[1].Storage)
+ defer cleanup1()
+ _, path2, cleanup2 := cloneRepoAtStorage(t, tRepo, conf.Nodes[2].Storage)
+ defer cleanup2()
+
+ // prerequisite: repos should exist at expected paths
+ require.DirExists(t, path1)
+ require.DirExists(t, path2)
+
+ cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ virtualRepo := *tRepo
+ virtualRepo.StorageName = conf.VirtualStorageName
+
+ rClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ _, err := rClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
+ Repository: &virtualRepo,
+ })
+ require.NoError(t, err)
+
+ resp, err := rClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: &virtualRepo,
+ })
+ require.NoError(t, err)
+ require.Equal(t, false, resp.GetExists())
+
+ // the removal of the repo on the secondary servers is not deterministic
+ // since it relies on eventually consistent replication
+ pollUntilRemoved(t, path1, time.After(10*time.Second))
+ pollUntilRemoved(t, path2, time.After(10*time.Second))
+}
+
+func pollUntilRemoved(t testing.TB, path string, deadline <-chan time.Time) {
+ for {
+ select {
+ case <-deadline:
+ require.Failf(t, "unable to detect path removal for %s", path)
+ default:
+ _, err := os.Stat(path)
+ switch {
+ case err != nil && os.IsNotExist(err):
+ return
+ case err == nil:
+ break
+ default:
+ require.Failf(t, "unexpected error while checking path %s", path)
+ }
+ }
+ time.Sleep(time.Millisecond)
+ }
+}
+
+func tempStoragePath(t testing.TB) string {
+ p, err := ioutil.TempDir("", t.Name())
+ require.NoError(t, err)
+ return p
+}
+
+func cloneRepoAtStorage(t testing.TB, src *gitalypb.Repository, storageName string) (*gitalypb.Repository, string, func()) {
+ dst := *src
+ dst.StorageName = storageName
+
+ dstP, err := helper.GetPath(&dst)
+ require.NoError(t, err)
+
+ srcP, err := helper.GetPath(src)
+ require.NoError(t, err)
+
+ require.NoError(t, os.MkdirAll(dstP, 0755))
+ testhelper.MustRunCommand(t, nil, "git",
+ "clone", "--no-hardlinks", "--dissociate", "--bare", srcP, dstP)
+
+ return &dst, dstP, func() { require.NoError(t, os.RemoveAll(dstP)) }
+}
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index d547d865a..178e174dd 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -525,3 +525,7 @@ func gitObjectExists(t testing.TB, repoPath, sha string, exists bool) {
}
require.Error(t, cmd.Run(), "checking for object should fail")
}
+
+// Cleanup functions should be called in a defer statement
+// immediately after they are returned from a test helper
+type Cleanup func()