gitlab.com/gitlab-org/gitaly.git

author    Pavlo Strokov <pstrokov@gitlab.com>  2021-09-30 17:30:33 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2021-09-30 17:30:33 +0300
commit    eeca095e5680831ad140c2338d0f1c7d8b06a1ba (patch)
tree      1c1f2ccd890d807a75f23c1ae1244d50bb5f0e48 /internal
parent    0bf16fc55108fa4ff170ed8a77f104d99f359151 (diff)
parent    c378f639b0ac6aa983dd0dae2f5940d3c36dd103 (diff)
Merge branch 'ps-crowler' into 'master'
repocleaner: Log warning message for repositories not known to praefect

Closes #3719

See merge request gitlab-org/gitaly!3839
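
For orientation, the sketch below shows one plausible way the pieces introduced by this merge request fit together. It is not part of the diff: the wiring, variable names, storage names, and intervals are assumptions; only the constructors and types it calls (datastore.NewStorageCleanup, repocleaner.Cfg, repocleaner.NewRunner, repocleaner.NewLogWarnAction, and helper.NewTimerTicker) appear in the diff below.

package main

import (
	"context"
	"database/sql"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner"
)

// runRepocleaner is a hypothetical helper wiring the new repocleaner together;
// conns is an already-dialed praefect.Connections value.
func runRepocleaner(ctx context.Context, db *sql.DB, logger logrus.FieldLogger, conns praefect.Connections) error {
	storageCleanup := datastore.NewStorageCleanup(db)

	cfg := repocleaner.Cfg{
		RunInterval:         24 * time.Hour,   // matches DefaultRepositoriesCleanup
		LivenessInterval:    30 * time.Second, // assumed heartbeat period
		RepositoriesInBatch: 16,
	}

	runner := repocleaner.NewRunner(
		cfg,
		logger,
		praefect.StaticHealthChecker{"praefect": {"gitaly-1", "gitaly-2"}}, // illustrative
		conns,
		storageCleanup, // StateOwner: answers which repositories exist in the database
		storageCleanup, // Acquirer: exclusive per-storage locking
		repocleaner.NewLogWarnAction(logger),
	)

	// Run blocks until the context is cancelled; each tick processes one storage.
	return runner.Run(ctx, helper.NewTimerTicker(cfg.RunInterval))
}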
Diffstat (limited to 'internal')
-rw-r--r--  internal/praefect/config/config.go                                              |  44
-rw-r--r--  internal/praefect/config/config_test.go                                         |  32
-rw-r--r--  internal/praefect/config/testdata/config.overwritedefaults.toml                 |   5
-rw-r--r--  internal/praefect/config/testdata/config.toml                                   |   5
-rw-r--r--  internal/praefect/datastore/glsql/testing.go                                    |   2
-rw-r--r--  internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go |  23
-rw-r--r--  internal/praefect/datastore/migrations/20210927083631_repository_path_index.go  |  19
-rw-r--r--  internal/praefect/datastore/storage_cleanup.go                                  | 182
-rw-r--r--  internal/praefect/datastore/storage_cleanup_test.go                             | 283
-rw-r--r--  internal/praefect/repocleaner/action_log.go                                     |  33
-rw-r--r--  internal/praefect/repocleaner/action_log_test.go                                |  48
-rw-r--r--  internal/praefect/repocleaner/init_test.go                                      |  19
-rw-r--r--  internal/praefect/repocleaner/repository.go                                     | 202
-rw-r--r--  internal/praefect/repocleaner/repository_test.go                                | 324
14 files changed, 1215 insertions, 6 deletions
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 60899deb3..53cd8411e 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -36,6 +36,9 @@ const (
ElectionStrategySQL ElectionStrategy = "sql"
// ElectionStrategyPerRepository configures an SQL based strategy that elects different primaries per repository.
ElectionStrategyPerRepository ElectionStrategy = "per_repository"
+
+ minimalSyncCheckInterval = time.Minute
+ minimalSyncRunInterval = time.Minute
)
type Failover struct {
@@ -122,10 +125,10 @@ type Config struct {
DB `toml:"database"`
Failover Failover `toml:"failover"`
// Keep for legacy reasons: remove after Omnibus has switched
- FailoverEnabled bool `toml:"failover_enabled"`
- MemoryQueueEnabled bool `toml:"memory_queue_enabled"`
- GracefulStopTimeout config.Duration `toml:"graceful_stop_timeout"`
-
+ FailoverEnabled bool `toml:"failover_enabled"`
+ MemoryQueueEnabled bool `toml:"memory_queue_enabled"`
+ GracefulStopTimeout config.Duration `toml:"graceful_stop_timeout"`
+ RepositoriesCleanup RepositoriesCleanup `toml:"repositories_cleanup"`
// ForceCreateRepositories will enable force-creation of repositories in the
// coordinator when routing repository-scoped mutators. This must never be used
// outside of tests.
@@ -156,7 +159,8 @@ func FromFile(filePath string) (Config, error) {
Replication: DefaultReplicationConfig(),
Prometheus: prometheus.DefaultConfig(),
// Sets the default Failover, to be overwritten when deserializing the TOML
- Failover: Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository},
+ Failover: Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository},
+ RepositoriesCleanup: DefaultRepositoriesCleanup(),
}
if err := toml.Unmarshal(b, conf); err != nil {
return Config{}, err
@@ -249,6 +253,15 @@ func (c *Config) Validate() error {
}
}
+ if c.RepositoriesCleanup.RunInterval.Duration() > 0 {
+ if c.RepositoriesCleanup.CheckInterval.Duration() < minimalSyncCheckInterval {
+ return fmt.Errorf("repositories_cleanup.check_interval is less then %s, which could lead to a database performance problem", minimalSyncCheckInterval.String())
+ }
+ if c.RepositoriesCleanup.RunInterval.Duration() < minimalSyncRunInterval {
+ return fmt.Errorf("repositories_cleanup.run_interval is less then %s, which could lead to a database performance problem", minimalSyncRunInterval.String())
+ }
+ }
+
return nil
}
@@ -416,3 +429,24 @@ func (db DB) ToPQString(direct bool) string {
return strings.Join(fields, " ")
}
+
+// RepositoriesCleanup configures the repository cleanup operation.
+type RepositoriesCleanup struct {
+ // CheckInterval is the time period between checks for whether the operation should be executed.
+ // It is recommended to keep it lower than the run_interval configuration, because some
+ // nodes may be out of service and could otherwise remain stale for too long.
+ CheckInterval config.Duration `toml:"check_interval"`
+ // RunInterval: the operation runs only if the previous one finished at least RunInterval ago.
+ RunInterval config.Duration `toml:"run_interval"`
+ // RepositoriesInBatch is the number of repositories to pass as a batch for processing.
+ RepositoriesInBatch int `toml:"repositories_in_batch"`
+}
+
+// DefaultRepositoriesCleanup contains default configuration values for the RepositoriesCleanup.
+func DefaultRepositoriesCleanup() RepositoriesCleanup {
+ return RepositoriesCleanup{
+ CheckInterval: config.Duration(30 * time.Minute),
+ RunInterval: config.Duration(24 * time.Hour),
+ RepositoriesInBatch: 16,
+ }
+}
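
To make the new validation rule above concrete, here is a hedged sketch (not part of the diff; the helper name and import aliases are assumptions, imports elided). It assumes cfg is otherwise valid, i.e. virtual storages and so on are configured:

// checkCleanupIntervals is a hypothetical helper; gitalyconfig aliases the
// package that provides config.Duration, praefectconfig aliases this package.
func checkCleanupIntervals(cfg praefectconfig.Config) error {
	cfg.RepositoriesCleanup.CheckInterval = gitalyconfig.Duration(30 * time.Second) // below the one-minute minimum
	cfg.RepositoriesCleanup.RunInterval = gitalyconfig.Duration(24 * time.Hour)

	// With RunInterval > 0 both intervals are validated, so this returns
	// "repositories_cleanup.check_interval is less than 1m0s, which could lead
	// to a database performance problem". A zero RunInterval skips both checks.
	return cfg.Validate()
}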
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 205f2a57d..16280d9b1 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -198,6 +198,20 @@ func TestConfigValidation(t *testing.T) {
},
errMsg: `virtual storage "default" has a default replication factor (2) which is higher than the number of storages (1)`,
},
+ {
+ desc: "repositories_cleanup minimal duration is too low",
+ changeConfig: func(cfg *Config) {
+ cfg.RepositoriesCleanup.CheckInterval = config.Duration(minimalSyncCheckInterval - time.Nanosecond)
+ },
+ errMsg: `repositories_cleanup.check_interval is less than 1m0s, which could lead to a database performance problem`,
+ },
+ {
+ desc: "repositories_cleanup minimal duration is too low",
+ changeConfig: func(cfg *Config) {
+ cfg.RepositoriesCleanup.RunInterval = config.Duration(minimalSyncRunInterval - time.Nanosecond)
+ },
+ errMsg: `repositories_cleanup.run_interval is less than 1m0s, which could lead to a database performance problem`,
+ },
}
for _, tc := range testCases {
@@ -209,7 +223,8 @@ func TestConfigValidation(t *testing.T) {
{Name: "default", Nodes: vs1Nodes},
{Name: "secondary", Nodes: vs2Nodes},
},
- Failover: Failover{ElectionStrategy: ElectionStrategySQL},
+ Failover: Failover{ElectionStrategy: ElectionStrategySQL},
+ RepositoriesCleanup: DefaultRepositoriesCleanup(),
}
tc.changeConfig(&config)
@@ -312,6 +327,11 @@ func TestConfigParsing(t *testing.T) {
BootstrapInterval: config.Duration(1 * time.Second),
MonitorInterval: config.Duration(3 * time.Second),
},
+ RepositoriesCleanup: RepositoriesCleanup{
+ CheckInterval: config.Duration(time.Second),
+ RunInterval: config.Duration(3 * time.Second),
+ RepositoriesInBatch: 10,
+ },
},
},
{
@@ -331,6 +351,11 @@ func TestConfigParsing(t *testing.T) {
BootstrapInterval: config.Duration(5 * time.Second),
MonitorInterval: config.Duration(10 * time.Second),
},
+ RepositoriesCleanup: RepositoriesCleanup{
+ CheckInterval: config.Duration(time.Second),
+ RunInterval: config.Duration(4 * time.Second),
+ RepositoriesInBatch: 11,
+ },
},
},
{
@@ -347,6 +372,11 @@ func TestConfigParsing(t *testing.T) {
BootstrapInterval: config.Duration(time.Second),
MonitorInterval: config.Duration(3 * time.Second),
},
+ RepositoriesCleanup: RepositoriesCleanup{
+ CheckInterval: config.Duration(30 * time.Minute),
+ RunInterval: config.Duration(24 * time.Hour),
+ RepositoriesInBatch: 16,
+ },
},
},
{
diff --git a/internal/praefect/config/testdata/config.overwritedefaults.toml b/internal/praefect/config/testdata/config.overwritedefaults.toml
index 9b204b1fc..e1834e2e1 100644
--- a/internal/praefect/config/testdata/config.overwritedefaults.toml
+++ b/internal/praefect/config/testdata/config.overwritedefaults.toml
@@ -11,3 +11,8 @@ election_strategy = "local"
read_only_after_failover = false
bootstrap_interval = "5s"
monitor_interval = "10s"
+
+[repositories_cleanup]
+check_interval = "1s"
+run_interval = "4s"
+repositories_in_batch = 11
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 09175ed50..7f2464670 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -71,3 +71,8 @@ sslrootcert = "/path/to/sp/root-cert"
error_threshold_window = "20s"
write_error_threshold_count = 1500
read_error_threshold_count = 100
+
+[repositories_cleanup]
+check_interval = "1s"
+run_interval = "3s"
+repositories_in_batch = 10
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 3ca634bff..5181bf9c7 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -98,6 +98,8 @@ func (db DB) TruncateAll(t testing.TB) {
"storage_repositories",
"repositories",
"virtual_storages",
+ "repository_assignments",
+ "storage_cleanups",
)
}
diff --git a/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go b/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go
new file mode 100644
index 000000000..17d68cade
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go
@@ -0,0 +1,23 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20210914115710_storage_cleanups_table",
+ Up: []string{
+ `CREATE TABLE storage_cleanups (
+ virtual_storage TEXT NOT NULL,
+ storage TEXT NOT NULL,
+ last_run TIMESTAMP WITHOUT TIME ZONE,
+ triggered_at TIMESTAMP WITHOUT TIME ZONE,
+ PRIMARY KEY (virtual_storage, storage)
+ )`,
+ },
+ Down: []string{
+ `DROP TABLE storage_cleanups`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go b/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go
new file mode 100644
index 000000000..cc7022738
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go
@@ -0,0 +1,19 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20210927083631_repository_path_index",
+ Up: []string{
+ `CREATE INDEX CONCURRENTLY repository_replica_path_index
+ ON repositories (replica_path, virtual_storage)`,
+ },
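+ // CREATE INDEX CONCURRENTLY cannot run inside a transaction, which is why
+ // DisableTransactionUp is set for this migration.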
+ DisableTransactionUp: true,
+ Down: []string{
+ `DROP INDEX repository_replica_path_index`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go
new file mode 100644
index 000000000..2c50f2aac
--- /dev/null
+++ b/internal/praefect/datastore/storage_cleanup.go
@@ -0,0 +1,182 @@
+package datastore
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/lib/pq"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+)
+
+// RepositoryClusterPath identifies the location of a repository in the cluster.
+type RepositoryClusterPath struct {
+ ClusterPath
+ // RelativeReplicaPath is the relative path to the repository on disk.
+ RelativeReplicaPath string
+}
+
+// NewRepositoryClusterPath initializes and returns RepositoryClusterPath.
+func NewRepositoryClusterPath(virtualStorage, storage, relativePath string) RepositoryClusterPath {
+ return RepositoryClusterPath{
+ ClusterPath: ClusterPath{
+ VirtualStorage: virtualStorage,
+ Storage: storage,
+ },
+ RelativeReplicaPath: relativePath,
+ }
+}
+
+// ClusterPath represents the path to a storage in the cluster.
+type ClusterPath struct {
+ // VirtualStorage is the name of the virtual storage.
+ VirtualStorage string
+ // Storage is the name of the gitaly storage.
+ Storage string
+}
+
+// NewStorageCleanup initialises and returns a new instance of the StorageCleanup.
+func NewStorageCleanup(db *sql.DB) *StorageCleanup {
+ return &StorageCleanup{db: db}
+}
+
+// StorageCleanup provides methods on the database for the repository cleanup operation.
+type StorageCleanup struct {
+ db *sql.DB
+}
+
+// Populate adds storage to the set, so it can be acquired afterwards.
+func (ss *StorageCleanup) Populate(ctx context.Context, virtualStorage, storage string) error {
+ if _, err := ss.db.ExecContext(
+ ctx,
+ `
+ INSERT INTO storage_cleanups (virtual_storage, storage) VALUES ($1, $2)
+ ON CONFLICT (virtual_storage, storage) DO NOTHING`,
+ virtualStorage, storage,
+ ); err != nil {
+ return fmt.Errorf("exec: %w", err)
+ }
+ return nil
+}
+
+// AcquireNextStorage picks up the next storage for processing.
+// Once acquired, no other call to the same method will return the same storage, so it
+// works as an exclusive lock on that entry.
+// Once processing is done, the returned function must be called to release the
+// acquired storage. It updates the last_run column of the entry on execution.
+func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error) {
+ var entry ClusterPath
+ if err := ss.db.QueryRowContext(
+ ctx,
+ `UPDATE storage_cleanups
+ SET triggered_at = (NOW() AT TIME ZONE 'UTC')
+ WHERE (virtual_storage, storage) IN (
+ SELECT virtual_storage, storage
+ FROM storage_cleanups
+ WHERE
+ COALESCE(last_run, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $1)
+ AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $2)
+ ORDER BY last_run NULLS FIRST, virtual_storage, storage
+ LIMIT 1
+ FOR UPDATE SKIP LOCKED
+ )
+ RETURNING virtual_storage, storage`,
+ inactive.Milliseconds(), updatePeriod.Milliseconds(),
+ ).Scan(&entry.VirtualStorage, &entry.Storage); err != nil {
+ if !errors.Is(err, sql.ErrNoRows) {
+ return nil, nil, fmt.Errorf("scan: %w", err)
+ }
+ return nil, func() error { return nil }, nil
+ }
+
+ stop := make(chan struct{}, 1)
+ stopped := make(chan struct{})
+ go func() {
+ trigger := helper.NewTimerTicker(updatePeriod - 100*time.Millisecond)
+ defer func() {
+ trigger.Stop()
+ close(stopped)
+ }()
+
+ for {
+ trigger.Reset()
+ select {
+ case <-ctx.Done():
+ return
+ case <-stop:
+ return
+ case <-trigger.C():
+ if _, err := ss.db.ExecContext(
+ ctx,
+ `UPDATE storage_cleanups
+ SET triggered_at = NOW()
+ WHERE virtual_storage = $1 AND storage = $2`,
+ entry.VirtualStorage, entry.Storage,
+ ); err != nil {
+ return
+ }
+ }
+ }
+ }()
+
+ return &entry, func() error {
+ // signals health update goroutine to terminate
+ stop <- struct{}{}
+ // waits for the health update goroutine to terminate to prevent update
+ // of the triggered_at after setting it to NULL
+ <-stopped
+
+ if _, err := ss.db.ExecContext(
+ ctx,
+ `UPDATE storage_cleanups
+ SET last_run = NOW(), triggered_at = NULL
+ WHERE virtual_storage = $1 AND storage = $2`,
+ entry.VirtualStorage, entry.Storage,
+ ); err != nil {
+ return fmt.Errorf("update storage_cleanups: %w", err)
+ }
+ return nil
+ }, nil
+}
+
+// DoesntExist returns a RepositoryClusterPath for each repository that doesn't exist in the
+// database, determined by querying the repositories and storage_repositories tables.
+func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]RepositoryClusterPath, error) {
+ if len(replicaPaths) == 0 {
+ return nil, nil
+ }
+
+ rows, err := ss.db.QueryContext(
+ ctx,
+ `SELECT $1 AS virtual_storage, $2 AS storage, UNNEST($3::TEXT[]) AS replica_path
+ EXCEPT (
+ SELECT virtual_storage, storage, replica_path
+ FROM repositories
+ JOIN storage_repositories USING (virtual_storage, relative_path)
+ WHERE virtual_storage = $1 AND storage = $2 AND replica_path = ANY($3)
+ )`,
+ virtualStorage, storage, pq.StringArray(replicaPaths),
+ )
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer func() { _ = rows.Close() }()
+
+ var res []RepositoryClusterPath
+ for rows.Next() {
+ var curr RepositoryClusterPath
+ if err := rows.Scan(&curr.VirtualStorage, &curr.Storage, &curr.RelativeReplicaPath); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+ res = append(res, curr)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("loop: %w", err)
+ }
+ if err := rows.Close(); err != nil {
+ return nil, fmt.Errorf("close: %w", err)
+ }
+ return res, nil
+}
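
Two usage notes on the storage_cleanup API added above, as hedged sketches (helper names, intervals, and paths are illustrative; imports elided). First, AcquireNextStorage and its release function behave like a database-backed lease with a heartbeat: triggered_at is refreshed in the background while the entry is held and cleared on release:

// processNextStorage is a hypothetical caller; Populate is assumed to have
// already registered the storages in storage_cleanups.
func processNextStorage(ctx context.Context, storageCleanup *datastore.StorageCleanup) error {
	// Entries qualify if their last run was more than 24h ago and no other
	// process holds them; the lease heartbeat fires roughly every 30s.
	clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 24*time.Hour, 30*time.Second)
	if err != nil {
		return fmt.Errorf("acquire: %w", err)
	}
	defer func() {
		// Stops the background triggered_at heartbeat and stamps last_run,
		// making the entry acquirable again.
		if err := release(); err != nil {
			log.Printf("release storage: %v", err)
		}
	}()

	if clusterPath == nil {
		return nil // nothing eligible: all entries ran recently or are held elsewhere
	}

	// ... scan repositories of clusterPath.VirtualStorage/clusterPath.Storage ...
	return nil
}

Second, DoesntExist takes a batch of on-disk relative paths and returns only those praefect has no record of:

// reportUnknownRepositories is a hypothetical example; the paths are made up.
func reportUnknownRepositories(ctx context.Context, storageCleanup *datastore.StorageCleanup) error {
	unknown, err := storageCleanup.DoesntExist(ctx, "praefect", "gitaly-1",
		[]string{"repo-1.git", "orphan.git"})
	if err != nil {
		return fmt.Errorf("check existence: %w", err)
	}
	for _, repo := range unknown {
		// Each entry is present on disk but absent from the repositories /
		// storage_repositories tables.
		fmt.Println(repo.VirtualStorage, repo.Storage, repo.RelativeReplicaPath)
	}
	return nil
}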
diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go
new file mode 100644
index 000000000..bb8e66d75
--- /dev/null
+++ b/internal/praefect/datastore/storage_cleanup_test.go
@@ -0,0 +1,283 @@
+package datastore
+
+import (
+ "database/sql"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestStorageCleanup_Populate(t *testing.T) {
+ t.Parallel()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ db := glsql.NewDB(t)
+ storageCleanup := NewStorageCleanup(db.DB)
+
+ require.NoError(t, storageCleanup.Populate(ctx, "praefect", "gitaly-1"))
+ actual := getAllStoragesCleanup(t, db)
+ single := []storageCleanupRow{{ClusterPath: ClusterPath{VirtualStorage: "praefect", Storage: "gitaly-1"}}}
+ require.Equal(t, single, actual)
+
+ err := storageCleanup.Populate(ctx, "praefect", "gitaly-1")
+ require.NoError(t, err, "population of the same data should not generate an error")
+ actual = getAllStoragesCleanup(t, db)
+ require.Equal(t, single, actual, "same data should not create additional rows or change existing")
+
+ require.NoError(t, storageCleanup.Populate(ctx, "default", "gitaly-2"))
+ multiple := append(single, storageCleanupRow{ClusterPath: ClusterPath{VirtualStorage: "default", Storage: "gitaly-2"}})
+ actual = getAllStoragesCleanup(t, db)
+ require.ElementsMatch(t, multiple, actual, "new data should create additional row")
+}
+
+func TestStorageCleanup_AcquireNextStorage(t *testing.T) {
+ t.Parallel()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ db := glsql.NewDB(t)
+ storageCleanup := NewStorageCleanup(db.DB)
+
+ t.Run("ok", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath)
+ })
+
+ t.Run("last_run condition", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ // Acquire it to initialize last_run column.
+ _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, time.Hour, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Nil(t, clusterPath, "no result expected as there can't be such entries")
+ })
+
+ t.Run("sorting based on storage name as no executions done yet", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2"))
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g3"))
+
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath)
+ })
+
+ t.Run("sorting based on storage name and last_run", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2"))
+
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g2"}, clusterPath)
+ })
+
+ t.Run("sorting based on last_run", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2"))
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath)
+ clusterPath, release, err = storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g2"}, clusterPath)
+
+ clusterPath, release, err = storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath)
+ })
+
+ t.Run("already acquired won't be acquired until released", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+
+ clusterPath, release2, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.Nil(t, clusterPath, clusterPath)
+ require.NoError(t, release1())
+ require.NoError(t, release2())
+
+ clusterPath, release3, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NotNil(t, clusterPath)
+ require.NoError(t, release3())
+ })
+
+ t.Run("already acquired won't be acquired until released", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+
+ clusterPath, release2, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.Nil(t, clusterPath, clusterPath)
+ require.NoError(t, release1())
+ require.NoError(t, release2())
+
+ clusterPath, release3, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NotNil(t, clusterPath)
+ require.NoError(t, release3())
+ })
+
+ t.Run("acquired for long time triggers update loop", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ start := time.Now().UTC()
+ _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, 500*time.Millisecond)
+ require.NoError(t, err)
+
+ // Make sure the triggered_at column has a non NULL value after the record is acquired.
+ check1 := getAllStoragesCleanup(t, db)
+ require.Len(t, check1, 1)
+ require.True(t, check1[0].TriggeredAt.Valid)
+ require.True(t, check1[0].TriggeredAt.Time.After(start), check1[0].TriggeredAt.Time.String(), start.String())
+
+ // Check the goroutine running in the background updates triggered_at column periodically.
+ time.Sleep(time.Second)
+
+ check2 := getAllStoragesCleanup(t, db)
+ require.Len(t, check2, 1)
+ require.True(t, check2[0].TriggeredAt.Valid)
+ require.True(t, check2[0].TriggeredAt.Time.After(check1[0].TriggeredAt.Time), check2[0].TriggeredAt.Time.String(), check1[0].TriggeredAt.Time.String())
+
+ require.NoError(t, release())
+
+ // Make sure the triggered_at column has a NULL value after the record is released.
+ check3 := getAllStoragesCleanup(t, db)
+ require.Len(t, check3, 1)
+ require.False(t, check3[0].TriggeredAt.Valid)
+ })
+}
+
+func TestStorageCleanup_Exists(t *testing.T) {
+ t.Parallel()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := glsql.NewDB(t)
+
+ repoStore := NewPostgresRepositoryStore(db.DB, nil)
+ require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "g1", []string{"g2", "g3"}, nil, false, false))
+ require.NoError(t, repoStore.CreateRepository(ctx, 1, "vs", "p/2", "g1", []string{"g2", "g3"}, nil, false, false))
+ storageCleanup := NewStorageCleanup(db.DB)
+
+ for _, tc := range []struct {
+ desc string
+ virtualStorage string
+ storage string
+ relativeReplicaPaths []string
+ out []RepositoryClusterPath
+ }{
+ {
+ desc: "multiple doesn't exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "p/2", "path/x", "path/y"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"},
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"},
+ },
+ },
+ {
+ desc: "duplicates",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "path/x", "path/x"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"},
+ },
+ },
+ {
+ desc: "all exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "p/2"},
+ out: nil,
+ },
+ {
+ desc: "all doesn't exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"path/x", "path/y", "path/z"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"},
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"},
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/z"},
+ },
+ },
+ {
+ desc: "doesn't exist because of storage",
+ virtualStorage: "vs",
+ storage: "stub",
+ relativeReplicaPaths: []string{"path/x"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "stub"}, RelativeReplicaPath: "path/x"},
+ },
+ },
+ {
+ desc: "doesn't exist because of virtual storage",
+ virtualStorage: "stub",
+ storage: "g1",
+ relativeReplicaPaths: []string{"path/x"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "stub", Storage: "g1"}, RelativeReplicaPath: "path/x"},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ res, err := storageCleanup.DoesntExist(ctx, tc.virtualStorage, tc.storage, tc.relativeReplicaPaths)
+ require.NoError(t, err)
+ require.ElementsMatch(t, tc.out, res)
+ })
+ }
+}
+
+type storageCleanupRow struct {
+ ClusterPath
+ LastRun sql.NullTime
+ TriggeredAt sql.NullTime
+}
+
+func getAllStoragesCleanup(t testing.TB, db glsql.DB) []storageCleanupRow {
+ rows, err := db.Query(`SELECT * FROM storage_cleanups`)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, rows.Close())
+ }()
+
+ var res []storageCleanupRow
+ for rows.Next() {
+ var dst storageCleanupRow
+ err := rows.Scan(&dst.VirtualStorage, &dst.Storage, &dst.LastRun, &dst.TriggeredAt)
+ require.NoError(t, err)
+ res = append(res, dst)
+ }
+ require.NoError(t, rows.Err())
+ return res
+}
diff --git a/internal/praefect/repocleaner/action_log.go b/internal/praefect/repocleaner/action_log.go
new file mode 100644
index 000000000..934d69c49
--- /dev/null
+++ b/internal/praefect/repocleaner/action_log.go
@@ -0,0 +1,33 @@
+package repocleaner
+
+import (
+ "context"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+)
+
+// LogWarnAction is an implementation of the Action interface that logs a warning message
+// for each repository that is not known to praefect.
+type LogWarnAction struct {
+ logger logrus.FieldLogger
+}
+
+// NewLogWarnAction returns a new instance of LogWarnAction.
+func NewLogWarnAction(logger logrus.FieldLogger) *LogWarnAction {
+ return &LogWarnAction{
+ logger: logger.WithField("component", "repocleaner.log_warn_action"),
+ }
+}
+
+// Perform logs a warning for each repository that is not known to praefect.
+func (al LogWarnAction) Perform(_ context.Context, notExisting []datastore.RepositoryClusterPath) error {
+ for _, entry := range notExisting {
+ al.logger.WithFields(logrus.Fields{
+ "virtual_storage": entry.VirtualStorage,
+ "storage": entry.Storage,
+ "relative_replica_path": entry.RelativeReplicaPath,
+ }).Warn("repository is not managed by praefect")
+ }
+ return nil
+}
diff --git a/internal/praefect/repocleaner/action_log_test.go b/internal/praefect/repocleaner/action_log_test.go
new file mode 100644
index 000000000..1c7af50b2
--- /dev/null
+++ b/internal/praefect/repocleaner/action_log_test.go
@@ -0,0 +1,48 @@
+package repocleaner
+
+import (
+ "context"
+ "testing"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+)
+
+func TestLogWarnAction_Perform(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ action := NewLogWarnAction(logger)
+ err := action.Perform(context.TODO(), []datastore.RepositoryClusterPath{
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: "vs1", Storage: "g1"}, RelativeReplicaPath: "p/1"},
+ {ClusterPath: datastore.ClusterPath{VirtualStorage: "vs2", Storage: "g2"}, RelativeReplicaPath: "p/2"},
+ })
+ require.NoError(t, err)
+ require.Len(t, hook.AllEntries(), 2)
+
+ exp := []map[string]interface{}{{
+ "Data": logrus.Fields{
+ "component": "repocleaner.log_warn_action",
+ "virtual_storage": "vs1",
+ "storage": "g1",
+ "relative_replica_path": "p/1",
+ },
+ "Message": "repository is not managed by praefect",
+ }, {
+ "Data": logrus.Fields{
+ "component": "repocleaner.log_warn_action",
+ "virtual_storage": "vs2",
+ "storage": "g2",
+ "relative_replica_path": "p/2",
+ },
+ "Message": "repository is not managed by praefect",
+ }}
+
+ require.ElementsMatch(t, exp, []map[string]interface{}{{
+ "Data": hook.AllEntries()[0].Data,
+ "Message": hook.AllEntries()[0].Message,
+ }, {
+ "Data": hook.AllEntries()[1].Data,
+ "Message": hook.AllEntries()[1].Message,
+ }})
+}
diff --git a/internal/praefect/repocleaner/init_test.go b/internal/praefect/repocleaner/init_test.go
new file mode 100644
index 000000000..011d4f2a4
--- /dev/null
+++ b/internal/praefect/repocleaner/init_test.go
@@ -0,0 +1,19 @@
+package repocleaner
+
+import (
+ "os"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) (code int) {
+ defer testhelper.MustHaveNoChildProcess()
+ cleanup := testhelper.Configure()
+ defer cleanup()
+ return m.Run()
+}
diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go
new file mode 100644
index 000000000..fbf3b841b
--- /dev/null
+++ b/internal/praefect/repocleaner/repository.go
@@ -0,0 +1,202 @@
+package repocleaner
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// StateOwner performs existence checks for repositories.
+type StateOwner interface {
+ // DoesntExist returns a RepositoryClusterPath for each repository that doesn't exist in the
+ // database, determined by querying the repositories and storage_repositories tables.
+ DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]datastore.RepositoryClusterPath, error)
+}
+
+// Acquirer acquires a storage for processing; no other Acquirer can acquire it again until it is released.
+type Acquirer interface {
+ // Populate adds provided storage into the pool of entries to acquire.
+ Populate(ctx context.Context, virtualStorage, storage string) error
+ // AcquireNextStorage acquires next storage based on the inactive time.
+ AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*datastore.ClusterPath, func() error, error)
+}
+
+// Action is a procedure to be executed on the repositories that don't exist in the praefect database.
+type Action interface {
+ // Perform runs actual action for non-existing repositories.
+ Perform(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error
+}
+
+// Runner scans healthy gitaly nodes for repositories, verifies whether the
+// found repositories are known to praefect, and runs a special action on those that are not.
+type Runner struct {
+ cfg Cfg
+ logger logrus.FieldLogger
+ healthChecker praefect.HealthChecker
+ conns praefect.Connections
+ stateOwner StateOwner
+ acquirer Acquirer
+ action Action
+}
+
+// Cfg contains set of configuration parameters to run Runner.
+type Cfg struct {
+ // RunInterval: the operation runs only if the previous one finished at least RunInterval ago.
+ RunInterval time.Duration
+ // LivenessInterval: the locked entity is updated with this period to signal that it is still in use.
+ LivenessInterval time.Duration
+ // RepositoriesInBatch is the number of repositories to pass as a batch for processing.
+ RepositoriesInBatch int
+}
+
+// NewRunner returns a new instance of the Runner.
+func NewRunner(cfg Cfg, logger logrus.FieldLogger, healthChecker praefect.HealthChecker, conns praefect.Connections, stateOwner StateOwner, acquirer Acquirer, action Action) *Runner {
+ return &Runner{
+ cfg: cfg,
+ logger: logger.WithField("component", "repocleaner.repository_existence"),
+ healthChecker: healthChecker,
+ conns: conns,
+ stateOwner: stateOwner,
+ acquirer: acquirer,
+ action: action,
+ }
+}
+
+// Run scans healthy gitaly nodes for repositories, verifies whether the
+// found repositories are known to praefect, and runs a special action on those that are not.
+// It runs on each tick of the provided ticker and finishes on context cancellation.
+func (gs *Runner) Run(ctx context.Context, ticker helper.Ticker) error {
+ gs.logger.Info("started")
+ defer gs.logger.Info("completed")
+
+ defer ticker.Stop()
+
+ for virtualStorage, connByStorage := range gs.conns {
+ for storage := range connByStorage {
+ if err := gs.acquirer.Populate(ctx, virtualStorage, storage); err != nil {
+ return fmt.Errorf("populate database: %w", err)
+ }
+ }
+ }
+
+ var tick helper.Ticker
+ for {
+ // We use a local tick variable to run the first cycle
+ // without waiting. All other iterations wait
+ // for the next tick or context cancellation.
+ if tick != nil {
+ tick.Reset()
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-tick.C():
+ }
+ } else {
+ tick = ticker
+ }
+
+ gs.run(ctx)
+ }
+}
+
+func (gs *Runner) run(ctx context.Context) {
+ clusterPath, release, err := gs.acquirer.AcquireNextStorage(ctx, gs.cfg.RunInterval, gs.cfg.LivenessInterval)
+ if err != nil {
+ gs.logger.WithError(err).Error("unable to acquire next storage to verify")
+ return
+ }
+
+ logger := gs.logger
+ defer func() {
+ if err := release(); err != nil {
+ logger.WithError(err).Error("failed to release storage acquired to verify")
+ }
+ }()
+
+ if clusterPath == nil {
+ gs.logger.Debug("no storages to verify")
+ return
+ }
+
+ logger = gs.loggerWith(clusterPath.VirtualStorage, clusterPath.Storage)
+ err = gs.execOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) {
+ relativePaths := make([]string, len(paths))
+ for i, path := range paths {
+ relativePaths[i] = path.RelativeReplicaPath
+ }
+ notExisting, err := gs.stateOwner.DoesntExist(ctx, clusterPath.VirtualStorage, clusterPath.Storage, relativePaths)
+ if err != nil {
+ logger.WithError(err).WithField("repositories", paths).Error("failed to check existence")
+ return
+ }
+
+ if err := gs.action.Perform(ctx, notExisting); err != nil {
+ logger.WithError(err).WithField("existence", notExisting).Error("perform action")
+ return
+ }
+ })
+ if err != nil {
+ logger.WithError(err).Error("failed to exec action on repositories")
+ return
+ }
+}
+
+func (gs *Runner) loggerWith(virtualStorage, storage string) logrus.FieldLogger {
+ return gs.logger.WithFields(logrus.Fields{"virtual_storage": virtualStorage, "storage": storage})
+}
+
+func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath)) error {
+ gclient, err := gs.getInternalGitalyClient(virtualStorage, storage)
+ if err != nil {
+ return fmt.Errorf("setup gitaly client: %w", err)
+ }
+
+ resp, err := gclient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storage})
+ if err != nil {
+ return fmt.Errorf("unable to walk repos: %w", err)
+ }
+
+ batch := make([]datastore.RepositoryClusterPath, 0, gs.cfg.RepositoriesInBatch)
+ for {
+ res, err := resp.Recv()
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ return fmt.Errorf("failure on walking repos: %w", err)
+ }
+ break
+ }
+
+ batch = append(batch, datastore.RepositoryClusterPath{
+ ClusterPath: datastore.ClusterPath{
+ VirtualStorage: virtualStorage,
+ Storage: storage,
+ },
+ RelativeReplicaPath: res.RelativePath,
+ })
+
+ if len(batch) == cap(batch) {
+ action(batch)
+ batch = batch[:0]
+ }
+ }
+ if len(batch) > 0 {
+ action(batch)
+ }
+ return nil
+}
+
+func (gs *Runner) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) {
+ conn, found := gs.conns[virtualStorage][storage]
+ if !found {
+ return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage)
+ }
+ return gitalypb.NewInternalGitalyClient(conn), nil
+}
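
The Run loop above executes its first cycle immediately and then waits one tick per cycle. A condensed, hypothetical driver for it (runner, ctx, and cancel are assumed to exist; the tests below do essentially this with assertions added):

// helper.NewManualTicker fires only when Tick() is called, which makes the
// per-cycle behaviour easy to observe.
ticker := helper.NewManualTicker()
done := make(chan struct{})
go func() {
	defer close(done)
	err := runner.Run(ctx, ticker) // blocks; returns ctx.Err() once cancelled
	fmt.Println(err)
}()
ticker.Tick() // the first cycle ran without a tick; this starts the second
cancel()      // Run returns context.Canceled
<-done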
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
new file mode 100644
index 000000000..32e7d7ad5
--- /dev/null
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -0,0 +1,324 @@
+package repocleaner
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+)
+
+func TestRunner_Run(t *testing.T) {
+ t.Parallel()
+
+ const (
+ repo1RelPath = "repo-1.git"
+ repo2RelPath = "repo-2.git"
+ repo3RelPath = "repo-3.git"
+
+ storage1 = "gitaly-1"
+ storage2 = "gitaly-2"
+ storage3 = "gitaly-3"
+
+ virtualStorage = "praefect"
+ )
+
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages(storage2))
+ g3Cfg := testcfg.Build(t, testcfg.WithStorages(storage3))
+
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g3Addr := testserver.RunGitalyServer(t, g3Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Addr},
+ {Storage: g3Cfg.Storages[0].Name, Address: g3Addr},
+ },
+ },
+ },
+ DB: dbConf,
+ }
+ cfg := Cfg{
+ RunInterval: time.Duration(1),
+ LivenessInterval: time.Duration(1),
+ RepositoriesInBatch: 2,
+ }
+
+ gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+ gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath})
+ gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo3RelPath})
+
+ // second gitaly is missing repo-3.git repository
+ gittest.CloneRepo(t, g2Cfg, g2Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+ gittest.CloneRepo(t, g2Cfg, g2Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath})
+
+ // third gitaly has an extra repo-4.git repository
+ gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+ gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath})
+ gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo3RelPath})
+ gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: "repo-4.git"})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
+ for i, set := range []struct {
+ relativePath string
+ primary string
+ secondaries []string
+ }{
+ {
+ relativePath: repo1RelPath,
+ primary: storage1,
+ secondaries: []string{storage3},
+ },
+ {
+ relativePath: repo2RelPath,
+ primary: storage1,
+ secondaries: []string{storage2, storage3},
+ },
+ {
+ relativePath: repo3RelPath,
+ primary: storage1,
+ secondaries: []string{storage2, storage3},
+ },
+ } {
+ require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, set.secondaries, nil, false, false))
+ }
+
+ logger, loggerHook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ entry := logger.WithContext(ctx)
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
+ nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ storageCleanup := datastore.NewStorageCleanup(db.DB)
+
+ var iteration int32
+ runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1, storage2, storage3}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{
+ PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error {
+ i := atomic.LoadInt32(&iteration)
+ switch i {
+ case 0:
+ assert.ElementsMatch(t, nil, notExisting)
+ case 1:
+ assert.ElementsMatch(t, nil, notExisting)
+ case 2:
+ assert.ElementsMatch(
+ t,
+ []datastore.RepositoryClusterPath{
+ datastore.NewRepositoryClusterPath(virtualStorage, storage2, repo1RelPath),
+ },
+ notExisting,
+ )
+ case 3:
+ assert.ElementsMatch(t, nil, notExisting)
+ case 4:
+ assert.Equal(
+ t,
+ []datastore.RepositoryClusterPath{
+ datastore.NewRepositoryClusterPath(virtualStorage, storage3, "repo-4.git"),
+ },
+ notExisting,
+ )
+ }
+ atomic.AddInt32(&iteration, 1)
+ return nil
+ },
+ })
+
+ ticker := helper.NewManualTicker()
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ assert.Equal(t, context.Canceled, runner.Run(ctx, ticker))
+ }()
+ // We need to trigger it 5 times to make sure the 4th run is fully completed.
+ for i := 0; i < 5; i++ {
+ ticker.Tick()
+ }
+ require.Greater(t, atomic.LoadInt32(&iteration), int32(4))
+ require.Len(t, loggerHook.AllEntries(), 1)
+ require.Equal(
+ t,
+ map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "started"},
+ map[string]interface{}{"Data": loggerHook.AllEntries()[0].Data, "Message": loggerHook.AllEntries()[0].Message},
+ )
+ // Terminates the loop.
+ cancel()
+ <-done
+ require.Equal(
+ t,
+ map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "completed"},
+ map[string]interface{}{"Data": loggerHook.LastEntry().Data, "Message": loggerHook.LastEntry().Message},
+ )
+}
+
+func TestRunner_Run_noAvailableStorages(t *testing.T) {
+ t.Parallel()
+
+ const (
+ repo1RelPath = "repo-1.git"
+ storage1 = "gitaly-1"
+ virtualStorage = "praefect"
+ )
+
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1))
+ g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ db := glsql.NewDB(t)
+ var database string
+ require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database))
+ dbConf := glsql.GetDBConfig(t, database)
+
+ conf := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ },
+ },
+ },
+ DB: dbConf,
+ }
+ cfg := Cfg{
+ RunInterval: time.Minute,
+ LivenessInterval: time.Second,
+ RepositoriesInBatch: 2,
+ }
+
+ gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil)
+ for i, set := range []struct {
+ relativePath string
+ primary string
+ }{
+ {
+ relativePath: repo1RelPath,
+ primary: storage1,
+ },
+ } {
+ require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, nil, nil, false, false))
+ }
+
+ logger := testhelper.NewTestLogger(t)
+ entry := logger.WithContext(ctx)
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
+ nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ storageCleanup := datastore.NewStorageCleanup(db.DB)
+ startSecond := make(chan struct{})
+ releaseFirst := make(chan struct{})
+ runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{
+ PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error {
+ assert.Empty(t, notExisting)
+ // Block execution here until the second instance completes its execution.
+ // This lets us be sure the picked storage can't be picked once again by
+ // another instance, and that everything works without problems when there
+ // is nothing to pick up for processing.
+ close(startSecond)
+ <-releaseFirst
+ return nil
+ },
+ })
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ go func() {
+ logger, loggerHook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{
+ PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error {
+ assert.FailNow(t, "should not be triggered as there is no available storages to acquire")
+ return nil
+ },
+ })
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ticker := helper.NewManualTicker()
+
+ <-startSecond
+ go func() {
+ defer wg.Done()
+ assert.Equal(t, context.Canceled, runner.Run(ctx, ticker))
+ }()
+ ticker.Tick()
+ ticker.Tick()
+ close(releaseFirst)
+ cancel()
+ wg.Wait()
+
+ entries := loggerHook.AllEntries()
+ require.Greater(t, len(entries), 2)
+ require.Equal(
+ t,
+ map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "no storages to verify"},
+ map[string]interface{}{"Data": loggerHook.AllEntries()[1].Data, "Message": loggerHook.AllEntries()[1].Message},
+ )
+ }()
+
+ ticker := helper.NewManualTicker()
+ go func() {
+ defer wg.Done()
+ assert.Equal(t, context.Canceled, runner.Run(ctx, ticker))
+ }()
+ ticker.Tick()
+ ticker.Tick() // blocks until first processing cycle is done
+ cancel()
+ wg.Wait()
+}
+
+type actionStub struct {
+ PerformMethod func(ctx context.Context, existence []datastore.RepositoryClusterPath) error
+}
+
+func (as actionStub) Perform(ctx context.Context, existence []datastore.RepositoryClusterPath) error {
+ if as.PerformMethod != nil {
+ return as.PerformMethod(ctx, existence)
+ }
+ return nil
+}