gitlab.com/gitlab-org/gitaly.git
author     Pavlo Strokov <pstrokov@gitlab.com>    2021-08-17 16:34:47 +0300
committer  Pavlo Strokov <pstrokov@gitlab.com>    2021-09-14 12:23:27 +0300
commit     ee6b638882f9aae601a2c37821e0e82e9d6e333f (patch)
tree       a6e175efc2e7df87d3a39489e6aaa61ccfaa740c
parent     229ba800d77e9a2baed1ee8fcd937ac0b094c3e0 (diff)
remove-repository: A new Praefect sub-command to remove a repository
Users of Praefect are missing tools to manage the state of the cluster. One such tool is a repository removal CLI, which can be used to remove any repository from the cluster. The removal covers both cleanup of the Praefect database and removal from the Gitaly storages. The command tries to remove as much as possible: it first removes the repository's records from the Praefect database and the replication event queue, and then removes the repository from each Gitaly node configured in the provided config file.

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3771

Changelog: added
-rw-r--r--  cmd/praefect/subcmd.go                            3
-rw-r--r--  cmd/praefect/subcmd_remove_repository.go        221
-rw-r--r--  cmd/praefect/subcmd_remove_repository_test.go   347
3 files changed, 571 insertions(+), 0 deletions(-)
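A minimal invocation sketch, assuming the usual praefect -config flag and an illustrative config path; the -virtual-storage and -repository flag names are the ones defined by the new sub-command's FlagSet below:

    praefect -config /path/to/praefect.config.toml remove-repository -virtual-storage praefect -repository path/to/test/repo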
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 855c73c15..ca11658fe 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -32,6 +32,7 @@ var subcommands = map[string]subcmd{
"dataloss": newDatalossSubcommand(),
"accept-dataloss": &acceptDatalossSubcommand{},
"set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
+ removeRepositoryCmdName: newRemoveRepository(logger),
}
// subCommand returns an exit code, to be fed into os.Exit.
@@ -71,6 +72,8 @@ func getNodeAddress(cfg config.Config) (string, error) {
return "unix:" + cfg.SocketPath, nil
case cfg.ListenAddr != "":
return "tcp://" + cfg.ListenAddr, nil
+ case cfg.TLSListenAddr != "":
+ return "tls://" + cfg.TLSListenAddr, nil
default:
return "", errors.New("no Praefect address configured")
}
diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go
new file mode 100644
index 000000000..016a10daa
--- /dev/null
+++ b/cmd/praefect/subcmd_remove_repository.go
@@ -0,0 +1,221 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ removeRepositoryCmdName = "remove-repository"
+)
+
+type removeRepository struct {
+ logger logrus.FieldLogger
+ virtualStorage string
+ relativePath string
+}
+
+func newRemoveRepository(logger logrus.FieldLogger) *removeRepository {
+ return &removeRepository{logger: logger}
+}
+
+func (cmd *removeRepository) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(removeRepositoryCmdName, flag.ExitOnError)
+ fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+ fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command removes all state associated with a given repository from the Gitaly Cluster.\n" +
+ " This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" +
+ " database state as tracked by Praefect.\n")
+ fs.PrintDefaults()
+ _, _ = printfErr("NOTE:\n" +
+ " It may happen that parts of the repository continue to exist after this command, either because\n" +
+ " of an error that happened during deletion or because of in-flight RPC calls targeting the repository.\n" +
+ " It is safe and recommended to re-run this command in such a case.\n")
+ }
+ return fs
+}
+
+func (cmd removeRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ switch {
+ case flags.NArg() > 0:
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ case cmd.virtualStorage == "":
+ return requiredParameterError(paramVirtualStorage)
+ case cmd.relativePath == "":
+ return requiredParameterError(paramRelativePath)
+ }
+
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ return cmd.exec(ctx, logger, db, cfg)
+}
+
+func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+ // Remove repository explicitly from all storages and clean up database info.
+ // This prevents creation of new replication events.
+ logger.WithFields(logrus.Fields{
+ "virtual_storage": cmd.virtualStorage,
+ "relative_path": cmd.relativePath,
+ }).Debug("remove repository")
+
+ addr, err := getNodeAddress(cfg)
+ if err != nil {
+ return fmt.Errorf("get praefect address from config: %w", err)
+ }
+
+ logger.Debugf("remove repository info from praefect database %q", addr)
+ removed, err := cmd.removeRepositoryFromDatabase(ctx, db)
+ if err != nil {
+ return fmt.Errorf("remove repository info from praefect database: %w", err)
+ }
+ if !removed {
+ logger.Warn("praefect database has no info about the repository")
+ }
+ logger.Debug("removal of the repository info from praefect database completed")
+
+ logger.Debug("remove replication events")
+ ticker := helper.NewTimerTicker(time.Second)
+ defer ticker.Stop()
+ if err := cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil {
+ return fmt.Errorf("remove scheduled replication events: %w", err)
+ }
+ logger.Debug("replication events removal completed")
+
+ // We should try to remove the repository from each of the Gitaly nodes.
+ logger.Debug("remove repository directly by each gitaly node")
+ cmd.removeRepositoryForEachGitaly(ctx, cfg, logger)
+ logger.Debug("direct repository removal by each gitaly node completed")
+
+ return nil
+}
+
+func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) {
+ var removed bool
+ if err := db.QueryRowContext(
+ ctx,
+ `WITH remove_storages_info AS (
+ DELETE FROM storage_repositories
+ WHERE virtual_storage = $1 AND relative_path = $2
+ )
+ DELETE FROM repositories
+ WHERE virtual_storage = $1 AND relative_path = $2
+ RETURNING TRUE`,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ ).Scan(&removed); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return false, nil
+ }
+ return false, fmt.Errorf("query row: %w", err)
+ }
+ return removed, nil
+}
+
+func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
+ conn, err := subCmdDial(addr, token)
+ if err != nil {
+ return false, fmt.Errorf("error dialing: %w", err)
+ }
+ defer func() { _ = conn.Close() }()
+
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", removeRepositoryCmdName)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
+ if _, err := repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil {
+ s, ok := status.FromError(err)
+ if !ok {
+ return false, fmt.Errorf("RemoveRepository: %w", err)
+ }
+ if !strings.Contains(s.Message(), fmt.Sprintf("get primary: repository %q/%q not found", cmd.virtualStorage, cmd.relativePath)) {
+ return false, fmt.Errorf("RemoveRepository: %w", err)
+ }
+ return false, nil
+ }
+ return true, nil
+}
+
+func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, ticker helper.Ticker) error {
+ // Wait for the completion of the repository's replication jobs.
+ // As some of them could be repository creation jobs, we need to remove those newly created
+ // repositories after replication has finished.
+ start := time.Now()
+ for found := true; found; {
+ ticker.Reset()
+ <-ticker.C()
+ if int(time.Since(start).Seconds())%5 == 0 {
+ logger.Debug("awaiting for the repository in_progress replication jobs to complete...")
+ }
+ row := db.QueryRowContext(
+ ctx,
+ `WITH remove_replication_jobs AS (
+ DELETE FROM replication_queue
+ WHERE job->>'virtual_storage' = $1
+ AND job->>'relative_path' = $2
+ -- Do not remove ongoing replication events as we need to wait
+ -- for their completion.
+ AND state != 'in_progress'
+ )
+ SELECT EXISTS(
+ SELECT
+ FROM replication_queue
+ WHERE job->>'virtual_storage' = $1
+ AND job->>'relative_path' = $2
+ AND state = 'in_progress')`,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ )
+ if err := row.Scan(&found); err != nil {
+ return fmt.Errorf("scan in progress jobs: %w", err)
+ }
+ }
+ return nil
+}
+
+func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) {
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name == cmd.virtualStorage {
+ var wg sync.WaitGroup
+ for i := 0; i < len(vs.Nodes); i++ {
+ wg.Add(1)
+ go func(node *config.Node) {
+ defer wg.Done()
+ logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address)
+ repo := &gitalypb.Repository{
+ StorageName: node.Storage,
+ RelativePath: cmd.relativePath,
+ }
+ _, err := cmd.removeRepository(ctx, repo, node.Address, node.Token)
+ if err != nil {
+ logger.WithError(err).Warnf("repository removal failed for gitaly %q", node.Storage)
+ }
+ logger.Debugf("repository removal call to gitaly %q completed", node.Storage)
+ }(vs.Nodes[i])
+ }
+ wg.Wait()
+ break
+ }
+ }
+}
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
new file mode 100644
index 000000000..9dab2f0a6
--- /dev/null
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -0,0 +1,347 @@
+package main
+
+import (
+ "flag"
+ "path/filepath"
+ "strings"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+func TestRemoveRepository_FlagSet(t *testing.T) {
+ t.Parallel()
+ cmd := &removeRepository{}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo"}))
+ require.Equal(t, "vs", cmd.virtualStorage)
+ require.Equal(t, "repo", cmd.relativePath)
+}
+
+func TestRemoveRepository_Exec_invalidArgs(t *testing.T) {
+ t.Parallel()
+ t.Run("not all flag values processed", func(t *testing.T) {
+ cmd := removeRepository{}
+ flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
+ require.NoError(t, flagSet.Parse([]string{"stub"}))
+ err := cmd.Exec(flagSet, config.Config{})
+ require.EqualError(t, err, "cmd doesn't accept positional arguments")
+ })
+
+ t.Run("virtual-storage is not set", func(t *testing.T) {
+ cmd := removeRepository{}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"virtual-storage" is a required parameter`)
+ })
+
+ t.Run("repository is not set", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"repository" is a required parameter`)
+ })
+
+ t.Run("db connection error", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub", relativePath: "stub"}
+ cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
+ })
+
+ t.Run("praefect address is not set in config ", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)}
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf})
+ require.EqualError(t, err, "get praefect address from config: no Praefect address configured")
+ })
+}
+
+func TestRemoveRepository_Exec(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ },
+ },
+ DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ }
+
+ starterConfigs, err := getStarterConfigs(conf)
+ require.NoError(t, err)
+ stopped := make(chan struct{})
+ go func() {
+ defer close(stopped)
+ err := run(starterConfigs, conf)
+ assert.EqualError(t, err, `received signal "terminated"`)
+ }()
+
+ cc, err := client.Dial("unix://"+conf.SocketPath, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ createRepo := func(t *testing.T, storageName, relativePath string) *gitalypb.Repository {
+ t.Helper()
+ repo := &gitalypb.Repository{
+ StorageName: storageName,
+ RelativePath: relativePath,
+ }
+ for i := 0; true; i++ {
+ _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
+ if err != nil {
+ require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
+ require.Less(t, i, 100, "praefect doesn't serve for too long")
+ time.Sleep(50 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ return repo
+ }
+
+ praefectStorage := conf.VirtualStorages[0].Name
+
+ t.Run("ok", func(t *testing.T) {
+ repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ cmd := &removeRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: repo.StorageName,
+ relativePath: repo.RelativePath,
+ }
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ var repositoryRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&repositoryRowExists))
+ require.False(t, repositoryRowExists)
+ })
+
+ t.Run("no info about repository on praefect", func(t *testing.T) {
+ repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
+ require.NoError(t, repoStore.DeleteRepository(
+ ctx, repo.StorageName, repo.RelativePath, []string{g1Cfg.Storages[0].Name, g2Cfg.Storages[0].Name},
+ ))
+
+ logger := testhelper.NewTestLogger(t)
+ loggerHook := test.NewLocal(logger)
+ cmd := &removeRepository{
+ logger: logrus.NewEntry(logger),
+ virtualStorage: praefectStorage,
+ relativePath: repo.RelativePath,
+ }
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ var found bool
+ for _, entry := range loggerHook.AllEntries() {
+ if strings.Contains(entry.Message, "praefect database has no info about the repository") {
+ found = true
+ }
+ }
+ require.True(t, found, "no expected message in the log")
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ requireNoDatabaseInfo(t, db, cmd)
+ })
+
+ t.Run("one of gitalies is out of service", func(t *testing.T) {
+ repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ g2Srv.Shutdown()
+
+ logger := testhelper.NewTestLogger(t)
+ loggerHook := test.NewLocal(logger)
+ cmd := &removeRepository{
+ logger: logrus.NewEntry(logger),
+ virtualStorage: praefectStorage,
+ relativePath: repo.RelativePath,
+ }
+
+ for {
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)
+ if err == nil {
+ break
+ }
+ regexp := "(transport: Error while dialing dial unix /" + strings.TrimPrefix(g2Srv.Address(), "unix:/") + ")|(primary gitaly is not healthy)"
+ require.Regexp(t, regexp, err.Error())
+ time.Sleep(time.Second)
+ }
+
+ var found bool
+ for _, entry := range loggerHook.AllEntries() {
+ if strings.Contains(entry.Message, `repository removal failed for gitaly "gitaly-2"`) {
+ found = true
+ break
+ }
+ }
+ require.True(t, found, "no expected message in the log")
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ requireNoDatabaseInfo(t, db, cmd)
+ })
+
+ require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM))
+ <-stopped
+}
+
+func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) {
+ t.Helper()
+ var repositoryRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&repositoryRowExists))
+ require.False(t, repositoryRowExists)
+ var storageRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&storageRowExists))
+ require.False(t, storageRowExists)
+}
+
+func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
+ t.Parallel()
+ const (
+ virtualStorage = "praefect"
+ relativePath = "relative_path/to/repo.git"
+ )
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := glsql.NewDB(t)
+
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+
+ // Set replication event in_progress.
+ inProgressEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.CreateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-2",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+ inProgress1, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10)
+ require.NoError(t, err)
+ require.Len(t, inProgress1, 1)
+
+ // New event - events in the 'ready' state should be removed.
+ _, err = queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-3",
+ SourceNodeStorage: "gitaly-1",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+
+ // Failed event - should be removed as well.
+ failedEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-4",
+ SourceNodeStorage: "gitaly-0",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+ inProgress2, err := queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10)
+ require.NoError(t, err)
+ require.Len(t, inProgress2, 1)
+ // Acknowledge with failed status, so it will remain in the database for the next processing
+ // attempt or until it is deleted by the 'removeReplicationEvents' method.
+ acks2, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{inProgress2[0].ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{inProgress2[0].ID}, acks2)
+
+ ticker := helper.NewTimerTicker(time.Millisecond)
+ defer ticker.Stop()
+
+ errChan := make(chan error, 1)
+ go func() {
+ cmd := &removeRepository{virtualStorage: virtualStorage, relativePath: relativePath}
+ errChan <- cmd.removeReplicationEvents(ctx, testhelper.NewTestLogger(t), db.DB, ticker)
+ }()
+ go func() {
+ // We acknowledge in_progress job, so it unblocks the waiting loop.
+ acks, err := queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID})
+ if assert.NoError(t, err) {
+ assert.Equal(t, []uint64{inProgress1[0].ID}, acks)
+ }
+ }()
+
+ for checkChan, exists := errChan, true; exists; {
+ select {
+ case err := <-checkChan:
+ require.NoError(t, err)
+ close(errChan)
+ checkChan = nil
+ default:
+ }
+ // Wait until job removed
+ row := db.QueryRow(`SELECT EXISTS(SELECT FROM replication_queue WHERE id = $1)`, failedEvent.ID)
+ require.NoError(t, row.Scan(&exists))
+ }
+ // Once there are no in_progress jobs anymore the method returns.
+ require.NoError(t, <-errChan)
+
+ var notExists bool
+ row := db.QueryRow(`SELECT NOT EXISTS(SELECT FROM replication_queue)`)
+ require.NoError(t, row.Scan(&notExists))
+ require.True(t, notExists)
+}