Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-05-17 12:12:39 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-05-17 12:12:39 +0300
commit93762b621c011fe570339c1c247d5197c2cfefcc (patch)
treecfac354d8a6a34eb8dbf4c6615dea9b8140bc244
parent6b31501b13eae70aea5061edc8273c551ba4c349 (diff)
parent9c19ad66f4b691f47bf5fa900062b45bec36cd08 (diff)
Merge branch 'smh-repository-id-rename-repository' into 'master'
Handle repository creations, deletions and renames atomically Closes #4003 and #3485 See merge request gitlab-org/gitaly!4101
-rw-r--r--cmd/praefect/subcmd_remove_repository.go154
-rw-r--r--cmd/praefect/subcmd_remove_repository_test.go107
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go15
-rw-r--r--internal/git/gittest/repo.go23
-rw-r--r--internal/git/housekeeping/object_pool.go4
-rw-r--r--internal/git/housekeeping/object_pool_test.go10
-rw-r--r--internal/gitaly/service/repository/rename_test.go30
-rw-r--r--internal/metadata/featureflag/ff_praefect_generated_paths.go4
-rw-r--r--internal/praefect/coordinator.go6
-rw-r--r--internal/praefect/coordinator_test.go1
-rw-r--r--internal/praefect/datastore/repository_store.go36
-rw-r--r--internal/praefect/datastore/repository_store_mock.go6
-rw-r--r--internal/praefect/datastore/repository_store_test.go44
-rw-r--r--internal/praefect/praefectutil/replica_path.go27
-rw-r--r--internal/praefect/praefectutil/replica_path_test.go36
-rw-r--r--internal/praefect/rename_repository.go160
-rw-r--r--internal/praefect/router_per_repository.go15
-rw-r--r--internal/praefect/router_per_repository_test.go29
-rw-r--r--internal/praefect/server.go7
-rw-r--r--internal/praefect/server_test.go126
-rw-r--r--internal/praefect/verifier_test.go6
-rw-r--r--internal/testhelper/testhelper.go3
22 files changed, 523 insertions, 326 deletions
diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go
index 49b049615..5298c713d 100644
--- a/cmd/praefect/subcmd_remove_repository.go
+++ b/cmd/praefect/subcmd_remove_repository.go
@@ -7,7 +7,6 @@ import (
"flag"
"fmt"
"io"
- "strings"
"sync"
"time"
@@ -19,7 +18,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
)
const (
@@ -61,15 +59,11 @@ func (cmd *removeRepository) FlagSet() *flag.FlagSet {
" This command removes all state associated with a given repository from the Gitaly Cluster.\n" +
" This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" +
" database state as tracked by Praefect.\n" +
- " Runs in dry-run mode by default and lists the gitaly nodes from which the repository will be removed from " +
+ " Runs in dry-run mode by default checks whether the repository exists" +
" without actually removing it from the database and disk.\n" +
" When -apply is used, the repository will be removed from the database as well as\n" +
" the individual gitaly nodes on which they exist.\n")
fs.PrintDefaults()
- printfErr("NOTE:\n" +
- " It may happen that parts of the repository continue to exist after this command, either because\n" +
- " of an error that happened during deletion or because of in-flight RPC calls targeting the repository.\n" +
- " It is safe and recommended to re-run this command in such a case.\n")
}
return fs
}
@@ -116,21 +110,10 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger
if exists {
fmt.Fprintln(cmd.w, "Repository found in the database.")
} else {
- fmt.Fprintln(cmd.w, "Repository is not being tracked in Praefect.")
+ return errors.New("repository is not being tracked in Praefect")
}
}
- storagesOnDisk, err := cmd.getStoragesFromNodes(ctx, cfg)
- if err != nil {
- return err
- }
-
- if len(storagesOnDisk) == 0 {
- fmt.Fprintln(cmd.w, "Repository not found on any gitaly nodes.")
- } else {
- fmt.Fprintf(cmd.w, "Repository found on the following gitaly nodes: %s.\n", strings.Join(storagesOnDisk, ", "))
- }
-
if !cmd.apply {
fmt.Fprintln(cmd.w, "Re-run the command with -apply to remove repositories from the database and disk.")
return nil
@@ -138,18 +121,21 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger
fmt.Fprintf(cmd.w, "Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", cmd.relativePath)
- fmt.Fprintln(cmd.w, "Removing repository from the database...")
- removed, err := cmd.removeRepositoryFromDatabase(ctx, db)
+ addr, err := getNodeAddress(cfg)
if err != nil {
- return fmt.Errorf("remove repository info from praefect database: %w", err)
+ return fmt.Errorf("get node address: %w", err)
}
- if !removed {
- fmt.Fprintln(cmd.w, "The database has no information about this repository.")
- } else {
- fmt.Fprintln(cmd.w, "Removed repository metadata from the database.")
+ _, err = cmd.removeRepository(ctx, &gitalypb.Repository{
+ StorageName: cmd.virtualStorage,
+ RelativePath: cmd.relativePath,
+ }, addr, cfg.Auth.Token)
+ if err != nil {
+ return fmt.Errorf("repository removal failed: %w", err)
}
+ fmt.Fprintln(cmd.w, "Repository removal completed.")
+
fmt.Fprintln(cmd.w, "Removing replication events...")
ticker := helper.NewTimerTicker(time.Second)
defer ticker.Stop()
@@ -157,80 +143,9 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger
return fmt.Errorf("remove scheduled replication events: %w", err)
}
fmt.Fprintln(cmd.w, "Replication event removal completed.")
-
- // We should try to remove repository from each of gitaly nodes.
- fmt.Fprintln(cmd.w, "Removing repository directly from gitaly nodes...")
- cmd.removeRepositoryForEachGitaly(ctx, cfg, logger)
- fmt.Fprintln(cmd.w, "Finished removing repository from gitaly nodes.")
-
return nil
}
-// getStoragesFromNodes looks on disk to finds the storages this repository exists for
-func (cmd *removeRepository) getStoragesFromNodes(ctx context.Context, cfg config.Config) ([]string, error) {
- var storages []string
- for _, vs := range cfg.VirtualStorages {
- if vs.Name != cmd.virtualStorage {
- continue
- }
-
- storages = make([]string, len(vs.Nodes))
- var wg sync.WaitGroup
-
- for i := 0; i < len(vs.Nodes); i++ {
- wg.Add(1)
- go func(node *config.Node, i int) {
- defer wg.Done()
- repo := &gitalypb.Repository{
- StorageName: node.Storage,
- RelativePath: cmd.relativePath,
- }
- exists, err := repositoryExists(ctx, repo, node.Address, node.Token)
- if err != nil {
- cmd.logger.WithError(err).Errorf("checking if repository exists on %q", node.Storage)
- }
- if exists {
- storages[i] = node.Storage
- }
- }(vs.Nodes[i], i)
- }
-
- wg.Wait()
- break
- }
-
- var storagesFound []string
- for _, storage := range storages {
- if storage != "" {
- storagesFound = append(storagesFound, storage)
- }
- }
-
- return storagesFound, nil
-}
-
-func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) {
- var removed bool
- if err := db.QueryRowContext(
- ctx,
- `WITH remove_storages_info AS (
- DELETE FROM storage_repositories
- WHERE virtual_storage = $1 AND relative_path = $2
- )
- DELETE FROM repositories
- WHERE virtual_storage = $1 AND relative_path = $2
- RETURNING TRUE`,
- cmd.virtualStorage,
- cmd.relativePath,
- ).Scan(&removed); err != nil {
- if errors.Is(err, sql.ErrNoRows) {
- return false, nil
- }
- return false, fmt.Errorf("query row: %w", err)
- }
- return removed, nil
-}
-
func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
conn, err := subCmdDial(ctx, addr, token, cmd.dialTimeout)
if err != nil {
@@ -241,10 +156,7 @@ func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalyp
ctx = metadata.AppendToOutgoingContext(ctx, "client_name", removeRepositoryCmdName)
repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
if _, err := repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil {
- if _, ok := status.FromError(err); !ok {
- return false, fmt.Errorf("RemoveRepository: %w", err)
- }
- return false, nil
+ return false, err
}
return true, nil
}
@@ -291,43 +203,3 @@ func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger
}
return nil
}
-
-func (cmd *removeRepository) removeNode(
- ctx context.Context,
- logger logrus.FieldLogger,
- node *config.Node,
-) {
- logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address)
- repo := &gitalypb.Repository{
- StorageName: node.Storage,
- RelativePath: cmd.relativePath,
- }
- removed, err := cmd.removeRepository(ctx, repo, node.Address, node.Token)
- if err != nil {
- logger.WithError(err).Warnf("repository removal failed for %q", node.Storage)
- } else {
- if removed {
- fmt.Fprintf(cmd.w, "Successfully removed %s from %s\n", cmd.relativePath, node.Storage)
- } else {
- fmt.Fprintf(cmd.w, "Did not remove %s from %s\n", cmd.relativePath, node.Storage)
- }
- }
- logger.Debugf("repository removal call to gitaly %q completed", node.Storage)
-}
-
-func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) {
- for _, vs := range cfg.VirtualStorages {
- if vs.Name == cmd.virtualStorage {
- var wg sync.WaitGroup
- for i := 0; i < len(vs.Nodes); i++ {
- wg.Add(1)
- go func(i int) {
- defer wg.Done()
- cmd.removeNode(ctx, logger, vs.Nodes[i])
- }(i)
- }
- wg.Wait()
- break
- }
- }
-}
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
index 040dd40e5..1831d131b 100644
--- a/cmd/praefect/subcmd_remove_repository_test.go
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -6,15 +6,15 @@ import (
"fmt"
"os"
"path/filepath"
- "strings"
"testing"
"time"
"github.com/sirupsen/logrus"
- "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ gitalycfg "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
@@ -104,9 +104,19 @@ func TestRemoveRepository_Exec(t *testing.T) {
praefectStorage := conf.VirtualStorages[0].Name
+ repositoryExists := func(t testing.TB, repo *gitalypb.Repository) bool {
+ response, err := gitalypb.NewRepositoryServiceClient(cc).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: repo,
+ })
+ require.NoError(t, err)
+ return response.GetExists()
+ }
+
t.Run("dry run", func(t *testing.T) {
var out bytes.Buffer
repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
+ replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo)
+
cmd := &removeRepository{
logger: testhelper.NewDiscardingLogger(t),
virtualStorage: repo.StorageName,
@@ -117,15 +127,11 @@ func TestRemoveRepository_Exec(t *testing.T) {
}
require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
- require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
- require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath))
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath))
assert.Contains(t, out.String(), "Repository found in the database.\n")
- assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:")
assert.Contains(t, out.String(), "Re-run the command with -apply to remove repositories from the database and disk.\n")
-
- repositoryRowExists, err := datastore.NewPostgresRepositoryStore(db, nil).RepositoryExists(ctx, cmd.virtualStorage, cmd.relativePath)
- require.NoError(t, err)
- require.True(t, repositoryRowExists)
+ require.True(t, repositoryExists(t, repo))
})
t.Run("ok", func(t *testing.T) {
@@ -144,17 +150,9 @@ func TestRemoveRepository_Exec(t *testing.T) {
require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
assert.Contains(t, out.String(), "Repository found in the database.\n")
- assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:")
assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath))
- assert.Contains(t, out.String(), fmt.Sprintf("Successfully removed %s from", repo.RelativePath))
- assert.NotContains(t, out.String(), fmt.Sprintf("Did not remove %s from", repo.RelativePath))
-
- var repositoryRowExists bool
- require.NoError(t, db.QueryRow(
- `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
- cmd.virtualStorage, cmd.relativePath,
- ).Scan(&repositoryRowExists))
- require.False(t, repositoryRowExists)
+ assert.Contains(t, out.String(), "Repository removal completed.")
+ require.False(t, repositoryExists(t, repo))
})
t.Run("repository doesnt exist on one gitaly", func(t *testing.T) {
@@ -176,23 +174,18 @@ func TestRemoveRepository_Exec(t *testing.T) {
require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
assert.Contains(t, out.String(), "Repository found in the database.\n")
- assert.Contains(t, out.String(), "Repository found on the following gitaly nodes: gitaly-1")
assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath))
- assert.Contains(t, out.String(), fmt.Sprintf("Successfully removed %s from gitaly-1", repo.RelativePath))
-
- var repositoryRowExists bool
- require.NoError(t, db.QueryRow(
- `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
- cmd.virtualStorage, cmd.relativePath,
- ).Scan(&repositoryRowExists))
- require.False(t, repositoryRowExists)
+ assert.Contains(t, out.String(), "Repository removal completed.")
+ require.False(t, repositoryExists(t, repo))
})
t.Run("no info about repository on praefect", func(t *testing.T) {
var out bytes.Buffer
repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
+ replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo)
+
repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
- _, _, err := repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath)
+ _, _, err = repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath)
require.NoError(t, err)
cmd := &removeRepository{
@@ -203,14 +196,10 @@ func TestRemoveRepository_Exec(t *testing.T) {
apply: true,
w: &writer{w: &out},
}
- require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
- assert.Contains(t, out.String(), "Repository is not being tracked in Praefect.\n")
- assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:")
- assert.Contains(t, out.String(), "The database has no information about this repository.\n")
- require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
- require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
-
- requireNoDatabaseInfo(t, db, cmd)
+ require.Error(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), "repository is not being tracked in Praefect")
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath))
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath))
+ require.False(t, repositoryExists(t, repo))
})
t.Run("one of gitalies is out of service", func(t *testing.T) {
@@ -218,11 +207,12 @@ func TestRemoveRepository_Exec(t *testing.T) {
repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
g2Srv.Shutdown()
- logger := testhelper.NewDiscardingLogger(t)
- loggerHook := test.NewLocal(logger)
+ replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo)
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath))
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath))
cmd := &removeRepository{
- logger: logrus.NewEntry(logger),
+ logger: logrus.NewEntry(testhelper.NewDiscardingLogger(t)),
virtualStorage: praefectStorage,
relativePath: repo.RelativePath,
dialTimeout: 100 * time.Millisecond,
@@ -231,43 +221,14 @@ func TestRemoveRepository_Exec(t *testing.T) {
}
require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ assert.Contains(t, out.String(), "Repository removal completed.")
- var checkExistsOnNodeErrFound, removeRepoFromDiskErrFound bool
- for _, entry := range loggerHook.AllEntries() {
- if strings.Contains(entry.Message, `checking if repository exists on "gitaly-2"`) {
- checkExistsOnNodeErrFound = true
- }
-
- if strings.Contains(entry.Message, `repository removal failed for "gitaly-2"`) {
- removeRepoFromDiskErrFound = true
- }
- }
- require.True(t, checkExistsOnNodeErrFound)
- require.True(t, removeRepoFromDiskErrFound)
-
- require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
- require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
-
- requireNoDatabaseInfo(t, db, cmd)
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath))
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath))
+ require.False(t, repositoryExists(t, repo))
})
}
-func requireNoDatabaseInfo(t *testing.T, db testdb.DB, cmd *removeRepository) {
- t.Helper()
- var repositoryRowExists bool
- require.NoError(t, db.QueryRow(
- `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
- cmd.virtualStorage, cmd.relativePath,
- ).Scan(&repositoryRowExists))
- require.False(t, repositoryRowExists)
- var storageRowExists bool
- require.NoError(t, db.QueryRow(
- `SELECT EXISTS(SELECT FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
- cmd.virtualStorage, cmd.relativePath,
- ).Scan(&storageRowExists))
- require.False(t, storageRowExists)
-}
-
func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
t.Parallel()
const (
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index 2e3c807bf..3e8750f42 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -3,7 +3,6 @@ package main
import (
"bytes"
"flag"
- "io"
"path/filepath"
"testing"
"time"
@@ -170,15 +169,9 @@ func TestAddRepository_Exec(t *testing.T) {
defer nodeMgr.Stop()
repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
-
- rmRepoCmd := &removeRepository{
- logger: logger,
- virtualStorage: virtualStorageName,
- relativePath: tc.relativePath,
- w: io.Discard,
- apply: true,
- }
- require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath)
+ require.NoError(t, err)
+ require.False(t, exists)
// create the repo on Gitaly without Praefect knowing
require.NoError(t, createRepoThroughGitaly1(tc.relativePath))
@@ -207,7 +200,7 @@ func TestAddRepository_Exec(t *testing.T) {
assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
- exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath)
+ exists, err = repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath)
require.NoError(t, err)
assert.True(t, exists)
assert.Contains(t, stdout.String(), tc.expectedOutput)
diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go
index 020ae8271..88c8f93d4 100644
--- a/internal/git/gittest/repo.go
+++ b/internal/git/gittest/repo.go
@@ -178,13 +178,30 @@ func CreateRepository(ctx context.Context, t testing.TB, cfg config.Cfg, configs
return clonedRepo, filepath.Join(storage.Path, getReplicaPath(ctx, t, conn, repository))
}
+// GetReplicaPathConfig allows for configuring the GetReplicaPath call.
+type GetReplicaPathConfig struct {
+ // ClientConn is the connection used to create the repository. If unset, the config is used to
+ // dial the service.
+ ClientConn *grpc.ClientConn
+}
+
// GetReplicaPath retrieves the repository's replica path if the test has been
// run with Praefect in front of it. This is necessary if the test creates a repository
// through Praefect and peeks into the filesystem afterwards. Conn should be pointing to
// Praefect.
-func GetReplicaPath(ctx context.Context, t testing.TB, cfg config.Cfg, repo repository.GitRepo) string {
- conn := dialService(ctx, t, cfg)
- defer conn.Close()
+func GetReplicaPath(ctx context.Context, t testing.TB, cfg config.Cfg, repo repository.GitRepo, opts ...GetReplicaPathConfig) string {
+ require.Less(t, len(opts), 2, "you must either pass no or exactly one option")
+
+ var opt GetReplicaPathConfig
+ if len(opts) > 0 {
+ opt = opts[0]
+ }
+
+ conn := opt.ClientConn
+ if conn == nil {
+ conn = dialService(ctx, t, cfg)
+ defer conn.Close()
+ }
return getReplicaPath(ctx, t, conn, repo)
}
diff --git a/internal/git/housekeeping/object_pool.go b/internal/git/housekeeping/object_pool.go
index 1e4a935c8..4d0a27e4b 100644
--- a/internal/git/housekeeping/object_pool.go
+++ b/internal/git/housekeeping/object_pool.go
@@ -3,6 +3,8 @@ package housekeeping
import (
"regexp"
"strings"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil"
)
// railsPoolDirRegexp is used to validate object pool directory structure and name as generated by Rails.
@@ -21,5 +23,5 @@ func IsRailsPoolPath(relativePath string) bool {
// IsPoolPath returns whether the relative path indicates the repository is an object
// pool.
func IsPoolPath(relativePath string) bool {
- return IsRailsPoolPath(relativePath)
+ return IsRailsPoolPath(relativePath) || praefectutil.IsPoolPath(relativePath)
}
diff --git a/internal/git/housekeeping/object_pool_test.go b/internal/git/housekeeping/object_pool_test.go
index 6fdded51f..50c3201cd 100644
--- a/internal/git/housekeeping/object_pool_test.go
+++ b/internal/git/housekeeping/object_pool_test.go
@@ -5,6 +5,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil"
)
func TestIsPoolPath(t *testing.T) {
@@ -19,6 +20,15 @@ func TestIsPoolPath(t *testing.T) {
isPoolPath: true,
},
{
+ desc: "praefect pool path",
+ relativePath: praefectutil.DerivePoolPath(1),
+ isPoolPath: true,
+ },
+ {
+ desc: "praefect replica path",
+ relativePath: praefectutil.DeriveReplicaPath(1),
+ },
+ {
desc: "empty string",
},
{
diff --git a/internal/gitaly/service/repository/rename_test.go b/internal/gitaly/service/repository/rename_test.go
index 9983eb894..f28657fb8 100644
--- a/internal/gitaly/service/repository/rename_test.go
+++ b/internal/gitaly/service/repository/rename_test.go
@@ -1,6 +1,7 @@
package repository
import (
+ "context"
"fmt"
"os"
"path/filepath"
@@ -11,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -19,8 +21,11 @@ import (
)
func TestRenameRepository_success(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositorySuccess)
+}
+
+func testRenameRepositorySuccess(t *testing.T, ctx context.Context) {
t.Parallel()
- ctx := testhelper.Context(t)
// Praefect does not move repositories on the disk so this test case is not run with Praefect.
cfg, repo, _, client := setupRepositoryService(ctx, t, testserver.WithDisablePraefect())
@@ -43,8 +48,11 @@ func TestRenameRepository_success(t *testing.T) {
}
func TestRenameRepository_DestinationExists(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryDestinationExists)
+}
+
+func testRenameRepositoryDestinationExists(t *testing.T, ctx context.Context) {
t.Parallel()
- ctx := testhelper.Context(t)
cfg, client := setupRepositoryServiceWithoutRepo(t)
@@ -70,11 +78,11 @@ func TestRenameRepository_DestinationExists(t *testing.T) {
}
func TestRenameRepository_invalidRequest(t *testing.T) {
- // Prafect applies renames to metadata even on failed requests, which fails this test.
- testhelper.SkipWithPraefect(t, "https://gitlab.com/gitlab-org/gitaly/-/issues/4003")
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryInvalidRequest)
+}
+func testRenameRepositoryInvalidRequest(t *testing.T, ctx context.Context) {
t.Parallel()
- ctx := testhelper.Context(t)
_, repo, repoPath, client := setupRepositoryService(ctx, t)
storagePath := strings.TrimSuffix(repoPath, "/"+repo.RelativePath)
@@ -87,7 +95,7 @@ func TestRenameRepository_invalidRequest(t *testing.T) {
{
desc: "empty repository",
req: &gitalypb.RenameRepositoryRequest{Repository: nil, RelativePath: "/tmp/abc"},
- exp: status.Error(codes.InvalidArgument, gitalyOrPraefect("empty Repository", "repo scoped: empty Repository")),
+ exp: status.Error(codes.InvalidArgument, "empty Repository"),
},
{
desc: "empty destination relative path",
@@ -101,20 +109,20 @@ func TestRenameRepository_invalidRequest(t *testing.T) {
},
{
desc: "repository storage doesn't exist",
- req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: "stub", RelativePath: repo.RelativePath}, RelativePath: "../usr/bin"},
- exp: status.Error(codes.InvalidArgument, gitalyOrPraefect(`GetStorageByName: no such storage: "stub"`, "repo scoped: invalid Repository")),
+ req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: "stub", RelativePath: repo.RelativePath}, RelativePath: "usr/bin"},
+ exp: status.Error(codes.InvalidArgument, `GetStorageByName: no such storage: "stub"`),
},
{
desc: "repository relative path doesn't exist",
- req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: repo.StorageName, RelativePath: "stub"}, RelativePath: "../usr/bin"},
- exp: status.Error(codes.NotFound, fmt.Sprintf(`GetRepoPath: not a git repository: "%s/stub"`, storagePath)),
+ req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: repo.StorageName, RelativePath: "stub"}, RelativePath: "non-existent/directory"},
+ exp: status.Error(codes.NotFound, fmt.Sprintf(`GetRepoPath: not a git repository: "%s/stub"`, gitalyOrPraefect(storagePath, repo.GetStorageName()))),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
_, err := client.RenameRepository(ctx, tc.req)
- testhelper.RequireGrpcError(t, err, tc.exp)
+ testhelper.RequireGrpcError(t, tc.exp, err)
})
}
}
diff --git a/internal/metadata/featureflag/ff_praefect_generated_paths.go b/internal/metadata/featureflag/ff_praefect_generated_paths.go
new file mode 100644
index 000000000..4109bd636
--- /dev/null
+++ b/internal/metadata/featureflag/ff_praefect_generated_paths.go
@@ -0,0 +1,4 @@
+package featureflag
+
+// PraefectGeneratedReplicaPaths will enable Praefect generated replica paths for new repositories.
+var PraefectGeneratedReplicaPaths = NewFeatureFlag("praefect_generated_replica_paths", false)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 77e83ec5a..3821db602 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -488,6 +488,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
route.RepositoryID,
virtualStorage,
targetRepo,
+ route.ReplicaPath,
route.Primary.Storage,
nil,
append(routerNodesToStorages(route.Secondaries), route.ReplicationTargets...),
@@ -835,7 +836,7 @@ func (c *Coordinator) createTransactionFinalizer(
}
return c.newRequestFinalizer(
- ctx, route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage,
+ ctx, route.RepositoryID, virtualStorage, targetRepo, route.ReplicaPath, route.Primary.Storage,
updated, outdated, change, params, cause)()
}
}
@@ -992,6 +993,7 @@ func (c *Coordinator) newRequestFinalizer(
repositoryID int64,
virtualStorage string,
targetRepo *gitalypb.Repository,
+ replicaPath string,
primary string,
updatedSecondaries []string,
outdatedSecondaries []string,
@@ -1048,7 +1050,7 @@ func (c *Coordinator) newRequestFinalizer(
repositoryID,
virtualStorage,
targetRepo.GetRelativePath(),
- targetRepo.GetRelativePath(),
+ replicaPath,
primary,
updatedSecondaries,
outdatedSecondaries,
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index fce4d5e5b..e0927ae69 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -2673,6 +2673,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
0,
"virtual storage",
&gitalypb.Repository{},
+ "replica-path",
"primary",
[]string{},
[]string{"secondary"},
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 780073155..35a8a6a68 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -118,6 +118,9 @@ type RepositoryStore interface {
// as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository
// which has no record in the virtual storage or the storage.
RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
+ // RenameRepositoryInPlace renames the repository in the database without changing the replica path. This will replace
+ // RenameRepository which can be removed in a later release.
+ RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error
// GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
ConsistentStoragesGetter
@@ -535,6 +538,39 @@ AND storage = $2
return nil
}
+// RenameRepositoryInPlace renames the repository in the database without changing the replica path. This will replace
+// RenameRepository which can be removed in a later release.
+func (rs *PostgresRepositoryStore) RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error {
+ result, err := rs.db.ExecContext(ctx, `
+WITH repository AS (
+ UPDATE repositories
+ SET relative_path = $3
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+ RETURNING repository_id
+)
+
+UPDATE storage_repositories
+SET relative_path = $3
+WHERE repository_id = (SELECT repository_id FROM repository)
+ `, virtualStorage, relativePath, newRelativePath)
+ if err != nil {
+ if glsql.IsUniqueViolation(err, "repository_lookup_index") {
+ return commonerr.ErrRepositoryAlreadyExists
+ }
+
+ return fmt.Errorf("query: %w", err)
+ }
+
+ if rowsAffected, err := result.RowsAffected(); err != nil {
+ return fmt.Errorf("rows affected: %w", err)
+ } else if rowsAffected == 0 {
+ return commonerr.ErrRepositoryNotFound
+ }
+
+ return nil
+}
+
//nolint: revive,stylecheck // This is unintentionally missing documentation.
func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error {
const q = `
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 2a021661d..42acafb6e 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -14,6 +14,7 @@ type MockRepositoryStore struct {
SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error
+ RenameRepositoryInPlaceFunc func(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)
@@ -99,6 +100,11 @@ func (m MockRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int
return m.DeleteReplicaFunc(ctx, repositoryID, storage)
}
+// RenameRepositoryInPlace runs the mock's RenameRepositoryInPlaceFunc.
+func (m MockRepositoryStore) RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error {
+ return m.RenameRepositoryInPlaceFunc(ctx, virtualStorage, relativePath, newRelativePath)
+}
+
//nolint: revive,stylecheck // This is unintentionally missing documentation.
func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error {
if m.RenameRepositoryFunc == nil {
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index 9c980376c..afdfcdb14 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -765,6 +765,50 @@ func TestRepositoryStore_Postgres(t *testing.T) {
})
})
+ t.Run("RenameRepositoryInPlace", func(t *testing.T) {
+ t.Run("rename non-existing", func(t *testing.T) {
+ rs := newRepositoryStore(t, nil)
+
+ require.Equal(t,
+ commonerr.ErrRepositoryNotFound,
+ rs.RenameRepositoryInPlace(ctx, vs, repo, "new-relative-path"),
+ )
+ })
+
+ t.Run("destination exists", func(t *testing.T) {
+ rs := newRepositoryStore(t, nil)
+
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, "relative-path-1", "replica-path-1", "primary", nil, nil, true, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, vs, "relative-path-2", "replica-path-2", "primary", nil, nil, true, false))
+
+ require.Equal(t,
+ commonerr.ErrRepositoryAlreadyExists,
+ rs.RenameRepositoryInPlace(ctx, vs, "relative-path-1", "relative-path-2"),
+ )
+ })
+
+ t.Run("successfully renamed", func(t *testing.T) {
+ rs := newRepositoryStore(t, nil)
+
+ require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-relative-path", "original-replica-path", "primary", nil, nil, false, false))
+ require.NoError(t, rs.RenameRepositoryInPlace(ctx, vs, "original-relative-path", "renamed-relative-path"))
+ requireState(t, ctx, db,
+ virtualStorageState{
+ vs: {
+ "renamed-relative-path": {repositoryID: 1, replicaPath: "original-replica-path"},
+ },
+ },
+ storageState{
+ vs: {
+ "renamed-relative-path": {
+ "primary": {repositoryID: 1},
+ },
+ },
+ },
+ )
+ })
+ })
+
t.Run("RenameRepository", func(t *testing.T) {
t.Run("rename non-existing", func(t *testing.T) {
rs := newRepositoryStore(t, nil)
diff --git a/internal/praefect/praefectutil/replica_path.go b/internal/praefect/praefectutil/replica_path.go
index 5f35e1e81..576762d22 100644
--- a/internal/praefect/praefectutil/replica_path.go
+++ b/internal/praefect/praefectutil/replica_path.go
@@ -3,18 +3,41 @@ package praefectutil
import (
"crypto/sha256"
"fmt"
+ "path/filepath"
"strconv"
+ "strings"
)
+// poolPathPrefix is the prefix directory where Praefect places object pools.
+const poolPathPrefix = "@cluster/pools/"
+
+// IsPoolPath returns whether the relative path indicates this is a Praefect generated object pool path.
+func IsPoolPath(relativePath string) bool {
+ return strings.HasPrefix(relativePath, poolPathPrefix)
+}
+
// DeriveReplicaPath derives a repository's disk storage path from its repository ID. The repository ID
// is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to
-// ensure even distribution into subdirectories. The format is @repositories/ab/cd/<repository-id>.
+// ensure even distribution into subdirectories. The format is @cluster/repositories/ab/cd/<repository-id>.
func DeriveReplicaPath(repositoryID int64) string {
+ return deriveDiskPath("@cluster/repositories", repositoryID)
+}
+
+// DerivePoolPath derives an object pools's disk storage path from its repository ID. The repository ID
+// is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to
+// ensure even distribution into subdirectories. The format is @cluster/pools/ab/cd/<repository-id>. The pools
+// have a different directory prefix from other repositories so Gitaly can identify them in OptimizeRepository
+// and avoid pruning them.
+func DerivePoolPath(repositoryID int64) string {
+ return deriveDiskPath(poolPathPrefix, repositoryID)
+}
+
+func deriveDiskPath(prefixDir string, repositoryID int64) string {
hasher := sha256.New()
// String representation of the ID is used to make it easier to derive the replica paths with
// external tools. The error is ignored as the hash.Hash interface is documented to never return
// an error.
hasher.Write([]byte(strconv.FormatInt(repositoryID, 10)))
hash := hasher.Sum(nil)
- return fmt.Sprintf("@repositories/%x/%x/%d", hash[0:1], hash[1:2], repositoryID)
+ return filepath.Join(prefixDir, fmt.Sprintf("%x/%x/%d", hash[0:1], hash[1:2], repositoryID))
}
diff --git a/internal/praefect/praefectutil/replica_path_test.go b/internal/praefect/praefectutil/replica_path_test.go
index 6084c13ae..572dfc2ed 100644
--- a/internal/praefect/praefectutil/replica_path_test.go
+++ b/internal/praefect/praefectutil/replica_path_test.go
@@ -4,9 +4,41 @@ import (
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
)
func TestDeriveReplicaPath(t *testing.T) {
- require.Equal(t, "@repositories/6b/86/1", DeriveReplicaPath(1))
- require.Equal(t, "@repositories/d4/73/2", DeriveReplicaPath(2))
+ require.Equal(t, "@cluster/repositories/6b/86/1", DeriveReplicaPath(1))
+ require.Equal(t, "@cluster/repositories/d4/73/2", DeriveReplicaPath(2))
+}
+
+func TestDerivePoolPath(t *testing.T) {
+ require.Equal(t, "@cluster/pools/6b/86/1", DerivePoolPath(1))
+ require.Equal(t, "@cluster/pools/d4/73/2", DerivePoolPath(2))
+}
+
+func TestIsPoolPath(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ relativePath string
+ isPoolPath bool
+ }{
+ {
+ desc: "praefect pool path",
+ relativePath: DerivePoolPath(1),
+ isPoolPath: true,
+ },
+ {
+ desc: "praefect replica path",
+ relativePath: DeriveReplicaPath(1),
+ },
+ {
+ desc: "rails pool path",
+ relativePath: gittest.NewObjectPoolName(t),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ require.Equal(t, tc.isPoolPath, IsPoolPath(tc.relativePath))
+ })
+ }
}
diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go
new file mode 100644
index 000000000..495b18e4b
--- /dev/null
+++ b/internal/praefect/rename_repository.go
@@ -0,0 +1,160 @@
+package praefect
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+type renamePeeker struct {
+ grpc.ServerStream
+ peeked *gitalypb.RenameRepositoryRequest
+}
+
+func (peeker *renamePeeker) RecvMsg(msg interface{}) error {
+ // On the first read, we'll return the peeked first message of the stream.
+ if peeker.peeked != nil {
+ peeked := peeker.peeked
+ peeker.peeked = nil
+
+ codec := proxy.NewCodec()
+ payload, err := codec.Marshal(peeked)
+ if err != nil {
+ return fmt.Errorf("marshaling peeked rename request: %w", err)
+ }
+
+ return codec.Unmarshal(payload, msg)
+ }
+
+ return peeker.ServerStream.RecvMsg(msg)
+}
+
+func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error {
+ // These checks are not strictly necessary but they exist to keep retain compatibility with
+ // Gitaly's tested behavior.
+ if req.GetRepository() == nil {
+ return helper.ErrInvalidArgumentf("empty Repository")
+ } else if req.GetRelativePath() == "" {
+ return helper.ErrInvalidArgumentf("destination relative path is empty")
+ } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok {
+ return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName())
+ } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil {
+ // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function
+ // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we
+ // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by
+ // seeing whether the relative path escapes the root directory. It's not possible to traverse up
+ // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work,
+ // we use the /fake-root directory simply to notice if there were traversals in the path.
+ return helper.ErrInvalidArgumentf("GetRepoPath: %s", err)
+ }
+
+ return nil
+}
+
+// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should
+// be proxied to a Gitaly. Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the
+// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either.
+// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect
+// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys.
+// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled
+// later.
+//
+// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires
+// peeking into the internals of the proxying so we can set restore the frame correctly.
+func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor {
+ virtualStorages := make(map[string]struct{}, len(virtualStorageNames))
+ for _, virtualStorage := range virtualStorageNames {
+ virtualStorages[virtualStorage] = struct{}{}
+ }
+
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" {
+ return handler(srv, stream)
+ }
+
+ // Peek the message
+ var request gitalypb.RenameRepositoryRequest
+ if err := stream.RecvMsg(&request); err != nil {
+ return fmt.Errorf("peek rename repository request: %w", err)
+ }
+
+ // In order for the handlers to work after the message is peeked, the stream is restored
+ // with an alternative implementation that returns the first message correctly.
+ stream = &renamePeeker{ServerStream: stream, peeked: &request}
+
+ if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil {
+ return err
+ }
+
+ repo := request.GetRepository()
+ repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath())
+ if err != nil {
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath())
+ }
+
+ return fmt.Errorf("get repository id: %w", err)
+ }
+
+ replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID)
+ if err != nil {
+ return fmt.Errorf("get replica path: %w", err)
+ }
+
+ // Repositories that do not have a Praefect generated replica path are always handled in the old manner.
+ // Once the feature flag is removed, all of the repositories will be handled in the atomic manner.
+ if !strings.HasPrefix(replicaPath, "@cluster") {
+ return handler(srv, stream)
+ }
+
+ return handleRenameRepository(srv, stream)
+ }
+}
+
+// RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming
+// the repository in the lookup table stored in the database.
+func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler {
+ virtualStorages := make(map[string]struct{}, len(virtualStoragesNames))
+ for _, virtualStorage := range virtualStoragesNames {
+ virtualStorages[virtualStorage] = struct{}{}
+ }
+
+ return func(srv interface{}, stream grpc.ServerStream) error {
+ var req gitalypb.RenameRepositoryRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ if err := validateRenameRepositoryRequest(&req, virtualStorages); err != nil {
+ return err
+ }
+
+ if err := rs.RenameRepositoryInPlace(stream.Context(),
+ req.GetRepository().GetStorageName(),
+ req.GetRepository().GetRelativePath(),
+ req.GetRelativePath(),
+ ); err != nil {
+ if errors.Is(err, commonerr.ErrRepositoryNotFound) {
+ return helper.ErrNotFoundf(
+ `GetRepoPath: not a git repository: "%s/%s"`,
+ req.GetRepository().GetStorageName(),
+ req.GetRepository().GetRelativePath(),
+ )
+ } else if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
+ return helper.ErrAlreadyExistsf("target repo exists already")
+ }
+
+ return helper.ErrInternal(err)
+ }
+
+ return stream.SendMsg(&gitalypb.RenameRepositoryResponse{})
+ }
+}
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index 6f1b4e7bd..7067cb36e 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -5,8 +5,11 @@ import (
"errors"
"fmt"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil"
"google.golang.org/grpc"
)
@@ -307,11 +310,19 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err)
}
+ replicaPath := relativePath
+ if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) {
+ replicaPath = praefectutil.DeriveReplicaPath(id)
+ if housekeeping.IsRailsPoolPath(relativePath) {
+ replicaPath = praefectutil.DerivePoolPath(id)
+ }
+ }
+
replicationFactor := r.defaultReplicationFactors[virtualStorage]
if replicationFactor == 1 {
return RepositoryMutatorRoute{
RepositoryID: id,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
Primary: primary,
}, nil
}
@@ -360,7 +371,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
return RepositoryMutatorRoute{
RepositoryID: id,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
AdditionalReplicaPath: additionalReplicaPath,
Primary: primary,
Secondaries: secondaries,
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index 44d8b10ce..6e620ffe2 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -8,9 +8,11 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
"google.golang.org/grpc"
@@ -662,6 +664,10 @@ func TestPerRepositoryRouter_RouteRepositoryMaintenance(t *testing.T) {
}
func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testPerRepositoryRouterRouteRepositoryCreation)
+}
+
+func testPerRepositoryRouterRouteRepositoryCreation(t *testing.T, ctx context.Context) {
t.Parallel()
configuredNodes := map[string][]string{
@@ -692,6 +698,11 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
additionalReplicaPath = "additional-replica-path"
)
+ replicaPath := relativePath
+ if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) {
+ replicaPath = praefectutil.DeriveReplicaPath(1)
+ }
+
for _, tc := range []struct {
desc string
virtualStorage string
@@ -726,7 +737,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
AdditionalReplicaPath: additionalReplicaPath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
ReplicationTargets: []string{"secondary-1", "secondary-2"},
@@ -742,7 +753,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{
{Storage: "secondary-1", Connection: secondary1Conn},
@@ -760,7 +771,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{
{Storage: "secondary-1", Connection: secondary1Conn},
@@ -779,7 +790,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
},
),
@@ -795,13 +806,13 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
},
RepositoryMutatorRoute{
RepositoryID: 1,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}},
},
@@ -818,7 +829,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
matchRoute: requireOneOf(
RepositoryMutatorRoute{
RepositoryID: 1,
- ReplicaPath: relativePath,
+ ReplicaPath: replicaPath,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
ReplicationTargets: []string{"secondary-2"},
@@ -848,14 +859,12 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- ctx := testhelper.Context(t)
-
db.TruncateAll(t)
rs := datastore.NewPostgresRepositoryStore(db, nil)
if tc.repositoryExists {
require.NoError(t,
- rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, relativePath, "primary", nil, nil, true, true),
+ rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, replicaPath, "primary", nil, nil, true, true),
)
}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index d75810246..d894bb3f3 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -111,6 +111,13 @@ func NewGRPCServer(
panichandler.StreamPanicHandler,
}
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ streamInterceptors = append(
+ streamInterceptors,
+ RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)),
+ )
+ }
+
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)),
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c3681a34c..a47e82529 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -7,8 +7,6 @@ import (
"io"
"math/rand"
"net"
- "os"
- "path/filepath"
"sort"
"strings"
"sync"
@@ -26,8 +24,8 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
@@ -568,43 +566,20 @@ func TestRemoveRepository(t *testing.T) {
verifyReposExistence(t, codes.NotFound)
}
-func pollUntilRemoved(t testing.TB, path string, deadline <-chan time.Time) {
- for {
- select {
- case <-deadline:
- require.Failf(t, "unable to detect path removal for %s", path)
- default:
- _, err := os.Stat(path)
- if os.IsNotExist(err) {
- return
- }
- require.NoError(t, err, "unexpected error while checking path %s", path)
- }
- time.Sleep(time.Millisecond)
- }
-}
-
func TestRenameRepository(t *testing.T) {
- t.Parallel()
- ctx := testhelper.Context(t)
+ testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository)
+}
+func testRenameRepository(t *testing.T, ctx context.Context) {
gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"}
- repoPaths := make([]string, len(gitalyStorages))
praefectCfg := config.Config{
VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}},
Failover: config.Failover{Enabled: true, ElectionStrategy: config.ElectionStrategyPerRepository},
}
- var repo *gitalypb.Repository
- for i, storageName := range gitalyStorages {
- const relativePath = "test-repository"
-
+ for _, storageName := range gitalyStorages {
cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages(storageName))
- gitalyCfg, repos := cfgBuilder.BuildWithRepoAt(t, relativePath)
- if repo == nil {
- repo = repos[0]
- }
-
+ gitalyCfg := cfgBuilder.Build(t)
gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
praefectCfg.VirtualStorages[0].Nodes = append(praefectCfg.VirtualStorages[0].Nodes, &config.Node{
@@ -612,76 +587,95 @@ func TestRenameRepository(t *testing.T) {
Address: gitalyAddr,
Token: gitalyCfg.Auth.Token,
})
-
- repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath)
}
evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
- tx := testdb.New(t).Begin(t)
- defer tx.Rollback(t)
+ db := testdb.New(t)
+
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
- rs := datastore.NewPostgresRepositoryStore(tx, nil)
- require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false))
+ txManager := transactions.NewManager(praefectCfg)
+ logger := testhelper.NewDiscardingLogEntry(t)
+ clientHandshaker := backchannel.NewClientHandshaker(
+ logger,
+ NewBackchannelServerFactory(
+ logger,
+ transaction.NewServer(txManager),
+ nil,
+ ),
+ )
- nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
+ nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
- testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()})
-
cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
withQueue: evq,
withRepoStore: rs,
withRouter: NewPerRepositoryRouter(
nodeSet.Connections(),
- nodes.NewPerRepositoryElector(tx),
+ nodes.NewPerRepositoryElector(db),
StaticHealthChecker(praefectCfg.StorageNames()),
NewLockedRandom(rand.New(rand.NewSource(0))),
rs,
- datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()),
+ datastore.NewAssignmentStore(db, praefectCfg.StorageNames()),
rs,
nil,
),
+ withTxMgr: txManager,
})
- defer cleanup()
+ t.Cleanup(cleanup)
- // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it
- virtualRepo := proto.Clone(repo).(*gitalypb.Repository)
- virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name
+ virtualRepo1, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{
+ Storages: []gconfig.Storage{{Name: "praefect"}},
+ }, gittest.CreateRepositoryConfig{ClientConn: cc})
+
+ virtualRepo2, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{
+ Storages: []gconfig.Storage{{Name: "praefect"}},
+ }, gittest.CreateRepositoryConfig{ClientConn: cc})
+
+ const newRelativePath = "unused-relative-path"
repoServiceClient := gitalypb.NewRepositoryServiceClient(cc)
- newName, err := text.RandomHex(20)
- require.NoError(t, err)
+ _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: virtualRepo1.StorageName,
+ RelativePath: "not-found",
+ },
+ RelativePath: virtualRepo2.RelativePath,
+ })
+ testhelper.RequireGrpcError(t, helper.ErrNotFoundf(`GetRepoPath: not a git repository: "praefect/not-found"`), err)
+
+ _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
+ Repository: virtualRepo1,
+ RelativePath: virtualRepo2.RelativePath,
+ })
+
+ expectedErr := helper.ErrAlreadyExistsf("target repo exists already")
+ testhelper.RequireGrpcError(t, expectedErr, err)
_, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{
- Repository: virtualRepo,
- RelativePath: newName,
+ Repository: virtualRepo1,
+ RelativePath: newRelativePath,
})
require.NoError(t, err)
resp, err := repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
- Repository: virtualRepo,
+ Repository: virtualRepo1,
})
require.NoError(t, err)
require.False(t, resp.GetExists(), "repo with old name must gone")
- // as we renamed the repo we need to update RelativePath before we could check if it exists
- renamedVirtualRepo := virtualRepo
- renamedVirtualRepo.RelativePath = newName
-
- // wait until replication jobs propagate changes to other storages
- // as we don't know which one will be used to check because of reads distribution
- require.NoError(t, evq.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
- return len(i.GetAcknowledge()) == 2
- }))
-
- for _, oldLocation := range repoPaths {
- pollUntilRemoved(t, oldLocation, time.After(10*time.Second))
- newLocation := filepath.Join(filepath.Dir(oldLocation), newName)
- require.DirExists(t, newLocation, "must be renamed on secondary from %q to %q", oldLocation, newLocation)
- }
+ resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: virtualRepo1.StorageName,
+ RelativePath: newRelativePath,
+ },
+ })
+ require.NoError(t, err)
+ require.True(t, resp.GetExists(), "repo with new name must exist")
}
type mockSmartHTTP struct {
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
index 674089166..33441ff07 100644
--- a/internal/praefect/verifier_test.go
+++ b/internal/praefect/verifier_test.go
@@ -546,11 +546,13 @@ func TestVerifier(t *testing.T) {
for virtualStorage, relativePaths := range tc.replicas {
for relativePath, storages := range relativePaths {
// Create the expected repository. This creates all of the replicas transactionally.
- gittest.CreateRepository(ctx, t,
+ repo, _ := gittest.CreateRepository(ctx, t,
gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: virtualStorage}}},
gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: relativePath},
)
+ replicaPath := gittest.GetReplicaPath(ctx, t, gitalyconfig.Cfg{}, repo, gittest.GetReplicaPathConfig{ClientConn: conn})
+
// Now remove the replicas that were created in the transaction but the test case
// expects not to exist. We remove them directly from the Gitalys so the metadata
// records are left in place.
@@ -591,7 +593,7 @@ func TestVerifier(t *testing.T) {
&gitalypb.RemoveRepositoryRequest{
Repository: &gitalypb.Repository{
StorageName: storage,
- RelativePath: relativePath,
+ RelativePath: replicaPath,
},
},
)
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 281dbc5b8..f3a917c70 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -180,6 +180,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context {
// Randomly inject the Git flag so that we have coverage of tests with both old and new Git
// version by pure chance.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV2361Gl1, rnd.Int()%2 == 0)
+ // PraefectGeneratedReplicaPaths affects many tests as it changes the repository creation logic.
+ // Randomly enable the flag to exercise both paths to some extent.
+ ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectGeneratedReplicaPaths, rnd.Int()%2 == 0)
for _, opt := range opts {
ctx = opt(ctx)