Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2021-12-06 18:29:24 +0300
committerJohn Cai <jcai@gitlab.com>2021-12-06 18:29:24 +0300
commit9388cf42b3065b0e7b636bfe22883717eedb5623 (patch)
tree5932ba624b9caa532d650d29a2991ecdb8b73bca
parent755ee92ff0c47850a5fee259ea9476d6019173a8 (diff)
parentf93e0e478fa9ca4af5abed33300d7608e7a427f7 (diff)
Merge branch 'jc-remove-repository-dry-run' into 'master'
make remove-repository dry-run by default Closes #3893 See merge request gitlab-org/gitaly!4054
-rw-r--r--cmd/praefect/subcmd.go2
-rw-r--r--cmd/praefect/subcmd_remove_repository.go153
-rw-r--r--cmd/praefect/subcmd_remove_repository_test.go86
-rw-r--r--cmd/praefect/subcmd_track_repository.go4
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go3
5 files changed, 188 insertions, 60 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index af1502f31..f6b63ee2c 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -39,7 +39,7 @@ var subcommands = map[string]subcmd{
datalossCmdName: newDatalossSubcommand(),
acceptDatalossCmdName: &acceptDatalossSubcommand{},
setReplicationFactorCmdName: newSetReplicatioFactorSubcommand(os.Stdout),
- removeRepositoryCmdName: newRemoveRepository(logger),
+ removeRepositoryCmdName: newRemoveRepository(logger, os.Stdout),
trackRepositoryCmdName: newTrackRepository(logger),
listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
checkCmdName: newCheckSubcommand(
diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go
index 6641ab7fe..f6ee01358 100644
--- a/cmd/praefect/subcmd_remove_repository.go
+++ b/cmd/praefect/subcmd_remove_repository.go
@@ -6,6 +6,7 @@ import (
"errors"
"flag"
"fmt"
+ "io"
"strings"
"sync"
"time"
@@ -13,6 +14,7 @@ import (
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
@@ -22,28 +24,47 @@ import (
const (
removeRepositoryCmdName = "remove-repository"
+ paramApply = "apply"
)
+type writer struct {
+ m sync.Mutex
+ w io.Writer
+}
+
+func (w *writer) Write(b []byte) (int, error) {
+ w.m.Lock()
+ defer w.m.Unlock()
+ return w.w.Write(b)
+}
+
type removeRepository struct {
logger logrus.FieldLogger
virtualStorage string
relativePath string
+ apply bool
dialTimeout time.Duration
+ w io.Writer
}
-func newRemoveRepository(logger logrus.FieldLogger) *removeRepository {
- return &removeRepository{logger: logger, dialTimeout: defaultDialTimeout}
+func newRemoveRepository(logger logrus.FieldLogger, w io.Writer) *removeRepository {
+ return &removeRepository{logger: logger, w: &writer{w: w}, dialTimeout: defaultDialTimeout}
}
func (cmd *removeRepository) FlagSet() *flag.FlagSet {
fs := flag.NewFlagSet(removeRepositoryCmdName, flag.ExitOnError)
fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+ fs.BoolVar(&cmd.apply, paramApply, false, "physically remove the repository from disk and the database")
fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
fs.Usage = func() {
printfErr("Description:\n" +
" This command removes all state associated with a given repository from the Gitaly Cluster.\n" +
" This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" +
- " database state as tracked by Praefect.\n")
+ " database state as tracked by Praefect.\n" +
+ " Runs in dry-run mode by default and lists the gitaly nodes from which the repository will be removed from " +
+ " without actually removing it from the database and disk.\n" +
+ " When -apply is used, the repository will be removed from the database as well as\n" +
+ " the individual gitaly nodes on which they exist.\n")
fs.PrintDefaults()
printfErr("NOTE:\n" +
" It may happen that parts of the repository continue to exist after this command, either because\n" +
@@ -86,37 +107,108 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger
"relative_path": cmd.relativePath,
}).Debug("remove repository")
- addr, err := getNodeAddress(cfg)
+ rs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
+ exists, err := rs.RepositoryExists(ctx, cmd.virtualStorage, cmd.relativePath)
+ if err != nil {
+ fmt.Fprintf(cmd.w, "error getting storages from database: %v. Please make sure the database"+
+ " parameters are set correctly and the database is accessible.\n", err)
+ } else {
+ if exists {
+ fmt.Fprintln(cmd.w, "Repository found in the database.")
+ } else {
+ fmt.Fprintln(cmd.w, "Repository is not being tracked in Praefect.")
+ }
+ }
+
+ storagesOnDisk, err := cmd.getStoragesFromNodes(ctx, cfg)
if err != nil {
- return fmt.Errorf("get praefect address from config: %w", err)
+ return err
}
- logger.Debugf("remove repository info from praefect database %q", addr)
+ if len(storagesOnDisk) == 0 {
+ fmt.Fprintln(cmd.w, "Repository not found on any gitaly nodes.")
+ } else {
+ fmt.Fprintf(cmd.w, "Repository found on the following gitaly nodes: %s.\n", strings.Join(storagesOnDisk, ", "))
+ }
+
+ if !cmd.apply {
+ fmt.Fprintln(cmd.w, "Re-run the command with -apply to remove repositories from the database and disk.")
+ return nil
+ }
+
+ fmt.Fprintf(cmd.w, "Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", cmd.relativePath)
+
+ fmt.Fprintln(cmd.w, "Removing repository from the database...")
removed, err := cmd.removeRepositoryFromDatabase(ctx, db)
if err != nil {
return fmt.Errorf("remove repository info from praefect database: %w", err)
}
+
if !removed {
- logger.Warn("praefect database has no info about the repository")
+ fmt.Fprintln(cmd.w, "The database has no information about this repository.")
+ } else {
+ fmt.Fprintln(cmd.w, "Removed repository metadata from the database.")
}
- logger.Debug("removal of the repository info from praefect database completed")
- logger.Debug("remove replication events")
+ fmt.Fprintln(cmd.w, "Removing replication events...")
ticker := helper.NewTimerTicker(time.Second)
defer ticker.Stop()
if err := cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil {
return fmt.Errorf("remove scheduled replication events: %w", err)
}
- logger.Debug("replication events removal completed")
+ fmt.Fprintln(cmd.w, "Replication event removal completed.")
// We should try to remove repository from each of gitaly nodes.
- logger.Debug("remove repository directly by each gitaly node")
+ fmt.Fprintln(cmd.w, "Removing repository directly from gitaly nodes...")
cmd.removeRepositoryForEachGitaly(ctx, cfg, logger)
- logger.Debug("direct repository removal by each gitaly node completed")
+ fmt.Fprintln(cmd.w, "Finished removing repository from gitaly nodes.")
return nil
}
+// getStoragesFromNodes looks on disk to finds the storages this repository exists for
+func (cmd *removeRepository) getStoragesFromNodes(ctx context.Context, cfg config.Config) ([]string, error) {
+ var storages []string
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name != cmd.virtualStorage {
+ continue
+ }
+
+ storages = make([]string, len(vs.Nodes))
+ var wg sync.WaitGroup
+
+ for i := 0; i < len(vs.Nodes); i++ {
+ wg.Add(1)
+ go func(node *config.Node, i int) {
+ defer wg.Done()
+ repo := &gitalypb.Repository{
+ StorageName: node.Storage,
+ RelativePath: cmd.relativePath,
+ }
+ exists, err := repositoryExists(ctx, repo, node.Address, node.Token)
+ if err != nil {
+ cmd.logger.WithError(err).Errorf("checking if repository exists on %q", node.Storage)
+ }
+ if exists {
+ storages[i] = node.Storage
+ }
+ }(vs.Nodes[i], i)
+ }
+
+ wg.Wait()
+ break
+ }
+
+ var storagesFound []string
+ for _, storage := range storages {
+ if storage != "" {
+ storagesFound = append(storagesFound, storage)
+ }
+ }
+
+ return storagesFound, nil
+}
+
func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) {
var removed bool
if err := db.QueryRowContext(
@@ -204,25 +296,38 @@ func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger
return nil
}
+func (cmd *removeRepository) removeNode(
+ ctx context.Context,
+ logger logrus.FieldLogger,
+ node *config.Node,
+) {
+ logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address)
+ repo := &gitalypb.Repository{
+ StorageName: node.Storage,
+ RelativePath: cmd.relativePath,
+ }
+ removed, err := cmd.removeRepository(ctx, repo, node.Address, node.Token)
+ if err != nil {
+ logger.WithError(err).Warnf("repository removal failed for %q", node.Storage)
+ } else {
+ if removed {
+ fmt.Fprintf(cmd.w, "Successfully removed %s from %s\n", cmd.relativePath, node.Storage)
+ }
+ fmt.Fprintf(cmd.w, "Did not remove %s from %s\n", cmd.relativePath, node.Storage)
+ }
+ logger.Debugf("repository removal call to gitaly %q completed", node.Storage)
+}
+
func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) {
for _, vs := range cfg.VirtualStorages {
if vs.Name == cmd.virtualStorage {
var wg sync.WaitGroup
for i := 0; i < len(vs.Nodes); i++ {
wg.Add(1)
- go func(node *config.Node) {
+ go func(i int) {
defer wg.Done()
- logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address)
- repo := &gitalypb.Repository{
- StorageName: node.Storage,
- RelativePath: cmd.relativePath,
- }
- _, err := cmd.removeRepository(ctx, repo, node.Address, node.Token)
- if err != nil {
- logger.WithError(err).Warnf("repository removal failed for gitaly %q", node.Storage)
- }
- logger.Debugf("repository removal call to gitaly %q completed", node.Storage)
- }(vs.Nodes[i])
+ cmd.removeNode(ctx, logger, vs.Nodes[i])
+ }(i)
}
wg.Wait()
break
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
index 45ca8d774..bf2ece389 100644
--- a/cmd/praefect/subcmd_remove_repository_test.go
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -1,7 +1,9 @@
package main
import (
+ "bytes"
"flag"
+ "fmt"
"path/filepath"
"strings"
"testing"
@@ -63,14 +65,6 @@ func TestRemoveRepository_Exec_invalidArgs(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "connect to database: send ping: dial tcp: lookup stub")
})
-
- t.Run("praefect address is not set in config ", func(t *testing.T) {
- cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewDiscardingLogger(t)}
- db := glsql.NewDB(t)
- dbConf := glsql.GetDBConfig(t, db.Name)
- err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf})
- require.EqualError(t, err, "get praefect address from config: no Praefect address configured")
- })
}
func TestRemoveRepository_Exec(t *testing.T) {
@@ -121,18 +115,49 @@ func TestRemoveRepository_Exec(t *testing.T) {
praefectStorage := conf.VirtualStorages[0].Name
+ t.Run("dry run", func(t *testing.T) {
+ var out bytes.Buffer
+ repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
+ cmd := &removeRepository{
+ logger: testhelper.NewDiscardingLogger(t),
+ virtualStorage: repo.StorageName,
+ relativePath: repo.RelativePath,
+ dialTimeout: time.Second,
+ apply: false,
+ w: &writer{w: &out},
+ }
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+ assert.Contains(t, out.String(), "Repository found in the database.\n")
+ assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:")
+ assert.Contains(t, out.String(), "Re-run the command with -apply to remove repositories from the database and disk.\n")
+
+ repositoryRowExists, err := datastore.NewPostgresRepositoryStore(db, nil).RepositoryExists(ctx, cmd.virtualStorage, cmd.relativePath)
+ require.NoError(t, err)
+ require.True(t, repositoryRowExists)
+ })
+
t.Run("ok", func(t *testing.T) {
+ var out bytes.Buffer
repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
cmd := &removeRepository{
logger: testhelper.NewDiscardingLogger(t),
virtualStorage: repo.StorageName,
relativePath: repo.RelativePath,
dialTimeout: time.Second,
+ apply: true,
+ w: &writer{w: &out},
}
require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+ assert.Contains(t, out.String(), "Repository found in the database.\n")
+ assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:")
+ assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath))
+ assert.Contains(t, out.String(), fmt.Sprintf("Successfully removed %s from", repo.RelativePath))
var repositoryRowExists bool
require.NoError(t, db.QueryRow(
@@ -143,28 +168,24 @@ func TestRemoveRepository_Exec(t *testing.T) {
})
t.Run("no info about repository on praefect", func(t *testing.T) {
+ var out bytes.Buffer
repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
_, _, err := repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath)
require.NoError(t, err)
- logger := testhelper.NewDiscardingLogger(t)
- loggerHook := test.NewLocal(logger)
cmd := &removeRepository{
- logger: logrus.NewEntry(logger),
+ logger: testhelper.NewDiscardingLogger(t),
virtualStorage: praefectStorage,
relativePath: repo.RelativePath,
dialTimeout: time.Second,
+ apply: true,
+ w: &writer{w: &out},
}
require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
- var found bool
- for _, entry := range loggerHook.AllEntries() {
- if strings.Contains(entry.Message, "praefect database has no info about the repository") {
- found = true
- }
- }
- require.True(t, found, "no expected message in the log")
-
+ assert.Contains(t, out.String(), "Repository is not being tracked in Praefect.\n")
+ assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:")
+ assert.Contains(t, out.String(), "The database has no information about this repository.\n")
require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
@@ -172,6 +193,7 @@ func TestRemoveRepository_Exec(t *testing.T) {
})
t.Run("one of gitalies is out of service", func(t *testing.T) {
+ var out bytes.Buffer
repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name())
g2Srv.Shutdown()
@@ -182,26 +204,24 @@ func TestRemoveRepository_Exec(t *testing.T) {
virtualStorage: praefectStorage,
relativePath: repo.RelativePath,
dialTimeout: 100 * time.Millisecond,
+ apply: true,
+ w: &writer{w: &out},
}
- for {
- err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)
- if err == nil {
- break
- }
- regexp := "(transport: Error while dialing dial unix /" + strings.TrimPrefix(g2Srv.Address(), "unix:/") + ")|(primary gitaly is not healthy)"
- require.Regexp(t, regexp, err.Error())
- time.Sleep(time.Second)
- }
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
- var found bool
+ var checkExistsOnNodeErrFound, removeRepoFromDiskErrFound bool
for _, entry := range loggerHook.AllEntries() {
- if strings.Contains(entry.Message, `repository removal failed for gitaly "gitaly-2"`) {
- found = true
- break
+ if strings.Contains(entry.Message, `checking if repository exists on "gitaly-2"`) {
+ checkExistsOnNodeErrFound = true
+ }
+
+ if strings.Contains(entry.Message, `repository removal failed for "gitaly-2"`) {
+ removeRepoFromDiskErrFound = true
}
}
- require.True(t, found, "no expected message in the log")
+ require.True(t, checkExistsOnNodeErrFound)
+ require.True(t, removeRepoFromDiskErrFound)
require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
diff --git a/cmd/praefect/subcmd_track_repository.go b/cmd/praefect/subcmd_track_repository.go
index 1dda0f52f..810df39a8 100644
--- a/cmd/praefect/subcmd_track_repository.go
+++ b/cmd/praefect/subcmd_track_repository.go
@@ -189,7 +189,7 @@ func (cmd *trackRepository) trackRepository(
return nil
}
-func (cmd *trackRepository) repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
+func repositoryExists(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
conn, err := subCmdDial(ctx, addr, token, defaultDialTimeout)
if err != nil {
return false, fmt.Errorf("error dialing: %w", err)
@@ -219,7 +219,7 @@ func (cmd *trackRepository) authoritativeRepositoryExists(ctx context.Context, c
StorageName: node.Storage,
RelativePath: cmd.relativePath,
}
- exists, err := cmd.repositoryExists(ctx, repo, node.Address, node.Token)
+ exists, err := repositoryExists(ctx, repo, node.Address, node.Token)
if err != nil {
logger.WithError(err).Warnf("checking if repository exists %q, %q", node.Storage, cmd.relativePath)
return false, nil
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index 5f410d5f3..0e2667af3 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
+ "io"
"path/filepath"
"testing"
"time"
@@ -164,6 +165,8 @@ func TestAddRepository_Exec(t *testing.T) {
logger: logger,
virtualStorage: virtualStorageName,
relativePath: relativePath,
+ w: io.Discard,
+ apply: true,
}
require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))