diff options
author | John Cai <jcai@gitlab.com> | 2021-12-06 18:29:24 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2021-12-06 18:29:24 +0300 |
commit | 9388cf42b3065b0e7b636bfe22883717eedb5623 (patch) | |
tree | 5932ba624b9caa532d650d29a2991ecdb8b73bca | |
parent | 755ee92ff0c47850a5fee259ea9476d6019173a8 (diff) | |
parent | f93e0e478fa9ca4af5abed33300d7608e7a427f7 (diff) |
Merge branch 'jc-remove-repository-dry-run' into 'master'
make remove-repository dry-run by default
Closes #3893
See merge request gitlab-org/gitaly!4054
-rw-r--r-- | cmd/praefect/subcmd.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_remove_repository.go | 153 | ||||
-rw-r--r-- | cmd/praefect/subcmd_remove_repository_test.go | 86 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository.go | 4 | ||||
-rw-r--r-- | cmd/praefect/subcmd_track_repository_test.go | 3 |
5 files changed, 188 insertions, 60 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index af1502f31..f6b63ee2c 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -39,7 +39,7 @@ var subcommands = map[string]subcmd{ datalossCmdName: newDatalossSubcommand(), acceptDatalossCmdName: &acceptDatalossSubcommand{}, setReplicationFactorCmdName: newSetReplicatioFactorSubcommand(os.Stdout), - removeRepositoryCmdName: newRemoveRepository(logger), + removeRepositoryCmdName: newRemoveRepository(logger, os.Stdout), trackRepositoryCmdName: newTrackRepository(logger), listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout), checkCmdName: newCheckSubcommand( diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go index 6641ab7fe..f6ee01358 100644 --- a/cmd/praefect/subcmd_remove_repository.go +++ b/cmd/praefect/subcmd_remove_repository.go @@ -6,6 +6,7 @@ import ( "errors" "flag" "fmt" + "io" "strings" "sync" "time" @@ -13,6 +14,7 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" @@ -22,28 +24,47 @@ import ( const ( removeRepositoryCmdName = "remove-repository" + paramApply = "apply" ) +type writer struct { + m sync.Mutex + w io.Writer +} + +func (w *writer) Write(b []byte) (int, error) { + w.m.Lock() + defer w.m.Unlock() + return w.w.Write(b) +} + type removeRepository struct { logger logrus.FieldLogger virtualStorage string relativePath string + apply bool dialTimeout time.Duration + w io.Writer } -func newRemoveRepository(logger logrus.FieldLogger) *removeRepository { - return &removeRepository{logger: logger, dialTimeout: defaultDialTimeout} +func newRemoveRepository(logger logrus.FieldLogger, w io.Writer) *removeRepository { + return &removeRepository{logger: logger, w: &writer{w: w}, dialTimeout: defaultDialTimeout} } func (cmd *removeRepository) FlagSet() *flag.FlagSet { fs := flag.NewFlagSet(removeRepositoryCmdName, flag.ExitOnError) fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage") + fs.BoolVar(&cmd.apply, paramApply, false, "physically remove the repository from disk and the database") fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository") fs.Usage = func() { printfErr("Description:\n" + " This command removes all state associated with a given repository from the Gitaly Cluster.\n" + " This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" + - " database state as tracked by Praefect.\n") + " database state as tracked by Praefect.\n" + + " Runs in dry-run mode by default and lists the gitaly nodes from which the repository will be removed from " + + " without actually removing it from the database and disk.\n" + + " When -apply is used, the repository will be removed from the database as well as\n" + + " the individual gitaly nodes on which they exist.\n") fs.PrintDefaults() printfErr("NOTE:\n" + " It may happen that parts of the repository continue to exist after this command, either because\n" + @@ -86,37 +107,108 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger "relative_path": cmd.relativePath, }).Debug("remove repository") - addr, err := getNodeAddress(cfg) + rs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + exists, err := rs.RepositoryExists(ctx, cmd.virtualStorage, cmd.relativePath) + if err != nil { + fmt.Fprintf(cmd.w, "error getting storages from database: %v. Please make sure the database"+ + " parameters are set correctly and the database is accessible.\n", err) + } else { + if exists { + fmt.Fprintln(cmd.w, "Repository found in the database.") + } else { + fmt.Fprintln(cmd.w, "Repository is not being tracked in Praefect.") + } + } + + storagesOnDisk, err := cmd.getStoragesFromNodes(ctx, cfg) if err != nil { - return fmt.Errorf("get praefect address from config: %w", err) + return err } - logger.Debugf("remove repository info from praefect database %q", addr) + if len(storagesOnDisk) == 0 { + fmt.Fprintln(cmd.w, "Repository not found on any gitaly nodes.") + } else { + fmt.Fprintf(cmd.w, "Repository found on the following gitaly nodes: %s.\n", strings.Join(storagesOnDisk, ", ")) + } + + if !cmd.apply { + fmt.Fprintln(cmd.w, "Re-run the command with -apply to remove repositories from the database and disk.") + return nil + } + + fmt.Fprintf(cmd.w, "Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", cmd.relativePath) + + fmt.Fprintln(cmd.w, "Removing repository from the database...") removed, err := cmd.removeRepositoryFromDatabase(ctx, db) if err != nil { return fmt.Errorf("remove repository info from praefect database: %w", err) } + if !removed { - logger.Warn("praefect database has no info about the repository") + fmt.Fprintln(cmd.w, "The database has no information about this repository.") + } else { + fmt.Fprintln(cmd.w, "Removed repository metadata from the database.") } - logger.Debug("removal of the repository info from praefect database completed") - logger.Debug("remove replication events") + fmt.Fprintln(cmd.w, "Removing replication events...") ticker := helper.NewTimerTicker(time.Second) defer ticker.Stop() if err := cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil { return fmt.Errorf("remove scheduled replication events: %w", err) } - logger.Debug("replication events removal completed") + fmt.Fprintln(cmd.w, "Replication event removal completed.") // We should try to remove repository from each of gitaly nodes. - logger.Debug("remove repository directly by each gitaly node") + fmt.Fprintln(cmd.w, "Removing repository directly from gitaly nodes...") cmd.removeRepositoryForEachGitaly(ctx, cfg, logger) - logger.Debug("direct repository removal by each gitaly node completed") + fmt.Fprintln(cmd.w, "Finished removing repository from gitaly nodes.") return nil } +// getStoragesFromNodes looks on disk to finds the storages this repository exists for +func (cmd *removeRepository) getStoragesFromNodes(ctx context.Context, cfg config.Config) ([]string, error) { + var storages []string + for _, vs := range cfg.VirtualStorages { + if vs.Name != cmd.virtualStorage { + continue + } + + storages = make([]string, len(vs.Nodes)) + var wg sync.WaitGroup + + for i := 0; i < len(vs.Nodes); i++ { + wg.Add(1) + go func(node *config.Node, i int) { + defer wg.Done() + repo := &gitalypb.Repository{ + StorageName: node.Storage, + RelativePath: cmd.relativePath, + } + exists, err := repositoryExists(ctx, repo, node.Address, node.Token) + if err != nil { + cmd.logger.WithError(err).Errorf("checking if repository exists on %q", node.Storage) + } + if exists { + storages[i] = node.Storage + } + }(vs.Nodes[i], i) + } + + wg.Wait() + break + } + + var storagesFound []string + for _, storage := range storages { + if storage != "" { + storagesFound = append(storagesFound, storage) + } + } + + return storagesFound, nil +} + func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) { var removed bool if err := db.QueryRowContext( @@ -204,25 +296,38 @@ func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger return nil } +func (cmd *removeRepository) removeNode( + ctx context.Context, + logger logrus.FieldLogger, + node *config.Node, +) { + logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address) + repo := &gitalypb.Repository{ + StorageName: node.Storage, + RelativePath: cmd.relativePath, + } + removed, err := cmd.removeRepository(ctx, repo, node.Address, node.Token) + if err != nil { + logger.WithError(err).Warnf("repository removal failed for %q", node.Storage) + } else { + if removed { + fmt.Fprintf(cmd.w, "Successfully removed %s from %s\n", cmd.relativePath, node.Storage) + } + fmt.Fprintf(cmd.w, "Did not remove %s from %s\n", cmd.relativePath, node.Storage) + } + logger.Debugf("repository removal call to gitaly %q completed", node.Storage) +} + func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) { for _, vs := range cfg.VirtualStorages { if vs.Name == cmd.virtualStorage { var wg sync.WaitGroup for i := 0; i < len(vs.Nodes); i++ { wg.Add(1) - go func(node *config.Node) { + go func(i int) { defer wg.Done() - logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address) - repo := &gitalypb.Repository{ - StorageName: node.Storage, - RelativePath: cmd.relativePath, - } - _, err := cmd.removeRepository(ctx, repo, node.Address, node.Token) - if err != nil { - logger.WithError(err).Warnf("repository removal failed for gitaly %q", node.Storage) - } - logger.Debugf("repository removal call to gitaly %q completed", node.Storage) - }(vs.Nodes[i]) + cmd.removeNode(ctx, logger, vs.Nodes[i]) + }(i) } wg.Wait() break diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index 45ca8d774..bf2ece389 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -1,7 +1,9 @@ package main import ( + "bytes" "flag" + "fmt" "path/filepath" "strings" "testing" @@ -63,14 +65,6 @@ func TestRemoveRepository_Exec_invalidArgs(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "connect to database: send ping: dial tcp: lookup stub") }) - - t.Run("praefect address is not set in config ", func(t *testing.T) { - cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewDiscardingLogger(t)} - db := glsql.NewDB(t) - dbConf := glsql.GetDBConfig(t, db.Name) - err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf}) - require.EqualError(t, err, "get praefect address from config: no Praefect address configured") - }) } func TestRemoveRepository_Exec(t *testing.T) { @@ -121,18 +115,49 @@ func TestRemoveRepository_Exec(t *testing.T) { praefectStorage := conf.VirtualStorages[0].Name + t.Run("dry run", func(t *testing.T) { + var out bytes.Buffer + repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) + cmd := &removeRepository{ + logger: testhelper.NewDiscardingLogger(t), + virtualStorage: repo.StorageName, + relativePath: repo.RelativePath, + dialTimeout: time.Second, + apply: false, + w: &writer{w: &out}, + } + require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + assert.Contains(t, out.String(), "Repository found in the database.\n") + assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:") + assert.Contains(t, out.String(), "Re-run the command with -apply to remove repositories from the database and disk.\n") + + repositoryRowExists, err := datastore.NewPostgresRepositoryStore(db, nil).RepositoryExists(ctx, cmd.virtualStorage, cmd.relativePath) + require.NoError(t, err) + require.True(t, repositoryRowExists) + }) + t.Run("ok", func(t *testing.T) { + var out bytes.Buffer repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) cmd := &removeRepository{ logger: testhelper.NewDiscardingLogger(t), virtualStorage: repo.StorageName, relativePath: repo.RelativePath, dialTimeout: time.Second, + apply: true, + w: &writer{w: &out}, } require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + assert.Contains(t, out.String(), "Repository found in the database.\n") + assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:") + assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath)) + assert.Contains(t, out.String(), fmt.Sprintf("Successfully removed %s from", repo.RelativePath)) var repositoryRowExists bool require.NoError(t, db.QueryRow( @@ -143,28 +168,24 @@ func TestRemoveRepository_Exec(t *testing.T) { }) t.Run("no info about repository on praefect", func(t *testing.T) { + var out bytes.Buffer repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) _, _, err := repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) require.NoError(t, err) - logger := testhelper.NewDiscardingLogger(t) - loggerHook := test.NewLocal(logger) cmd := &removeRepository{ - logger: logrus.NewEntry(logger), + logger: testhelper.NewDiscardingLogger(t), virtualStorage: praefectStorage, relativePath: repo.RelativePath, dialTimeout: time.Second, + apply: true, + w: &writer{w: &out}, } require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - var found bool - for _, entry := range loggerHook.AllEntries() { - if strings.Contains(entry.Message, "praefect database has no info about the repository") { - found = true - } - } - require.True(t, found, "no expected message in the log") - + assert.Contains(t, out.String(), "Repository is not being tracked in Praefect.\n") + assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:") + assert.Contains(t, out.String(), "The database has no information about this repository.\n") require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) @@ -172,6 +193,7 @@ func TestRemoveRepository_Exec(t *testing.T) { }) t.Run("one of gitalies is out of service", func(t *testing.T) { + var out bytes.Buffer repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) g2Srv.Shutdown() @@ -182,26 +204,24 @@ func TestRemoveRepository_Exec(t *testing.T) { virtualStorage: praefectStorage, relativePath: repo.RelativePath, dialTimeout: 100 * time.Millisecond, + apply: true, + w: &writer{w: &out}, } - for { - err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf) - if err == nil { - break - } - regexp := "(transport: Error while dialing dial unix /" + strings.TrimPrefix(g2Srv.Address(), "unix:/") + ")|(primary gitaly is not healthy)" - require.Regexp(t, regexp, err.Error()) - time.Sleep(time.Second) - } + require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - var found bool + var checkExistsOnNodeErrFound, removeRepoFromDiskErrFound bool for _, entry := range loggerHook.AllEntries() { - if strings.Contains(entry.Message, `repository removal failed for gitaly "gitaly-2"`) { - found = true - break + if strings.Contains(entry.Message, `checking if repository exists on "gitaly-2"`) { + checkExistsOnNodeErrFound = true + } + + if strings.Contains(entry.Message, `repository removal failed for "gitaly-2"`) { + removeRepoFromDiskErrFound = true } } - require.True(t, found, "no expected message in the log") + require.True(t, checkExistsOnNodeErrFound) + require.True(t, removeRepoFromDiskErrFound) require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go index 1dda0f52f..810df39a8 100644 --- a/cmd/praefect/subcmd_track_repository.go +++ b/cmd/praefect/subcmd_track_repository.go @@ -189,7 +189,7 @@ func (cmd *trackRepository) trackRepository( return nil } -func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { +func repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { conn, err := subCmdDial(ctx, addr, token, defaultDialTimeout) if err != nil { return false, fmt.Errorf("error dialing: %w", err) @@ -219,7 +219,7 @@ func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, c StorageName: node.Storage, RelativePath: cmd.relativePath, } - exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token) + exists, err := repositoryExists(ctx, repo, node.Address, node.Token) if err != nil { logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath) return false, nil diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index 5f410d5f3..0e2667af3 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "io" "path/filepath" "testing" "time" @@ -164,6 +165,8 @@ func TestAddRepository_Exec(t *testing.T) { logger: logger, virtualStorage: virtualStorageName, relativePath: relativePath, + w: io.Discard, + apply: true, } require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) |