diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2022-05-17 12:12:39 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2022-05-17 12:12:39 +0300 |
commit | 93762b621c011fe570339c1c247d5197c2cfefcc (patch) | |
tree | cfac354d8a6a34eb8dbf4c6615dea9b8140bc244 | |
parent | 6b31501b13eae70aea5061edc8273c551ba4c349 (diff) | |
parent | 9c19ad66f4b691f47bf5fa900062b45bec36cd08 (diff) |
Merge branch 'smh-repository-id-rename-repository' into 'master'
Handle repository creations, deletions and renames atomically
Closes #4003 and #3485
See merge request gitlab-org/gitaly!4101
22 files changed, 523 insertions, 326 deletions
diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go index 49b049615..5298c713d 100644 --- a/cmd/praefect/subcmd_remove_repository.go +++ b/cmd/praefect/subcmd_remove_repository.go @@ -7,7 +7,6 @@ import ( "flag" "fmt" "io" - "strings" "sync" "time" @@ -19,7 +18,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" ) const ( @@ -61,15 +59,11 @@ func (cmd *removeRepository) FlagSet() *flag.FlagSet { " This command removes all state associated with a given repository from the Gitaly Cluster.\n" + " This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" + " database state as tracked by Praefect.\n" + - " Runs in dry-run mode by default and lists the gitaly nodes from which the repository will be removed from " + + " Runs in dry-run mode by default checks whether the repository exists" + " without actually removing it from the database and disk.\n" + " When -apply is used, the repository will be removed from the database as well as\n" + " the individual gitaly nodes on which they exist.\n") fs.PrintDefaults() - printfErr("NOTE:\n" + - " It may happen that parts of the repository continue to exist after this command, either because\n" + - " of an error that happened during deletion or because of in-flight RPC calls targeting the repository.\n" + - " It is safe and recommended to re-run this command in such a case.\n") } return fs } @@ -116,21 +110,10 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger if exists { fmt.Fprintln(cmd.w, "Repository found in the database.") } else { - fmt.Fprintln(cmd.w, "Repository is not being tracked in Praefect.") + return errors.New("repository is not being tracked in Praefect") } } - storagesOnDisk, err := cmd.getStoragesFromNodes(ctx, cfg) - if err != nil { - return err - } - - if len(storagesOnDisk) == 0 { - fmt.Fprintln(cmd.w, "Repository not found on any gitaly nodes.") - } else { - fmt.Fprintf(cmd.w, "Repository found on the following gitaly nodes: %s.\n", strings.Join(storagesOnDisk, ", ")) - } - if !cmd.apply { fmt.Fprintln(cmd.w, "Re-run the command with -apply to remove repositories from the database and disk.") return nil @@ -138,18 +121,21 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger fmt.Fprintf(cmd.w, "Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", cmd.relativePath) - fmt.Fprintln(cmd.w, "Removing repository from the database...") - removed, err := cmd.removeRepositoryFromDatabase(ctx, db) + addr, err := getNodeAddress(cfg) if err != nil { - return fmt.Errorf("remove repository info from praefect database: %w", err) + return fmt.Errorf("get node address: %w", err) } - if !removed { - fmt.Fprintln(cmd.w, "The database has no information about this repository.") - } else { - fmt.Fprintln(cmd.w, "Removed repository metadata from the database.") + _, err = cmd.removeRepository(ctx, &gitalypb.Repository{ + StorageName: cmd.virtualStorage, + RelativePath: cmd.relativePath, + }, addr, cfg.Auth.Token) + if err != nil { + return fmt.Errorf("repository removal failed: %w", err) } + fmt.Fprintln(cmd.w, "Repository removal completed.") + fmt.Fprintln(cmd.w, "Removing replication events...") ticker := helper.NewTimerTicker(time.Second) defer ticker.Stop() @@ -157,80 +143,9 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger return fmt.Errorf("remove scheduled replication events: %w", err) } fmt.Fprintln(cmd.w, "Replication event removal completed.") - - // We should try to remove repository from each of gitaly nodes. - fmt.Fprintln(cmd.w, "Removing repository directly from gitaly nodes...") - cmd.removeRepositoryForEachGitaly(ctx, cfg, logger) - fmt.Fprintln(cmd.w, "Finished removing repository from gitaly nodes.") - return nil } -// getStoragesFromNodes looks on disk to finds the storages this repository exists for -func (cmd *removeRepository) getStoragesFromNodes(ctx context.Context, cfg config.Config) ([]string, error) { - var storages []string - for _, vs := range cfg.VirtualStorages { - if vs.Name != cmd.virtualStorage { - continue - } - - storages = make([]string, len(vs.Nodes)) - var wg sync.WaitGroup - - for i := 0; i < len(vs.Nodes); i++ { - wg.Add(1) - go func(node *config.Node, i int) { - defer wg.Done() - repo := &gitalypb.Repository{ - StorageName: node.Storage, - RelativePath: cmd.relativePath, - } - exists, err := repositoryExists(ctx, repo, node.Address, node.Token) - if err != nil { - cmd.logger.WithError(err).Errorf("checking if repository exists on %q", node.Storage) - } - if exists { - storages[i] = node.Storage - } - }(vs.Nodes[i], i) - } - - wg.Wait() - break - } - - var storagesFound []string - for _, storage := range storages { - if storage != "" { - storagesFound = append(storagesFound, storage) - } - } - - return storagesFound, nil -} - -func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) { - var removed bool - if err := db.QueryRowContext( - ctx, - `WITH remove_storages_info AS ( - DELETE FROM storage_repositories - WHERE virtual_storage = $1 AND relative_path = $2 - ) - DELETE FROM repositories - WHERE virtual_storage = $1 AND relative_path = $2 - RETURNING TRUE`, - cmd.virtualStorage, - cmd.relativePath, - ).Scan(&removed); err != nil { - if errors.Is(err, sql.ErrNoRows) { - return false, nil - } - return false, fmt.Errorf("query row: %w", err) - } - return removed, nil -} - func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { conn, err := subCmdDial(ctx, addr, token, cmd.dialTimeout) if err != nil { @@ -241,10 +156,7 @@ func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalyp ctx = metadata.AppendToOutgoingContext(ctx, "client_name", removeRepositoryCmdName) repositoryClient := gitalypb.NewRepositoryServiceClient(conn) if _, err := repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil { - if _, ok := status.FromError(err); !ok { - return false, fmt.Errorf("RemoveRepository: %w", err) - } - return false, nil + return false, err } return true, nil } @@ -291,43 +203,3 @@ func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger } return nil } - -func (cmd *removeRepository) removeNode( - ctx context.Context, - logger logrus.FieldLogger, - node *config.Node, -) { - logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address) - repo := &gitalypb.Repository{ - StorageName: node.Storage, - RelativePath: cmd.relativePath, - } - removed, err := cmd.removeRepository(ctx, repo, node.Address, node.Token) - if err != nil { - logger.WithError(err).Warnf("repository removal failed for %q", node.Storage) - } else { - if removed { - fmt.Fprintf(cmd.w, "Successfully removed %s from %s\n", cmd.relativePath, node.Storage) - } else { - fmt.Fprintf(cmd.w, "Did not remove %s from %s\n", cmd.relativePath, node.Storage) - } - } - logger.Debugf("repository removal call to gitaly %q completed", node.Storage) -} - -func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) { - for _, vs := range cfg.VirtualStorages { - if vs.Name == cmd.virtualStorage { - var wg sync.WaitGroup - for i := 0; i < len(vs.Nodes); i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - cmd.removeNode(ctx, logger, vs.Nodes[i]) - }(i) - } - wg.Wait() - break - } - } -} diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index 040dd40e5..1831d131b 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -6,15 +6,15 @@ import ( "fmt" "os" "path/filepath" - "strings" "testing" "time" "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + gitalycfg "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" @@ -104,9 +104,19 @@ func TestRemoveRepository_Exec(t *testing.T) { praefectStorage := conf.VirtualStorages[0].Name + repositoryExists := func(t testing.TB, repo *gitalypb.Repository) bool { + response, err := gitalypb.NewRepositoryServiceClient(cc).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: repo, + }) + require.NoError(t, err) + return response.GetExists() + } + t.Run("dry run", func(t *testing.T) { var out bytes.Buffer repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) + replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo) + cmd := &removeRepository{ logger: testhelper.NewDiscardingLogger(t), virtualStorage: repo.StorageName, @@ -117,15 +127,11 @@ func TestRemoveRepository_Exec(t *testing.T) { } require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) - require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) assert.Contains(t, out.String(), "Repository found in the database.\n") - assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:") assert.Contains(t, out.String(), "Re-run the command with -apply to remove repositories from the database and disk.\n") - - repositoryRowExists, err := datastore.NewPostgresRepositoryStore(db, nil).RepositoryExists(ctx, cmd.virtualStorage, cmd.relativePath) - require.NoError(t, err) - require.True(t, repositoryRowExists) + require.True(t, repositoryExists(t, repo)) }) t.Run("ok", func(t *testing.T) { @@ -144,17 +150,9 @@ func TestRemoveRepository_Exec(t *testing.T) { require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) assert.Contains(t, out.String(), "Repository found in the database.\n") - assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:") assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath)) - assert.Contains(t, out.String(), fmt.Sprintf("Successfully removed %s from", repo.RelativePath)) - assert.NotContains(t, out.String(), fmt.Sprintf("Did not remove %s from", repo.RelativePath)) - - var repositoryRowExists bool - require.NoError(t, db.QueryRow( - `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`, - cmd.virtualStorage, cmd.relativePath, - ).Scan(&repositoryRowExists)) - require.False(t, repositoryRowExists) + assert.Contains(t, out.String(), "Repository removal completed.") + require.False(t, repositoryExists(t, repo)) }) t.Run("repository doesnt exist on one gitaly", func(t *testing.T) { @@ -176,23 +174,18 @@ func TestRemoveRepository_Exec(t *testing.T) { require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) assert.Contains(t, out.String(), "Repository found in the database.\n") - assert.Contains(t, out.String(), "Repository found on the following gitaly nodes: gitaly-1") assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath)) - assert.Contains(t, out.String(), fmt.Sprintf("Successfully removed %s from gitaly-1", repo.RelativePath)) - - var repositoryRowExists bool - require.NoError(t, db.QueryRow( - `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`, - cmd.virtualStorage, cmd.relativePath, - ).Scan(&repositoryRowExists)) - require.False(t, repositoryRowExists) + assert.Contains(t, out.String(), "Repository removal completed.") + require.False(t, repositoryExists(t, repo)) }) t.Run("no info about repository on praefect", func(t *testing.T) { var out bytes.Buffer repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) + replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo) + repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) - _, _, err := repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) + _, _, err = repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) require.NoError(t, err) cmd := &removeRepository{ @@ -203,14 +196,10 @@ func TestRemoveRepository_Exec(t *testing.T) { apply: true, w: &writer{w: &out}, } - require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - assert.Contains(t, out.String(), "Repository is not being tracked in Praefect.\n") - assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:") - assert.Contains(t, out.String(), "The database has no information about this repository.\n") - require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) - - requireNoDatabaseInfo(t, db, cmd) + require.Error(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), "repository is not being tracked in Praefect") + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) + require.False(t, repositoryExists(t, repo)) }) t.Run("one of gitalies is out of service", func(t *testing.T) { @@ -218,11 +207,12 @@ func TestRemoveRepository_Exec(t *testing.T) { repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) g2Srv.Shutdown() - logger := testhelper.NewDiscardingLogger(t) - loggerHook := test.NewLocal(logger) + replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) cmd := &removeRepository{ - logger: logrus.NewEntry(logger), + logger: logrus.NewEntry(testhelper.NewDiscardingLogger(t)), virtualStorage: praefectStorage, relativePath: repo.RelativePath, dialTimeout: 100 * time.Millisecond, @@ -231,43 +221,14 @@ func TestRemoveRepository_Exec(t *testing.T) { } require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + assert.Contains(t, out.String(), "Repository removal completed.") - var checkExistsOnNodeErrFound, removeRepoFromDiskErrFound bool - for _, entry := range loggerHook.AllEntries() { - if strings.Contains(entry.Message, `checking if repository exists on "gitaly-2"`) { - checkExistsOnNodeErrFound = true - } - - if strings.Contains(entry.Message, `repository removal failed for "gitaly-2"`) { - removeRepoFromDiskErrFound = true - } - } - require.True(t, checkExistsOnNodeErrFound) - require.True(t, removeRepoFromDiskErrFound) - - require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) - require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) - - requireNoDatabaseInfo(t, db, cmd) + require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) + require.False(t, repositoryExists(t, repo)) }) } -func requireNoDatabaseInfo(t *testing.T, db testdb.DB, cmd *removeRepository) { - t.Helper() - var repositoryRowExists bool - require.NoError(t, db.QueryRow( - `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`, - cmd.virtualStorage, cmd.relativePath, - ).Scan(&repositoryRowExists)) - require.False(t, repositoryRowExists) - var storageRowExists bool - require.NoError(t, db.QueryRow( - `SELECT EXISTS(SELECT FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2)`, - cmd.virtualStorage, cmd.relativePath, - ).Scan(&storageRowExists)) - require.False(t, storageRowExists) -} - func TestRemoveRepository_removeReplicationEvents(t *testing.T) { t.Parallel() const ( diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index 2e3c807bf..3e8750f42 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -3,7 +3,6 @@ package main import ( "bytes" "flag" - "io" "path/filepath" "testing" "time" @@ -170,15 +169,9 @@ func TestAddRepository_Exec(t *testing.T) { defer nodeMgr.Stop() repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - - rmRepoCmd := &removeRepository{ - logger: logger, - virtualStorage: virtualStorageName, - relativePath: tc.relativePath, - w: io.Discard, - apply: true, - } - require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath) + require.NoError(t, err) + require.False(t, exists) // create the repo on Gitaly without Praefect knowing require.NoError(t, createRepoThroughGitaly1(tc.relativePath)) @@ -207,7 +200,7 @@ func TestAddRepository_Exec(t *testing.T) { assert.Contains(t, assignments, g1Cfg.Storages[0].Name) assert.Contains(t, assignments, g2Cfg.Storages[0].Name) - exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath) + exists, err = repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath) require.NoError(t, err) assert.True(t, exists) assert.Contains(t, stdout.String(), tc.expectedOutput) diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go index 020ae8271..88c8f93d4 100644 --- a/internal/git/gittest/repo.go +++ b/internal/git/gittest/repo.go @@ -178,13 +178,30 @@ func CreateRepository(ctx context.Context, t testing.TB, cfg config.Cfg, configs return clonedRepo, filepath.Join(storage.Path, getReplicaPath(ctx, t, conn, repository)) } +// GetReplicaPathConfig allows for configuring the GetReplicaPath call. +type GetReplicaPathConfig struct { + // ClientConn is the connection used to create the repository. If unset, the config is used to + // dial the service. + ClientConn *grpc.ClientConn +} + // GetReplicaPath retrieves the repository's replica path if the test has been // run with Praefect in front of it. This is necessary if the test creates a repository // through Praefect and peeks into the filesystem afterwards. Conn should be pointing to // Praefect. -func GetReplicaPath(ctx context.Context, t testing.TB, cfg config.Cfg, repo repository.GitRepo) string { - conn := dialService(ctx, t, cfg) - defer conn.Close() +func GetReplicaPath(ctx context.Context, t testing.TB, cfg config.Cfg, repo repository.GitRepo, opts ...GetReplicaPathConfig) string { + require.Less(t, len(opts), 2, "you must either pass no or exactly one option") + + var opt GetReplicaPathConfig + if len(opts) > 0 { + opt = opts[0] + } + + conn := opt.ClientConn + if conn == nil { + conn = dialService(ctx, t, cfg) + defer conn.Close() + } return getReplicaPath(ctx, t, conn, repo) } diff --git a/internal/git/housekeeping/object_pool.go b/internal/git/housekeeping/object_pool.go index 1e4a935c8..4d0a27e4b 100644 --- a/internal/git/housekeeping/object_pool.go +++ b/internal/git/housekeeping/object_pool.go @@ -3,6 +3,8 @@ package housekeeping import ( "regexp" "strings" + + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" ) // railsPoolDirRegexp is used to validate object pool directory structure and name as generated by Rails. @@ -21,5 +23,5 @@ func IsRailsPoolPath(relativePath string) bool { // IsPoolPath returns whether the relative path indicates the repository is an object // pool. func IsPoolPath(relativePath string) bool { - return IsRailsPoolPath(relativePath) + return IsRailsPoolPath(relativePath) || praefectutil.IsPoolPath(relativePath) } diff --git a/internal/git/housekeeping/object_pool_test.go b/internal/git/housekeeping/object_pool_test.go index 6fdded51f..50c3201cd 100644 --- a/internal/git/housekeeping/object_pool_test.go +++ b/internal/git/housekeeping/object_pool_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" ) func TestIsPoolPath(t *testing.T) { @@ -19,6 +20,15 @@ func TestIsPoolPath(t *testing.T) { isPoolPath: true, }, { + desc: "praefect pool path", + relativePath: praefectutil.DerivePoolPath(1), + isPoolPath: true, + }, + { + desc: "praefect replica path", + relativePath: praefectutil.DeriveReplicaPath(1), + }, + { desc: "empty string", }, { diff --git a/internal/gitaly/service/repository/rename_test.go b/internal/gitaly/service/repository/rename_test.go index 9983eb894..f28657fb8 100644 --- a/internal/gitaly/service/repository/rename_test.go +++ b/internal/gitaly/service/repository/rename_test.go @@ -1,6 +1,7 @@ package repository import ( + "context" "fmt" "os" "path/filepath" @@ -11,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -19,8 +21,11 @@ import ( ) func TestRenameRepository_success(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositorySuccess) +} + +func testRenameRepositorySuccess(t *testing.T, ctx context.Context) { t.Parallel() - ctx := testhelper.Context(t) // Praefect does not move repositories on the disk so this test case is not run with Praefect. cfg, repo, _, client := setupRepositoryService(ctx, t, testserver.WithDisablePraefect()) @@ -43,8 +48,11 @@ func TestRenameRepository_success(t *testing.T) { } func TestRenameRepository_DestinationExists(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryDestinationExists) +} + +func testRenameRepositoryDestinationExists(t *testing.T, ctx context.Context) { t.Parallel() - ctx := testhelper.Context(t) cfg, client := setupRepositoryServiceWithoutRepo(t) @@ -70,11 +78,11 @@ func TestRenameRepository_DestinationExists(t *testing.T) { } func TestRenameRepository_invalidRequest(t *testing.T) { - // Prafect applies renames to metadata even on failed requests, which fails this test. - testhelper.SkipWithPraefect(t, "https://gitlab.com/gitlab-org/gitaly/-/issues/4003") + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryInvalidRequest) +} +func testRenameRepositoryInvalidRequest(t *testing.T, ctx context.Context) { t.Parallel() - ctx := testhelper.Context(t) _, repo, repoPath, client := setupRepositoryService(ctx, t) storagePath := strings.TrimSuffix(repoPath, "/"+repo.RelativePath) @@ -87,7 +95,7 @@ func TestRenameRepository_invalidRequest(t *testing.T) { { desc: "empty repository", req: &gitalypb.RenameRepositoryRequest{Repository: nil, RelativePath: "/tmp/abc"}, - exp: status.Error(codes.InvalidArgument, gitalyOrPraefect("empty Repository", "repo scoped: empty Repository")), + exp: status.Error(codes.InvalidArgument, "empty Repository"), }, { desc: "empty destination relative path", @@ -101,20 +109,20 @@ func TestRenameRepository_invalidRequest(t *testing.T) { }, { desc: "repository storage doesn't exist", - req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: "stub", RelativePath: repo.RelativePath}, RelativePath: "../usr/bin"}, - exp: status.Error(codes.InvalidArgument, gitalyOrPraefect(`GetStorageByName: no such storage: "stub"`, "repo scoped: invalid Repository")), + req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: "stub", RelativePath: repo.RelativePath}, RelativePath: "usr/bin"}, + exp: status.Error(codes.InvalidArgument, `GetStorageByName: no such storage: "stub"`), }, { desc: "repository relative path doesn't exist", - req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: repo.StorageName, RelativePath: "stub"}, RelativePath: "../usr/bin"}, - exp: status.Error(codes.NotFound, fmt.Sprintf(`GetRepoPath: not a git repository: "%s/stub"`, storagePath)), + req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: repo.StorageName, RelativePath: "stub"}, RelativePath: "non-existent/directory"}, + exp: status.Error(codes.NotFound, fmt.Sprintf(`GetRepoPath: not a git repository: "%s/stub"`, gitalyOrPraefect(storagePath, repo.GetStorageName()))), }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { _, err := client.RenameRepository(ctx, tc.req) - testhelper.RequireGrpcError(t, err, tc.exp) + testhelper.RequireGrpcError(t, tc.exp, err) }) } } diff --git a/internal/metadata/featureflag/ff_praefect_generated_paths.go b/internal/metadata/featureflag/ff_praefect_generated_paths.go new file mode 100644 index 000000000..4109bd636 --- /dev/null +++ b/internal/metadata/featureflag/ff_praefect_generated_paths.go @@ -0,0 +1,4 @@ +package featureflag + +// PraefectGeneratedReplicaPaths will enable Praefect generated replica paths for new repositories. +var PraefectGeneratedReplicaPaths = NewFeatureFlag("praefect_generated_replica_paths", false) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 77e83ec5a..3821db602 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -488,6 +488,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall route.RepositoryID, virtualStorage, targetRepo, + route.ReplicaPath, route.Primary.Storage, nil, append(routerNodesToStorages(route.Secondaries), route.ReplicationTargets...), @@ -835,7 +836,7 @@ func (c *Coordinator) createTransactionFinalizer( } return c.newRequestFinalizer( - ctx, route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage, + ctx, route.RepositoryID, virtualStorage, targetRepo, route.ReplicaPath, route.Primary.Storage, updated, outdated, change, params, cause)() } } @@ -992,6 +993,7 @@ func (c *Coordinator) newRequestFinalizer( repositoryID int64, virtualStorage string, targetRepo *gitalypb.Repository, + replicaPath string, primary string, updatedSecondaries []string, outdatedSecondaries []string, @@ -1048,7 +1050,7 @@ func (c *Coordinator) newRequestFinalizer( repositoryID, virtualStorage, targetRepo.GetRelativePath(), - targetRepo.GetRelativePath(), + replicaPath, primary, updatedSecondaries, outdatedSecondaries, diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index fce4d5e5b..e0927ae69 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -2673,6 +2673,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { 0, "virtual storage", &gitalypb.Repository{}, + "replica-path", "primary", []string{}, []string{"secondary"}, diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 780073155..35a8a6a68 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -118,6 +118,9 @@ type RepositoryStore interface { // as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository // which has no record in the virtual storage or the storage. RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error + // RenameRepositoryInPlace renames the repository in the database without changing the replica path. This will replace + // RenameRepository which can be removed in a later release. + RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) ConsistentStoragesGetter @@ -535,6 +538,39 @@ AND storage = $2 return nil } +// RenameRepositoryInPlace renames the repository in the database without changing the replica path. This will replace +// RenameRepository which can be removed in a later release. +func (rs *PostgresRepositoryStore) RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error { + result, err := rs.db.ExecContext(ctx, ` +WITH repository AS ( + UPDATE repositories + SET relative_path = $3 + WHERE virtual_storage = $1 + AND relative_path = $2 + RETURNING repository_id +) + +UPDATE storage_repositories +SET relative_path = $3 +WHERE repository_id = (SELECT repository_id FROM repository) + `, virtualStorage, relativePath, newRelativePath) + if err != nil { + if glsql.IsUniqueViolation(err, "repository_lookup_index") { + return commonerr.ErrRepositoryAlreadyExists + } + + return fmt.Errorf("query: %w", err) + } + + if rowsAffected, err := result.RowsAffected(); err != nil { + return fmt.Errorf("rows affected: %w", err) + } else if rowsAffected == 0 { + return commonerr.ErrRepositoryNotFound + } + + return nil +} + //nolint: revive,stylecheck // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { const q = ` diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 2a021661d..42acafb6e 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -14,6 +14,7 @@ type MockRepositoryStore struct { SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error + RenameRepositoryInPlaceFunc func(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) @@ -99,6 +100,11 @@ func (m MockRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int return m.DeleteReplicaFunc(ctx, repositoryID, storage) } +// RenameRepositoryInPlace runs the mock's RenameRepositoryInPlaceFunc. +func (m MockRepositoryStore) RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error { + return m.RenameRepositoryInPlaceFunc(ctx, virtualStorage, relativePath, newRelativePath) +} + //nolint: revive,stylecheck // This is unintentionally missing documentation. func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { if m.RenameRepositoryFunc == nil { diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 9c980376c..afdfcdb14 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -765,6 +765,50 @@ func TestRepositoryStore_Postgres(t *testing.T) { }) }) + t.Run("RenameRepositoryInPlace", func(t *testing.T) { + t.Run("rename non-existing", func(t *testing.T) { + rs := newRepositoryStore(t, nil) + + require.Equal(t, + commonerr.ErrRepositoryNotFound, + rs.RenameRepositoryInPlace(ctx, vs, repo, "new-relative-path"), + ) + }) + + t.Run("destination exists", func(t *testing.T) { + rs := newRepositoryStore(t, nil) + + require.NoError(t, rs.CreateRepository(ctx, 1, vs, "relative-path-1", "replica-path-1", "primary", nil, nil, true, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, vs, "relative-path-2", "replica-path-2", "primary", nil, nil, true, false)) + + require.Equal(t, + commonerr.ErrRepositoryAlreadyExists, + rs.RenameRepositoryInPlace(ctx, vs, "relative-path-1", "relative-path-2"), + ) + }) + + t.Run("successfully renamed", func(t *testing.T) { + rs := newRepositoryStore(t, nil) + + require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-relative-path", "original-replica-path", "primary", nil, nil, false, false)) + require.NoError(t, rs.RenameRepositoryInPlace(ctx, vs, "original-relative-path", "renamed-relative-path")) + requireState(t, ctx, db, + virtualStorageState{ + vs: { + "renamed-relative-path": {repositoryID: 1, replicaPath: "original-replica-path"}, + }, + }, + storageState{ + vs: { + "renamed-relative-path": { + "primary": {repositoryID: 1}, + }, + }, + }, + ) + }) + }) + t.Run("RenameRepository", func(t *testing.T) { t.Run("rename non-existing", func(t *testing.T) { rs := newRepositoryStore(t, nil) diff --git a/internal/praefect/praefectutil/replica_path.go b/internal/praefect/praefectutil/replica_path.go index 5f35e1e81..576762d22 100644 --- a/internal/praefect/praefectutil/replica_path.go +++ b/internal/praefect/praefectutil/replica_path.go @@ -3,18 +3,41 @@ package praefectutil import ( "crypto/sha256" "fmt" + "path/filepath" "strconv" + "strings" ) +// poolPathPrefix is the prefix directory where Praefect places object pools. +const poolPathPrefix = "@cluster/pools/" + +// IsPoolPath returns whether the relative path indicates this is a Praefect generated object pool path. +func IsPoolPath(relativePath string) bool { + return strings.HasPrefix(relativePath, poolPathPrefix) +} + // DeriveReplicaPath derives a repository's disk storage path from its repository ID. The repository ID // is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to -// ensure even distribution into subdirectories. The format is @repositories/ab/cd/<repository-id>. +// ensure even distribution into subdirectories. The format is @cluster/repositories/ab/cd/<repository-id>. func DeriveReplicaPath(repositoryID int64) string { + return deriveDiskPath("@cluster/repositories", repositoryID) +} + +// DerivePoolPath derives an object pools's disk storage path from its repository ID. The repository ID +// is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to +// ensure even distribution into subdirectories. The format is @cluster/pools/ab/cd/<repository-id>. The pools +// have a different directory prefix from other repositories so Gitaly can identify them in OptimizeRepository +// and avoid pruning them. +func DerivePoolPath(repositoryID int64) string { + return deriveDiskPath(poolPathPrefix, repositoryID) +} + +func deriveDiskPath(prefixDir string, repositoryID int64) string { hasher := sha256.New() // String representation of the ID is used to make it easier to derive the replica paths with // external tools. The error is ignored as the hash.Hash interface is documented to never return // an error. hasher.Write([]byte(strconv.FormatInt(repositoryID, 10))) hash := hasher.Sum(nil) - return fmt.Sprintf("@repositories/%x/%x/%d", hash[0:1], hash[1:2], repositoryID) + return filepath.Join(prefixDir, fmt.Sprintf("%x/%x/%d", hash[0:1], hash[1:2], repositoryID)) } diff --git a/internal/praefect/praefectutil/replica_path_test.go b/internal/praefect/praefectutil/replica_path_test.go index 6084c13ae..572dfc2ed 100644 --- a/internal/praefect/praefectutil/replica_path_test.go +++ b/internal/praefect/praefectutil/replica_path_test.go @@ -4,9 +4,41 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" ) func TestDeriveReplicaPath(t *testing.T) { - require.Equal(t, "@repositories/6b/86/1", DeriveReplicaPath(1)) - require.Equal(t, "@repositories/d4/73/2", DeriveReplicaPath(2)) + require.Equal(t, "@cluster/repositories/6b/86/1", DeriveReplicaPath(1)) + require.Equal(t, "@cluster/repositories/d4/73/2", DeriveReplicaPath(2)) +} + +func TestDerivePoolPath(t *testing.T) { + require.Equal(t, "@cluster/pools/6b/86/1", DerivePoolPath(1)) + require.Equal(t, "@cluster/pools/d4/73/2", DerivePoolPath(2)) +} + +func TestIsPoolPath(t *testing.T) { + for _, tc := range []struct { + desc string + relativePath string + isPoolPath bool + }{ + { + desc: "praefect pool path", + relativePath: DerivePoolPath(1), + isPoolPath: true, + }, + { + desc: "praefect replica path", + relativePath: DeriveReplicaPath(1), + }, + { + desc: "rails pool path", + relativePath: gittest.NewObjectPoolName(t), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.isPoolPath, IsPoolPath(tc.relativePath)) + }) + } } diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go new file mode 100644 index 000000000..495b18e4b --- /dev/null +++ b/internal/praefect/rename_repository.go @@ -0,0 +1,160 @@ +package praefect + +import ( + "errors" + "fmt" + "strings" + + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" +) + +type renamePeeker struct { + grpc.ServerStream + peeked *gitalypb.RenameRepositoryRequest +} + +func (peeker *renamePeeker) RecvMsg(msg interface{}) error { + // On the first read, we'll return the peeked first message of the stream. + if peeker.peeked != nil { + peeked := peeker.peeked + peeker.peeked = nil + + codec := proxy.NewCodec() + payload, err := codec.Marshal(peeked) + if err != nil { + return fmt.Errorf("marshaling peeked rename request: %w", err) + } + + return codec.Unmarshal(payload, msg) + } + + return peeker.ServerStream.RecvMsg(msg) +} + +func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error { + // These checks are not strictly necessary but they exist to keep retain compatibility with + // Gitaly's tested behavior. + if req.GetRepository() == nil { + return helper.ErrInvalidArgumentf("empty Repository") + } else if req.GetRelativePath() == "" { + return helper.ErrInvalidArgumentf("destination relative path is empty") + } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok { + return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName()) + } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil { + // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function + // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we + // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by + // seeing whether the relative path escapes the root directory. It's not possible to traverse up + // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work, + // we use the /fake-root directory simply to notice if there were traversals in the path. + return helper.ErrInvalidArgumentf("GetRepoPath: %s", err) + } + + return nil +} + +// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should +// be proxied to a Gitaly. Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the +// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either. +// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect +// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys. +// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled +// later. +// +// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires +// peeking into the internals of the proxying so we can set restore the frame correctly. +func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor { + virtualStorages := make(map[string]struct{}, len(virtualStorageNames)) + for _, virtualStorage := range virtualStorageNames { + virtualStorages[virtualStorage] = struct{}{} + } + + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" { + return handler(srv, stream) + } + + // Peek the message + var request gitalypb.RenameRepositoryRequest + if err := stream.RecvMsg(&request); err != nil { + return fmt.Errorf("peek rename repository request: %w", err) + } + + // In order for the handlers to work after the message is peeked, the stream is restored + // with an alternative implementation that returns the first message correctly. + stream = &renamePeeker{ServerStream: stream, peeked: &request} + + if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil { + return err + } + + repo := request.GetRepository() + repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath()) + if err != nil { + if errors.As(err, new(commonerr.RepositoryNotFoundError)) { + return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath()) + } + + return fmt.Errorf("get repository id: %w", err) + } + + replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID) + if err != nil { + return fmt.Errorf("get replica path: %w", err) + } + + // Repositories that do not have a Praefect generated replica path are always handled in the old manner. + // Once the feature flag is removed, all of the repositories will be handled in the atomic manner. + if !strings.HasPrefix(replicaPath, "@cluster") { + return handler(srv, stream) + } + + return handleRenameRepository(srv, stream) + } +} + +// RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming +// the repository in the lookup table stored in the database. +func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler { + virtualStorages := make(map[string]struct{}, len(virtualStoragesNames)) + for _, virtualStorage := range virtualStoragesNames { + virtualStorages[virtualStorage] = struct{}{} + } + + return func(srv interface{}, stream grpc.ServerStream) error { + var req gitalypb.RenameRepositoryRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + if err := validateRenameRepositoryRequest(&req, virtualStorages); err != nil { + return err + } + + if err := rs.RenameRepositoryInPlace(stream.Context(), + req.GetRepository().GetStorageName(), + req.GetRepository().GetRelativePath(), + req.GetRelativePath(), + ); err != nil { + if errors.Is(err, commonerr.ErrRepositoryNotFound) { + return helper.ErrNotFoundf( + `GetRepoPath: not a git repository: "%s/%s"`, + req.GetRepository().GetStorageName(), + req.GetRepository().GetRelativePath(), + ) + } else if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + return helper.ErrAlreadyExistsf("target repo exists already") + } + + return helper.ErrInternal(err) + } + + return stream.SendMsg(&gitalypb.RenameRepositoryResponse{}) + } +} diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 6f1b4e7bd..7067cb36e 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -5,8 +5,11 @@ import ( "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" "google.golang.org/grpc" ) @@ -307,11 +310,19 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err) } + replicaPath := relativePath + if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) { + replicaPath = praefectutil.DeriveReplicaPath(id) + if housekeeping.IsRailsPoolPath(relativePath) { + replicaPath = praefectutil.DerivePoolPath(id) + } + } + replicationFactor := r.defaultReplicationFactors[virtualStorage] if replicationFactor == 1 { return RepositoryMutatorRoute{ RepositoryID: id, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: primary, }, nil } @@ -360,7 +371,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{ RepositoryID: id, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, AdditionalReplicaPath: additionalReplicaPath, Primary: primary, Secondaries: secondaries, diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 44d8b10ce..6e620ffe2 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -8,9 +8,11 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "google.golang.org/grpc" @@ -662,6 +664,10 @@ func TestPerRepositoryRouter_RouteRepositoryMaintenance(t *testing.T) { } func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testPerRepositoryRouterRouteRepositoryCreation) +} + +func testPerRepositoryRouterRouteRepositoryCreation(t *testing.T, ctx context.Context) { t.Parallel() configuredNodes := map[string][]string{ @@ -692,6 +698,11 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { additionalReplicaPath = "additional-replica-path" ) + replicaPath := relativePath + if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) { + replicaPath = praefectutil.DeriveReplicaPath(1) + } + for _, tc := range []struct { desc string virtualStorage string @@ -726,7 +737,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, AdditionalReplicaPath: additionalReplicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, ReplicationTargets: []string{"secondary-1", "secondary-2"}, @@ -742,7 +753,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, @@ -760,7 +771,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, @@ -779,7 +790,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, }, ), @@ -795,13 +806,13 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, }, RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, }, @@ -818,7 +829,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, ReplicationTargets: []string{"secondary-2"}, @@ -848,14 +859,12 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - ctx := testhelper.Context(t) - db.TruncateAll(t) rs := datastore.NewPostgresRepositoryStore(db, nil) if tc.repositoryExists { require.NoError(t, - rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, relativePath, "primary", nil, nil, true, true), + rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, replicaPath, "primary", nil, nil, true, true), ) } diff --git a/internal/praefect/server.go b/internal/praefect/server.go index d75810246..d894bb3f3 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -111,6 +111,13 @@ func NewGRPCServer( panichandler.StreamPanicHandler, } + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + streamInterceptors = append( + streamInterceptors, + RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)), + ) + } + grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)), diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c3681a34c..a47e82529 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -7,8 +7,6 @@ import ( "io" "math/rand" "net" - "os" - "path/filepath" "sort" "strings" "sync" @@ -26,8 +24,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" @@ -568,43 +566,20 @@ func TestRemoveRepository(t *testing.T) { verifyReposExistence(t, codes.NotFound) } -func pollUntilRemoved(t testing.TB, path string, deadline <-chan time.Time) { - for { - select { - case <-deadline: - require.Failf(t, "unable to detect path removal for %s", path) - default: - _, err := os.Stat(path) - if os.IsNotExist(err) { - return - } - require.NoError(t, err, "unexpected error while checking path %s", path) - } - time.Sleep(time.Millisecond) - } -} - func TestRenameRepository(t *testing.T) { - t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository) +} +func testRenameRepository(t *testing.T, ctx context.Context) { gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"} - repoPaths := make([]string, len(gitalyStorages)) praefectCfg := config.Config{ VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}}, Failover: config.Failover{Enabled: true, ElectionStrategy: config.ElectionStrategyPerRepository}, } - var repo *gitalypb.Repository - for i, storageName := range gitalyStorages { - const relativePath = "test-repository" - + for _, storageName := range gitalyStorages { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages(storageName)) - gitalyCfg, repos := cfgBuilder.BuildWithRepoAt(t, relativePath) - if repo == nil { - repo = repos[0] - } - + gitalyCfg := cfgBuilder.Build(t) gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) praefectCfg.VirtualStorages[0].Nodes = append(praefectCfg.VirtualStorages[0].Nodes, &config.Node{ @@ -612,76 +587,95 @@ func TestRenameRepository(t *testing.T) { Address: gitalyAddr, Token: gitalyCfg.Auth.Token, }) - - repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - tx := testdb.New(t).Begin(t) - defer tx.Rollback(t) + db := testdb.New(t) + + rs := datastore.NewPostgresRepositoryStore(db, nil) - rs := datastore.NewPostgresRepositoryStore(tx, nil) - require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) + txManager := transactions.NewManager(praefectCfg) + logger := testhelper.NewDiscardingLogEntry(t) + clientHandshaker := backchannel.NewClientHandshaker( + logger, + NewBackchannelServerFactory( + logger, + transaction.NewServer(txManager), + nil, + ), + ) - nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) + nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil) require.NoError(t, err) defer nodeSet.Close() - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) - cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: evq, withRepoStore: rs, withRouter: NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(praefectCfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()), + datastore.NewAssignmentStore(db, praefectCfg.StorageNames()), rs, nil, ), + withTxMgr: txManager, }) - defer cleanup() + t.Cleanup(cleanup) - // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it - virtualRepo := proto.Clone(repo).(*gitalypb.Repository) - virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name + virtualRepo1, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{ + Storages: []gconfig.Storage{{Name: "praefect"}}, + }, gittest.CreateRepositoryConfig{ClientConn: cc}) + + virtualRepo2, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{ + Storages: []gconfig.Storage{{Name: "praefect"}}, + }, gittest.CreateRepositoryConfig{ClientConn: cc}) + + const newRelativePath = "unused-relative-path" repoServiceClient := gitalypb.NewRepositoryServiceClient(cc) - newName, err := text.RandomHex(20) - require.NoError(t, err) + _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: virtualRepo1.StorageName, + RelativePath: "not-found", + }, + RelativePath: virtualRepo2.RelativePath, + }) + testhelper.RequireGrpcError(t, helper.ErrNotFoundf(`GetRepoPath: not a git repository: "praefect/not-found"`), err) + + _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ + Repository: virtualRepo1, + RelativePath: virtualRepo2.RelativePath, + }) + + expectedErr := helper.ErrAlreadyExistsf("target repo exists already") + testhelper.RequireGrpcError(t, expectedErr, err) _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ - Repository: virtualRepo, - RelativePath: newName, + Repository: virtualRepo1, + RelativePath: newRelativePath, }) require.NoError(t, err) resp, err := repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ - Repository: virtualRepo, + Repository: virtualRepo1, }) require.NoError(t, err) require.False(t, resp.GetExists(), "repo with old name must gone") - // as we renamed the repo we need to update RelativePath before we could check if it exists - renamedVirtualRepo := virtualRepo - renamedVirtualRepo.RelativePath = newName - - // wait until replication jobs propagate changes to other storages - // as we don't know which one will be used to check because of reads distribution - require.NoError(t, evq.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { - return len(i.GetAcknowledge()) == 2 - })) - - for _, oldLocation := range repoPaths { - pollUntilRemoved(t, oldLocation, time.After(10*time.Second)) - newLocation := filepath.Join(filepath.Dir(oldLocation), newName) - require.DirExists(t, newLocation, "must be renamed on secondary from %q to %q", oldLocation, newLocation) - } + resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: &gitalypb.Repository{ + StorageName: virtualRepo1.StorageName, + RelativePath: newRelativePath, + }, + }) + require.NoError(t, err) + require.True(t, resp.GetExists(), "repo with new name must exist") } type mockSmartHTTP struct { diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index 674089166..33441ff07 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -546,11 +546,13 @@ func TestVerifier(t *testing.T) { for virtualStorage, relativePaths := range tc.replicas { for relativePath, storages := range relativePaths { // Create the expected repository. This creates all of the replicas transactionally. - gittest.CreateRepository(ctx, t, + repo, _ := gittest.CreateRepository(ctx, t, gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: virtualStorage}}}, gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: relativePath}, ) + replicaPath := gittest.GetReplicaPath(ctx, t, gitalyconfig.Cfg{}, repo, gittest.GetReplicaPathConfig{ClientConn: conn}) + // Now remove the replicas that were created in the transaction but the test case // expects not to exist. We remove them directly from the Gitalys so the metadata // records are left in place. @@ -591,7 +593,7 @@ func TestVerifier(t *testing.T) { &gitalypb.RemoveRepositoryRequest{ Repository: &gitalypb.Repository{ StorageName: storage, - RelativePath: relativePath, + RelativePath: replicaPath, }, }, ) diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 281dbc5b8..f3a917c70 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -180,6 +180,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { // Randomly inject the Git flag so that we have coverage of tests with both old and new Git // version by pure chance. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV2361Gl1, rnd.Int()%2 == 0) + // PraefectGeneratedReplicaPaths affects many tests as it changes the repository creation logic. + // Randomly enable the flag to exercise both paths to some extent. + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectGeneratedReplicaPaths, rnd.Int()%2 == 0) for _, opt := range opts { ctx = opt(ctx) |