Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-02-21 13:21:15 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-03-03 11:57:39 +0300
commit601c80333ec9f697664818e87a47168a7f086b38 (patch)
tree564b525708d170cf9bef23566d02eb6d1a58a432
parent1b5b2cc393a98db0502e12d13823286ca1365173 (diff)
Replication: propagate RenameRepository RPC to Praefect secondaries
The RenameRepository is a mutator operation, however it is not about changing Git data, but a metadata of the repository. That is why it is must be treated specially same as RemoveRepository operation. Besides to make a replication an additional data required - the parameter of the initial request(new repository name). Previously Replicator marked all jobs that has same relative path as Cancelled which is not correct because source and target may be different for each job entry. Closes: https://gitlab.com/gitlab-org/gitaly/issues/2172
-rw-r--r--changelogs/unreleased/ps-replication-rename-repository.yml5
-rw-r--r--internal/praefect/coordinator.go35
-rw-r--r--internal/praefect/datastore/datastore.go30
-rw-r--r--internal/praefect/datastore/datastore_test.go62
-rw-r--r--internal/praefect/replicator.go63
-rw-r--r--internal/praefect/replicator_test.go51
-rw-r--r--internal/praefect/server_test.go121
7 files changed, 318 insertions, 49 deletions
diff --git a/changelogs/unreleased/ps-replication-rename-repository.yml b/changelogs/unreleased/ps-replication-rename-repository.yml
new file mode 100644
index 000000000..fe97d5b83
--- /dev/null
+++ b/changelogs/unreleased/ps-replication-rename-repository.yml
@@ -0,0 +1,5 @@
+---
+title: 'Replication: propagate RenameRepository RPC to Praefect secondaries'
+merge_request: 1853
+author:
+type: changed
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 14f58ae29..e4cf73589 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -2,6 +2,7 @@ package praefect
import (
"context"
+ "fmt"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
@@ -15,8 +16,20 @@ import (
"google.golang.org/grpc/status"
)
-func isDestructive(methodName string) bool {
- return methodName == "/gitaly.RepositoryService/RemoveRepository"
+// getReplicationDetails determines the type of job and additional details based on the method name and incoming message
+func getReplicationDetails(methodName string, m proto.Message) (datastore.ChangeType, datastore.Params, error) {
+ switch methodName {
+ case "/gitaly.RepositoryService/RemoveRepository":
+ return datastore.DeleteRepo, nil, nil
+ case "/gitaly.RepositoryService/RenameRepository":
+ req, ok := m.(*gitalypb.RenameRepositoryRequest)
+ if !ok {
+ return 0, nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
+ }
+ return datastore.RenameRepo, datastore.Params{"RelativePath": req.RelativePath}, nil
+ default:
+ return datastore.UpdateRepo, nil, nil
+ }
}
// Coordinator takes care of directing client requests to the appropriate
@@ -71,9 +84,9 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot
var requestFinalizer func()
if mi.Operation == protoregistry.OpMutator {
- change := datastore.UpdateRepo
- if isDestructive(fullMethodName) {
- change = datastore.DeleteRepo
+ change, params, err := getReplicationDetails(fullMethodName, m)
+ if err != nil {
+ return nil, err
}
secondaries, err := shard.GetSecondaries()
@@ -81,7 +94,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot
return nil, err
}
- if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil {
+ if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change, params); err != nil {
return nil, err
}
}
@@ -169,12 +182,18 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
return m, nil
}
-func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) {
+func (c *Coordinator) createReplicaJobs(
+ targetRepo *gitalypb.Repository,
+ primary nodes.Node,
+ secondaries []nodes.Node,
+ change datastore.ChangeType,
+ params datastore.Params,
+) (func(), error) {
var secondaryStorages []string
for _, secondary := range secondaries {
secondaryStorages = append(secondaryStorages, secondary.GetStorage())
}
- jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change)
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change, params)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 5896e7797..5f950e19e 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -51,8 +51,13 @@ const (
UpdateRepo ChangeType = iota + 1
// DeleteRepo is when a replication deletes a repo
DeleteRepo
+ // RenameRepo is when a replication renames repo
+ RenameRepo
)
+// Params represent additional set of parameters required for replication job.
+type Params map[string]interface{}
+
// ReplJob is an instance of a queued replication job. A replication job is
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
@@ -63,6 +68,7 @@ type ReplJob struct {
RelativePath string // source for replication
State JobState
Attempts int
+ Params Params // additional information required to run the job
}
// replJobs provides sort manipulation behavior
@@ -108,7 +114,7 @@ type ReplJobsDatastore interface {
// CreateReplicaReplJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath, primaryStorage string, secondaryStorages []string, change ChangeType, params Params) ([]uint64, error)
// UpdateReplJobState updates the state of an existing replication job
UpdateReplJobState(jobID uint64, newState JobState) error
@@ -117,11 +123,13 @@ type ReplJobsDatastore interface {
}
type jobRecord struct {
- change ChangeType
- relativePath string // project's relative path
- targetNodeStorage, sourceNodeStorage string
- state JobState
- attempts int
+ change ChangeType
+ relativePath string // project's relative path
+ targetNodeStorage string
+ sourceNodeStorage string
+ state JobState
+ attempts int
+ params Params
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
@@ -293,6 +301,7 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
State: record.state,
TargetNode: targetNode,
Attempts: record.attempts,
+ Params: record.params,
}, nil
}
@@ -302,7 +311,13 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi
// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(
+ relativePath,
+ primaryStorage string,
+ secondaryStorages []string,
+ change ChangeType,
+ params Params,
+) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
@@ -321,6 +336,7 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primarySto
state: JobStatePending,
relativePath: relativePath,
sourceNodeStorage: primaryStorage,
+ params: params,
}
jobIDs = append(jobIDs, nextID)
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 0ee189123..f4663bb66 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -42,43 +42,73 @@ var operations = []struct {
},
},
{
- desc: "insert replication job",
+ desc: "insert replication job for Update",
opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo)
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo, nil)
+ require.NoError(t, err)
+ },
+ },
+ {
+ desc: "insert replication job for Rename",
+ opFn: func(t *testing.T, ds Datastore) {
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, RenameRepo, Params{"RelativePath": "/data/dir/repo"})
require.NoError(t, err)
},
},
{
desc: "fetch inserted replication jobs",
opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.Storage, 10)
+ jobs, err := ds.GetJobs(JobStatePending, stor2.Storage, 10)
require.NoError(t, err)
- require.Len(t, jobs, 1)
+ require.Len(t, jobs, 2)
- expectedJob := ReplJob{
- Change: UpdateRepo,
- ID: 1,
- RelativePath: repo1Repository.RelativePath,
- SourceNode: stor1,
- TargetNode: stor2,
- State: JobStatePending,
+ expectedJobs := []ReplJob{
+ {
+ Change: UpdateRepo,
+ ID: 1,
+ RelativePath: repo1Repository.RelativePath,
+ SourceNode: stor1,
+ TargetNode: stor2,
+ State: JobStatePending,
+ Params: nil,
+ },
+ {
+ Change: RenameRepo,
+ ID: 2,
+ RelativePath: repo1Repository.RelativePath,
+ SourceNode: stor1,
+ TargetNode: stor2,
+ State: JobStatePending,
+ Params: Params{"RelativePath": "/data/dir/repo"},
+ },
}
- require.Equal(t, expectedJob, jobs[0])
+ require.ElementsMatch(t, expectedJobs, jobs)
},
},
{
- desc: "mark replication job done",
+ desc: "mark Update replication job as done",
opFn: func(t *testing.T, ds Datastore) {
err := ds.UpdateReplJobState(1, JobStateComplete)
require.NoError(t, err)
},
},
{
- desc: "try fetching completed replication job",
+ desc: "try fetching pending replication jobs",
opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.Storage, 1)
+ jobs, err := ds.GetJobs(JobStatePending, stor2.Storage, 1)
require.NoError(t, err)
- require.Len(t, jobs, 0)
+ require.Len(t, jobs, 1)
+
+ completed := ReplJob{
+ Change: RenameRepo,
+ ID: 2,
+ RelativePath: repo1Repository.RelativePath,
+ SourceNode: stor1,
+ TargetNode: stor2,
+ State: JobStatePending,
+ Params: Params{"RelativePath": "/data/dir/repo"},
+ }
+ require.Equal(t, completed, jobs[0])
},
},
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 49af0be3d..769d3a56c 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -2,6 +2,7 @@ package praefect
import (
"context"
+ "errors"
"fmt"
"time"
@@ -22,6 +23,8 @@ type Replicator interface {
Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
// Destroy will remove the target repo on the specified target connection
Destroy(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
+ // Rename will rename(move) the target repo on the specified target connection
+ Rename(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
}
type defaultReplicator struct {
@@ -109,6 +112,32 @@ func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob,
return err
}
+func (dr defaultReplicator) Rename(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
+ targetRepo := &gitalypb.Repository{
+ StorageName: job.TargetNode.Storage,
+ RelativePath: job.RelativePath,
+ }
+
+ repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
+
+ val, found := job.Params["RelativePath"]
+ if !found {
+ return errors.New("no 'RelativePath' parameter for rename")
+ }
+
+ relativePath, ok := val.(string)
+ if !ok {
+ return fmt.Errorf("parameter 'RelativePath' has unexpected type: %T", relativePath)
+ }
+
+ _, err := repoSvcClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
+ Repository: targetRepo,
+ RelativePath: relativePath,
+ })
+
+ return err
+}
+
func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClient, repo *gitalypb.Repository, checksum *string) func() error {
return func() error {
primaryChecksumRes, err := client.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{
@@ -283,7 +312,12 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
totalJobs += len(jobs)
- reposReplicated := make(map[string]struct{})
+ type replicatedKey struct {
+ change datastore.ChangeType
+ repoPath, source, target string
+ }
+ reposReplicated := make(map[replicatedKey]struct{})
+
for _, job := range jobs {
if job.Attempts >= maxAttempts {
if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil {
@@ -292,11 +326,26 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
continue
}
- if _, ok := reposReplicated[job.RelativePath]; ok {
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ var replicationKey replicatedKey
+ switch job.Change {
+ // this optimization could be done only for Update and Delete replication jobs as we treat them as idempotent
+ // Update - there is no much profit from executing multiple fetches for the same target from the same source one by one
+ // Delete - there is no way how we could remove already removed repository
+ // that is why those Jobs needs to be tracked and marked as Cancelled (removed from queue without execution).
+ case datastore.UpdateRepo, datastore.DeleteRepo:
+ replicationKey = replicatedKey{
+ change: job.Change,
+ repoPath: job.RelativePath,
+ source: job.SourceNode.Storage,
+ target: job.TargetNode.Storage,
+ }
+
+ if _, ok := reposReplicated[replicationKey]; ok {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ }
+ continue
}
- continue
}
if err = r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil {
@@ -311,7 +360,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
continue
}
- reposReplicated[job.RelativePath] = struct{}{}
+ reposReplicated[replicationKey] = struct{}{}
}
}
} else {
@@ -377,6 +426,8 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
err = r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC)
case datastore.DeleteRepo:
err = r.replicator.Destroy(injectedCtx, job, targetCC)
+ case datastore.RenameRepo:
+ err = r.replicator.Rename(injectedCtx, job, targetCC)
default:
err = fmt.Errorf("unknown replication change type encountered: %d", job.Change)
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 24b10b051..72a1c43ea 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -1,6 +1,7 @@
package praefect
import (
+ "context"
"io/ioutil"
"log"
"net"
@@ -13,6 +14,7 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
@@ -122,7 +124,7 @@ func TestProcessReplicationJob(t *testing.T) {
for _, secondary := range secondaries {
secondaryStorages = append(secondaryStorages, secondary.Storage)
}
- _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo)
+ _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo, nil)
require.NoError(t, err)
jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1)
@@ -268,7 +270,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
)
ds := datastore.NewInMemory(config)
- ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
+ ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
require.NoError(t, err)
require.Len(t, ids, 1)
@@ -367,13 +369,44 @@ func TestProcessBacklog_Success(t *testing.T) {
)
ds := datastore.NewInMemory(config)
- ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
+
+ var jobIDs []uint64
+
+ // Update replication job
+ idsUpdate1, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
require.NoError(t, err)
- require.Len(t, ids, 1)
+ require.Len(t, idsUpdate1, 1)
+ jobIDs = append(jobIDs, idsUpdate1...)
+
+ // Update replication job
+ idsUpdate2, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
+ require.NoError(t, err)
+ require.Len(t, idsUpdate2, 1)
+ jobIDs = append(jobIDs, idsUpdate2...)
+
+ renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
+ fullNewPath1 := filepath.Join(backupDir, renameTo1)
+
+ renameTo2 := filepath.Join(renameTo1, "..", filepath.Base(testRepo.GetRelativePath())+"-mv2")
+ fullNewPath2 := filepath.Join(backupDir, renameTo2)
+
+ // Rename replication job
+ idsRename1, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.RenameRepo, datastore.Params{"RelativePath": renameTo1})
+ require.NoError(t, err)
+ require.Len(t, idsRename1, 1)
+ jobIDs = append(jobIDs, idsRename1...)
+
+ // Rename replication job
+ idsRename2, err := ds.CreateReplicaReplJobs(renameTo1, primary.Storage, []string{secondary.Storage}, datastore.RenameRepo, datastore.Params{"RelativePath": renameTo2})
+ require.NoError(t, err)
+ require.Len(t, idsRename2, 1)
+ jobIDs = append(jobIDs, idsRename2...)
entry := testhelper.DiscardTestEntry(t)
- require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady))
+ for _, id := range jobIDs {
+ require.NoError(t, ds.UpdateReplJobState(id, datastore.JobStateReady))
+ }
nodeMgr, err := nodes.NewManager(entry, config)
require.NoError(t, err)
@@ -381,7 +414,9 @@ func TestProcessBacklog_Success(t *testing.T) {
replMgr := NewReplMgr("default", entry, ds, nodeMgr)
replMgr.replJobTimeout = 5 * time.Second
- go replMgr.ProcessBacklog(ctx, noopBackoffFunc)
+ go func() {
+ require.Equal(t, context.Canceled, replMgr.ProcessBacklog(ctx, noopBackoffFunc), "backlog processing failed")
+ }()
timeLimit := time.NewTimer(5 * time.Second)
ticker := time.NewTicker(1 * time.Second)
@@ -401,6 +436,10 @@ TestJobSucceeds:
t.Fatal("time limit expired for job to complete")
}
}
+
+ _, serr := os.Stat(fullNewPath1)
+ require.True(t, os.IsNotExist(serr), "repository must be moved from %q to the new location", fullNewPath1)
+ require.True(t, helper.IsGitDirectory(fullNewPath2), "repository must exist at new last RenameRepository location")
}
func TestBackoff(t *testing.T) {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index b52d08fc3..b5d01664b 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -4,6 +4,7 @@ import (
"context"
"io/ioutil"
"os"
+ "path/filepath"
"strings"
"testing"
"time"
@@ -16,6 +17,7 @@ import (
gconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
@@ -442,19 +444,126 @@ func pollUntilRemoved(t testing.TB, path string, deadline <-chan time.Time) {
require.Failf(t, "unable to detect path removal for %s", path)
default:
_, err := os.Stat(path)
- switch {
- case err != nil && os.IsNotExist(err):
+ if os.IsNotExist(err) {
return
- case err == nil:
- break
- default:
- require.Failf(t, "unexpected error while checking path %s", path)
}
+ require.NoError(t, err, "unexpected error while checking path %s", path)
}
time.Sleep(time.Millisecond)
}
}
+func TestRepoRename(t *testing.T) {
+ oldStorages := gconfig.Config.Storages
+ defer func() { gconfig.Config.Storages = oldStorages }()
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*models.Node{
+ 0: {
+ DefaultPrimary: true,
+ Storage: gconfig.Config.Storages[0].Name,
+ Address: "tcp::/this-doesnt-matter",
+ },
+ 1: {
+ Storage: "gitaly-1",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ 2: {
+ Storage: "gitaly-2",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ },
+ },
+ },
+ }
+
+ virtualStorage := conf.VirtualStorages[0]
+ testStorages := []gconfig.Storage{
+ {
+ Name: virtualStorage.Nodes[1].Storage,
+ Path: tempStoragePath(t),
+ },
+ {
+ Name: virtualStorage.Nodes[2].Storage,
+ Path: tempStoragePath(t),
+ },
+ }
+
+ gconfig.Config.Storages = append(gconfig.Config.Storages, testStorages...)
+ defer func() {
+ for _, s := range testStorages {
+ require.NoError(t, os.RemoveAll(s.Path))
+ }
+ }()
+
+ require.Len(t, gconfig.Config.Storages, 3, "1 default storage and 2 replicas of it")
+
+ // repo0 is a template that is used to create replica set by cloning it into other storage (directories)
+ repo0, path0, cleanup0 := testhelper.NewTestRepo(t)
+ defer cleanup0()
+
+ _, path1, cleanup1 := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[1].Storage)
+ defer cleanup1()
+
+ _, path2, cleanup2 := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[2].Storage)
+ defer cleanup2()
+
+ cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
+ defer cleanup()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it
+ virtualRepo := &(*repo0)
+ virtualRepo.StorageName = virtualStorage.Name
+
+ repoServiceClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ newName, err := text.RandomHex(20)
+ require.NoError(t, err)
+
+ expNewPath0 := filepath.Join(gconfig.Config.Storages[0].Path, newName)
+ expNewPath1 := filepath.Join(gconfig.Config.Storages[1].Path, newName)
+ expNewPath2 := filepath.Join(gconfig.Config.Storages[2].Path, newName)
+
+ require.NoError(t, os.RemoveAll(expNewPath0), "target dir must not exist before renaming")
+ require.NoError(t, os.RemoveAll(expNewPath1), "target dir must not exist before renaming")
+ require.NoError(t, os.RemoveAll(expNewPath2), "target dir must not exist before renaming")
+
+ _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
+ Repository: virtualRepo,
+ RelativePath: newName,
+ })
+ require.NoError(t, err)
+
+ resp, err := repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: virtualRepo,
+ })
+ require.NoError(t, err)
+ require.False(t, resp.GetExists(), "repo with old name must gone")
+
+ // as we renamed the repo we need to update RelativePath before we could check if it exists
+ renamedVirtualRepo := &(*virtualRepo)
+ renamedVirtualRepo.RelativePath = newName
+ resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: renamedVirtualRepo,
+ })
+ require.NoError(t, err)
+ require.True(t, resp.GetExists(), "repo with new name must exist")
+ require.DirExists(t, expNewPath0, "must be renamed on secondary from %q to %q", path0, expNewPath0)
+
+ // the renaming of the repo on the secondary servers is not deterministic
+ // since it relies on eventually consistent replication
+ pollUntilRemoved(t, path1, time.After(10*time.Second))
+ require.DirExists(t, expNewPath1, "must be renamed on secondary from %q to %q", path1, expNewPath1)
+ pollUntilRemoved(t, path2, time.After(10*time.Second))
+ require.DirExists(t, expNewPath2, "must be renamed on secondary from %q to %q", path2, expNewPath2)
+}
+
func tempStoragePath(t testing.TB) string {
p, err := ioutil.TempDir("", t.Name())
require.NoError(t, err)