Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-09-14 13:19:21 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-09-14 13:19:21 +0300
commit5376d026d3b602a4227d767a10e8d7ef76208d33 (patch)
tree805ca71181bf06f5fef060c464ce96f2d010c275 /cmd
parent7cecb3c094f9e7a17df5b115c185bf6a462a8d02 (diff)
parentee6b638882f9aae601a2c37821e0e82e9d6e333f (diff)
Merge branch 'ps-repo-removal-cli' into 'master'
remove-repository: A new sub-command for the praefect to remove repository See merge request gitlab-org/gitaly!3767
Diffstat (limited to 'cmd')
-rw-r--r--cmd/praefect/subcmd.go3
-rw-r--r--cmd/praefect/subcmd_remove_repository.go221
-rw-r--r--cmd/praefect/subcmd_remove_repository_test.go347
3 files changed, 571 insertions, 0 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 855c73c15..ca11658fe 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -32,6 +32,7 @@ var subcommands = map[string]subcmd{
"dataloss": newDatalossSubcommand(),
"accept-dataloss": &acceptDatalossSubcommand{},
"set-replication-factor": newSetReplicatioFactorSubcommand(os.Stdout),
+ removeRepositoryCmdName: newRemoveRepository(logger),
}
// subCommand returns an exit code, to be fed into os.Exit.
@@ -71,6 +72,8 @@ func getNodeAddress(cfg config.Config) (string, error) {
return "unix:" + cfg.SocketPath, nil
case cfg.ListenAddr != "":
return "tcp://" + cfg.ListenAddr, nil
+ case cfg.TLSListenAddr != "":
+ return "tls://" + cfg.TLSListenAddr, nil
default:
return "", errors.New("no Praefect address configured")
}
diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go
new file mode 100644
index 000000000..016a10daa
--- /dev/null
+++ b/cmd/praefect/subcmd_remove_repository.go
@@ -0,0 +1,221 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ removeRepositoryCmdName = "remove-repository"
+)
+
+type removeRepository struct {
+ logger logrus.FieldLogger
+ virtualStorage string
+ relativePath string
+}
+
+func newRemoveRepository(logger logrus.FieldLogger) *removeRepository {
+ return &removeRepository{logger: logger}
+}
+
+func (cmd *removeRepository) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(removeRepositoryCmdName, flag.ExitOnError)
+ fs.StringVar(&cmd.virtualStorage, paramVirtualStorage, "", "name of the repository's virtual storage")
+ fs.StringVar(&cmd.relativePath, paramRelativePath, "", "relative path to the repository")
+ fs.Usage = func() {
+ _, _ = printfErr("Description:\n" +
+ " This command removes all state associated with a given repository from the Gitaly Cluster.\n" +
+ " This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" +
+ " database state as tracked by Praefect.\n")
+ fs.PrintDefaults()
+ _, _ = printfErr("NOTE:\n" +
+ " It may happen that parts of the repository continue to exist after this command, either because\n" +
+ " of an error that happened during deletion or because of in-flight RPC calls targeting the repository.\n" +
+ " It is safe and recommended to re-run this command in such a case.\n")
+ }
+ return fs
+}
+
+func (cmd removeRepository) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ switch {
+ case flags.NArg() > 0:
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ case cmd.virtualStorage == "":
+ return requiredParameterError(paramVirtualStorage)
+ case cmd.relativePath == "":
+ return requiredParameterError(paramRelativePath)
+ }
+
+ db, err := glsql.OpenDB(cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ logger := cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ return cmd.exec(ctx, logger, db, cfg)
+}
+
+func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, cfg config.Config) error {
+ // Remove repository explicitly from all storages and clean up database info.
+ // This prevents creation of the new replication events.
+ logger.WithFields(logrus.Fields{
+ "virtual_storage": cmd.virtualStorage,
+ "relative_path": cmd.relativePath,
+ }).Debug("remove repository")
+
+ addr, err := getNodeAddress(cfg)
+ if err != nil {
+ return fmt.Errorf("get praefect address from config: %w", err)
+ }
+
+ logger.Debugf("remove repository info from praefect database %q", addr)
+ removed, err := cmd.removeRepositoryFromDatabase(ctx, db)
+ if err != nil {
+ return fmt.Errorf("remove repository info from praefect database: %w", err)
+ }
+ if !removed {
+ logger.Warn("praefect database has no info about the repository")
+ }
+ logger.Debug("removal of the repository info from praefect database completed")
+
+ logger.Debug("remove replication events")
+ ticker := helper.NewTimerTicker(time.Second)
+ defer ticker.Stop()
+ if err := cmd.removeReplicationEvents(ctx, logger, db, ticker); err != nil {
+ return fmt.Errorf("remove scheduled replication events: %w", err)
+ }
+ logger.Debug("replication events removal completed")
+
+ // We should try to remove repository from each of gitaly nodes.
+ logger.Debug("remove repository directly by each gitaly node")
+ cmd.removeRepositoryForEachGitaly(ctx, cfg, logger)
+ logger.Debug("direct repository removal by each gitaly node completed")
+
+ return nil
+}
+
+func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) {
+ var removed bool
+ if err := db.QueryRowContext(
+ ctx,
+ `WITH remove_storages_info AS (
+ DELETE FROM storage_repositories
+ WHERE virtual_storage = $1 AND relative_path = $2
+ )
+ DELETE FROM repositories
+ WHERE virtual_storage = $1 AND relative_path = $2
+ RETURNING TRUE`,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ ).Scan(&removed); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return false, nil
+ }
+ return false, fmt.Errorf("query row: %w", err)
+ }
+ return removed, nil
+}
+
+func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) {
+ conn, err := subCmdDial(addr, token)
+ if err != nil {
+ return false, fmt.Errorf("error dialing: %w", err)
+ }
+ defer func() { _ = conn.Close() }()
+
+ ctx = metadata.AppendToOutgoingContext(ctx, "client_name", removeRepositoryCmdName)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
+ if _, err := repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil {
+ s, ok := status.FromError(err)
+ if !ok {
+ return false, fmt.Errorf("RemoveRepository: %w", err)
+ }
+ if !strings.Contains(s.Message(), fmt.Sprintf("get primary: repository %q/%q not found", cmd.virtualStorage, cmd.relativePath)) {
+ return false, fmt.Errorf("RemoveRepository: %w", err)
+ }
+ return false, nil
+ }
+ return true, nil
+}
+
+func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger logrus.FieldLogger, db *sql.DB, ticker helper.Ticker) error {
+ // Wait for the completion of the repository replication jobs.
+ // As some of them could be a repository creation jobs we need to remove those newly created
+ // repositories after replication finished.
+ start := time.Now()
+ for found := true; found; {
+ ticker.Reset()
+ <-ticker.C()
+ if int(time.Since(start).Seconds())%5 == 0 {
+ logger.Debug("awaiting for the repository in_progress replication jobs to complete...")
+ }
+ row := db.QueryRowContext(
+ ctx,
+ `WITH remove_replication_jobs AS (
+ DELETE FROM replication_queue
+ WHERE job->>'virtual_storage' = $1
+ AND job->>'relative_path' = $2
+ -- Do not remove ongoing replication events as we need to wait
+ -- for their completion.
+ AND state != 'in_progress'
+ )
+ SELECT EXISTS(
+ SELECT
+ FROM replication_queue
+ WHERE job->>'virtual_storage' = $1
+ AND job->>'relative_path' = $2
+ AND state = 'in_progress')`,
+ cmd.virtualStorage,
+ cmd.relativePath,
+ )
+ if err := row.Scan(&found); err != nil {
+ return fmt.Errorf("scan in progress jobs: %w", err)
+ }
+ }
+ return nil
+}
+
+func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) {
+ for _, vs := range cfg.VirtualStorages {
+ if vs.Name == cmd.virtualStorage {
+ var wg sync.WaitGroup
+ for i := 0; i < len(vs.Nodes); i++ {
+ wg.Add(1)
+ go func(node *config.Node) {
+ defer wg.Done()
+ logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address)
+ repo := &gitalypb.Repository{
+ StorageName: node.Storage,
+ RelativePath: cmd.relativePath,
+ }
+ _, err := cmd.removeRepository(ctx, repo, node.Address, node.Token)
+ if err != nil {
+ logger.WithError(err).Warnf("repository removal failed for gitaly %q", node.Storage)
+ }
+ logger.Debugf("repository removal call to gitaly %q completed", node.Storage)
+ }(vs.Nodes[i])
+ }
+ wg.Wait()
+ break
+ }
+ }
+}
diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go
new file mode 100644
index 000000000..9dab2f0a6
--- /dev/null
+++ b/cmd/praefect/subcmd_remove_repository_test.go
@@ -0,0 +1,347 @@
+package main
+
+import (
+ "flag"
+ "path/filepath"
+ "strings"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+func TestRemoveRepository_FlagSet(t *testing.T) {
+ t.Parallel()
+ cmd := &removeRepository{}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse([]string{"--virtual-storage", "vs", "--repository", "repo"}))
+ require.Equal(t, "vs", cmd.virtualStorage)
+ require.Equal(t, "repo", cmd.relativePath)
+}
+
+func TestRemoveRepository_Exec_invalidArgs(t *testing.T) {
+ t.Parallel()
+ t.Run("not all flag values processed", func(t *testing.T) {
+ cmd := removeRepository{}
+ flagSet := flag.NewFlagSet("cmd", flag.PanicOnError)
+ require.NoError(t, flagSet.Parse([]string{"stub"}))
+ err := cmd.Exec(flagSet, config.Config{})
+ require.EqualError(t, err, "cmd doesn't accept positional arguments")
+ })
+
+ t.Run("virtual-storage is not set", func(t *testing.T) {
+ cmd := removeRepository{}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"virtual-storage" is a required parameter`)
+ })
+
+ t.Run("repository is not set", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub"}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{})
+ require.EqualError(t, err, `"repository" is a required parameter`)
+ })
+
+ t.Run("db connection error", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub", relativePath: "stub"}
+ cfg := config.Config{DB: config.DB{Host: "stub", SSLMode: "disable"}}
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), cfg)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "connect to database: dial tcp: lookup stub")
+ })
+
+ t.Run("praefect address is not set in config ", func(t *testing.T) {
+ cmd := removeRepository{virtualStorage: "stub", relativePath: "stub", logger: testhelper.NewTestLogger(t)}
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), config.Config{DB: dbConf})
+ require.EqualError(t, err, "get praefect address from config: no Praefect address configured")
+ })
+}
+
+func TestRemoveRepository_Exec(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ },
+ },
+ DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ }
+
+ starterConfigs, err := getStarterConfigs(conf)
+ require.NoError(t, err)
+ stopped := make(chan struct{})
+ go func() {
+ defer close(stopped)
+ err := run(starterConfigs, conf)
+ assert.EqualError(t, err, `received signal "terminated"`)
+ }()
+
+ cc, err := client.Dial("unix://"+conf.SocketPath, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ createRepo := func(t *testing.T, storageName, relativePath string) *gitalypb.Repository {
+ t.Helper()
+ repo := &gitalypb.Repository{
+ StorageName: storageName,
+ RelativePath: relativePath,
+ }
+ for i := 0; true; i++ {
+ _, err := repoClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{Repository: repo})
+ if err != nil {
+ require.Regexp(t, "(no healthy nodes)|(no such file or directory)|(connection refused)", err.Error())
+ require.Less(t, i, 100, "praefect doesn't serve for too long")
+ time.Sleep(50 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ return repo
+ }
+
+ praefectStorage := conf.VirtualStorages[0].Name
+
+ t.Run("ok", func(t *testing.T) {
+ repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ cmd := &removeRepository{
+ logger: testhelper.NewTestLogger(t),
+ virtualStorage: repo.StorageName,
+ relativePath: repo.RelativePath,
+ }
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ var repositoryRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&repositoryRowExists))
+ require.False(t, repositoryRowExists)
+ })
+
+ t.Run("no info about repository on praefect", func(t *testing.T) {
+ repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
+ require.NoError(t, repoStore.DeleteRepository(
+ ctx, repo.StorageName, repo.RelativePath, []string{g1Cfg.Storages[0].Name, g2Cfg.Storages[0].Name},
+ ))
+
+ logger := testhelper.NewTestLogger(t)
+ loggerHook := test.NewLocal(logger)
+ cmd := &removeRepository{
+ logger: logrus.NewEntry(logger),
+ virtualStorage: praefectStorage,
+ relativePath: repo.RelativePath,
+ }
+ require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ var found bool
+ for _, entry := range loggerHook.AllEntries() {
+ if strings.Contains(entry.Message, "praefect database has no info about the repository") {
+ found = true
+ }
+ }
+ require.True(t, found, "no expected message in the log")
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ requireNoDatabaseInfo(t, db, cmd)
+ })
+
+ t.Run("one of gitalies is out of service", func(t *testing.T) {
+ repo := createRepo(t, praefectStorage, "path/to/test/repo")
+ g2Srv.Shutdown()
+
+ logger := testhelper.NewTestLogger(t)
+ loggerHook := test.NewLocal(logger)
+ cmd := &removeRepository{
+ logger: logrus.NewEntry(logger),
+ virtualStorage: praefectStorage,
+ relativePath: repo.RelativePath,
+ }
+
+ for {
+ err := cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)
+ if err == nil {
+ break
+ }
+ regexp := "(transport: Error while dialing dial unix /" + strings.TrimPrefix(g2Srv.Address(), "unix:/") + ")|(primary gitaly is not healthy)"
+ require.Regexp(t, regexp, err.Error())
+ time.Sleep(time.Second)
+ }
+
+ var found bool
+ for _, entry := range loggerHook.AllEntries() {
+ if strings.Contains(entry.Message, `repository removal failed for gitaly "gitaly-2"`) {
+ found = true
+ break
+ }
+ }
+ require.True(t, found, "no expected message in the log")
+
+ require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath))
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath))
+
+ requireNoDatabaseInfo(t, db, cmd)
+ })
+
+ require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGTERM))
+ <-stopped
+}
+
+func requireNoDatabaseInfo(t *testing.T, db glsql.DB, cmd *removeRepository) {
+ t.Helper()
+ var repositoryRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&repositoryRowExists))
+ require.False(t, repositoryRowExists)
+ var storageRowExists bool
+ require.NoError(t, db.QueryRow(
+ `SELECT EXISTS(SELECT FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2)`,
+ cmd.virtualStorage, cmd.relativePath,
+ ).Scan(&storageRowExists))
+ require.False(t, storageRowExists)
+}
+
+func TestRemoveRepository_removeReplicationEvents(t *testing.T) {
+ t.Parallel()
+ const (
+ virtualStorage = "praefect"
+ relativePath = "relative_path/to/repo.git"
+ )
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := glsql.NewDB(t)
+
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+
+ // Set replication event in_progress.
+ inProgressEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.CreateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-2",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+ inProgress1, err := queue.Dequeue(ctx, virtualStorage, "gitaly-2", 10)
+ require.NoError(t, err)
+ require.Len(t, inProgress1, 1)
+
+ // New event - events in the 'ready' state should be removed.
+ _, err = queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-3",
+ SourceNodeStorage: "gitaly-1",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+
+ // Failed event - should be removed as well.
+ failedEvent, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ VirtualStorage: virtualStorage,
+ TargetNodeStorage: "gitaly-4",
+ SourceNodeStorage: "gitaly-0",
+ RelativePath: relativePath,
+ },
+ })
+ require.NoError(t, err)
+ inProgress2, err := queue.Dequeue(ctx, virtualStorage, "gitaly-4", 10)
+ require.NoError(t, err)
+ require.Len(t, inProgress2, 1)
+ // Acknowledge with failed status, so it will remain in the database for the next processing
+ // attempt or until it is deleted by the 'removeReplicationEvents' method.
+ acks2, err := queue.Acknowledge(ctx, datastore.JobStateFailed, []uint64{inProgress2[0].ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{inProgress2[0].ID}, acks2)
+
+ ticker := helper.NewTimerTicker(time.Millisecond)
+ defer ticker.Stop()
+
+ errChan := make(chan error, 1)
+ go func() {
+ cmd := &removeRepository{virtualStorage: virtualStorage, relativePath: relativePath}
+ errChan <- cmd.removeReplicationEvents(ctx, testhelper.NewTestLogger(t), db.DB, ticker)
+ }()
+ go func() {
+ // We acknowledge in_progress job, so it unblocks the waiting loop.
+ acks, err := queue.Acknowledge(ctx, datastore.JobStateCompleted, []uint64{inProgressEvent.ID})
+ if assert.NoError(t, err) {
+ assert.Equal(t, []uint64{inProgress1[0].ID}, acks)
+ }
+ }()
+
+ for checkChan, exists := errChan, true; exists; {
+ select {
+ case err := <-checkChan:
+ require.NoError(t, err)
+ close(errChan)
+ checkChan = nil
+ default:
+ }
+ // Wait until job removed
+ row := db.QueryRow(`SELECT EXISTS(SELECT FROM replication_queue WHERE id = $1)`, failedEvent.ID)
+ require.NoError(t, row.Scan(&exists))
+ }
+ // Once there are no in_progress jobs anymore the method returns.
+ require.NoError(t, <-errChan)
+
+ var notExists bool
+ row := db.QueryRow(`SELECT NOT EXISTS(SELECT FROM replication_queue)`)
+ require.NoError(t, row.Scan(&notExists))
+ require.True(t, notExists)
+}