diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-30 17:30:33 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-30 17:30:33 +0300 |
commit | eeca095e5680831ad140c2338d0f1c7d8b06a1ba (patch) | |
tree | 1c1f2ccd890d807a75f23c1ae1244d50bb5f0e48 /internal | |
parent | 0bf16fc55108fa4ff170ed8a77f104d99f359151 (diff) | |
parent | c378f639b0ac6aa983dd0dae2f5940d3c36dd103 (diff) |
Merge branch 'ps-crowler' into 'master'
repocleaner: Log warning message for repositories not known to praefect
Closes #3719
See merge request gitlab-org/gitaly!3839
Diffstat (limited to 'internal')
-rw-r--r-- | internal/praefect/config/config.go | 44 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 32 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.overwritedefaults.toml | 5 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 5 | ||||
-rw-r--r-- | internal/praefect/datastore/glsql/testing.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go | 23 | ||||
-rw-r--r-- | internal/praefect/datastore/migrations/20210927083631_repository_path_index.go | 19 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_cleanup.go | 182 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_cleanup_test.go | 283 | ||||
-rw-r--r-- | internal/praefect/repocleaner/action_log.go | 33 | ||||
-rw-r--r-- | internal/praefect/repocleaner/action_log_test.go | 48 | ||||
-rw-r--r-- | internal/praefect/repocleaner/init_test.go | 19 | ||||
-rw-r--r-- | internal/praefect/repocleaner/repository.go | 202 | ||||
-rw-r--r-- | internal/praefect/repocleaner/repository_test.go | 324 |
14 files changed, 1215 insertions, 6 deletions
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 60899deb3..53cd8411e 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -36,6 +36,9 @@ const ( ElectionStrategySQL ElectionStrategy = "sql" // ElectionStrategyPerRepository configures an SQL based strategy that elects different primaries per repository. ElectionStrategyPerRepository ElectionStrategy = "per_repository" + + minimalSyncCheckInterval = time.Minute + minimalSyncRunInterval = time.Minute ) type Failover struct { @@ -122,10 +125,10 @@ type Config struct { DB `toml:"database"` Failover Failover `toml:"failover"` // Keep for legacy reasons: remove after Omnibus has switched - FailoverEnabled bool `toml:"failover_enabled"` - MemoryQueueEnabled bool `toml:"memory_queue_enabled"` - GracefulStopTimeout config.Duration `toml:"graceful_stop_timeout"` - + FailoverEnabled bool `toml:"failover_enabled"` + MemoryQueueEnabled bool `toml:"memory_queue_enabled"` + GracefulStopTimeout config.Duration `toml:"graceful_stop_timeout"` + RepositoriesCleanup RepositoriesCleanup `toml:"repositories_cleanup"` // ForceCreateRepositories will enable force-creation of repositories in the // coordinator when routing repository-scoped mutators. This must never be used // outside of tests. 
@@ -156,7 +159,8 @@ func FromFile(filePath string) (Config, error) { Replication: DefaultReplicationConfig(), Prometheus: prometheus.DefaultConfig(), // Sets the default Failover, to be overwritten when deserializing the TOML - Failover: Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository}, + Failover: Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository}, + RepositoriesCleanup: DefaultRepositoriesCleanup(), } if err := toml.Unmarshal(b, conf); err != nil { return Config{}, err @@ -249,6 +253,15 @@ func (c *Config) Validate() error { } } + if c.RepositoriesCleanup.RunInterval.Duration() > 0 { + if c.RepositoriesCleanup.CheckInterval.Duration() < minimalSyncCheckInterval { + return fmt.Errorf("repositories_cleanup.check_interval is less then %s, which could lead to a database performance problem", minimalSyncCheckInterval.String()) + } + if c.RepositoriesCleanup.RunInterval.Duration() < minimalSyncRunInterval { + return fmt.Errorf("repositories_cleanup.run_interval is less then %s, which could lead to a database performance problem", minimalSyncRunInterval.String()) + } + } + return nil } @@ -416,3 +429,24 @@ func (db DB) ToPQString(direct bool) string { return strings.Join(fields, " ") } + +// RepositoriesCleanup configures repository synchronisation. +type RepositoriesCleanup struct { + // CheckInterval is a time period used to check if operation should be executed. + // It is recommended to keep it less than run_interval configuration as some + // nodes may be out of service, so they can be stale for too long. + CheckInterval config.Duration `toml:"check_interval"` + // RunInterval: the check runs if the previous operation was done at least RunInterval before. + RunInterval config.Duration `toml:"run_interval"` + // RepositoriesInBatch is the number of repositories to pass as a batch for processing. 
+ RepositoriesInBatch int `toml:"repositories_in_batch"` +} + +// DefaultRepositoriesCleanup contains default configuration values for the RepositoriesCleanup. +func DefaultRepositoriesCleanup() RepositoriesCleanup { + return RepositoriesCleanup{ + CheckInterval: config.Duration(30 * time.Minute), + RunInterval: config.Duration(24 * time.Hour), + RepositoriesInBatch: 16, + } +} diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 205f2a57d..16280d9b1 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -198,6 +198,20 @@ func TestConfigValidation(t *testing.T) { }, errMsg: `virtual storage "default" has a default replication factor (2) which is higher than the number of storages (1)`, }, + { + desc: "repositories_cleanup minimal duration is too low", + changeConfig: func(cfg *Config) { + cfg.RepositoriesCleanup.CheckInterval = config.Duration(minimalSyncCheckInterval - time.Nanosecond) + }, + errMsg: `repositories_cleanup.check_interval is less then 1m0s, which could lead to a database performance problem`, + }, + { + desc: "repositories_cleanup minimal duration is too low", + changeConfig: func(cfg *Config) { + cfg.RepositoriesCleanup.RunInterval = config.Duration(minimalSyncRunInterval - time.Nanosecond) + }, + errMsg: `repositories_cleanup.run_interval is less then 1m0s, which could lead to a database performance problem`, + }, } for _, tc := range testCases { @@ -209,7 +223,8 @@ func TestConfigValidation(t *testing.T) { {Name: "default", Nodes: vs1Nodes}, {Name: "secondary", Nodes: vs2Nodes}, }, - Failover: Failover{ElectionStrategy: ElectionStrategySQL}, + Failover: Failover{ElectionStrategy: ElectionStrategySQL}, + RepositoriesCleanup: DefaultRepositoriesCleanup(), } tc.changeConfig(&config) @@ -312,6 +327,11 @@ func TestConfigParsing(t *testing.T) { BootstrapInterval: config.Duration(1 * time.Second), MonitorInterval: config.Duration(3 * time.Second), }, + 
RepositoriesCleanup: RepositoriesCleanup{ + CheckInterval: config.Duration(time.Second), + RunInterval: config.Duration(3 * time.Second), + RepositoriesInBatch: 10, + }, }, }, { @@ -331,6 +351,11 @@ func TestConfigParsing(t *testing.T) { BootstrapInterval: config.Duration(5 * time.Second), MonitorInterval: config.Duration(10 * time.Second), }, + RepositoriesCleanup: RepositoriesCleanup{ + CheckInterval: config.Duration(time.Second), + RunInterval: config.Duration(4 * time.Second), + RepositoriesInBatch: 11, + }, }, }, { @@ -347,6 +372,11 @@ func TestConfigParsing(t *testing.T) { BootstrapInterval: config.Duration(time.Second), MonitorInterval: config.Duration(3 * time.Second), }, + RepositoriesCleanup: RepositoriesCleanup{ + CheckInterval: config.Duration(30 * time.Minute), + RunInterval: config.Duration(24 * time.Hour), + RepositoriesInBatch: 16, + }, }, }, { diff --git a/internal/praefect/config/testdata/config.overwritedefaults.toml b/internal/praefect/config/testdata/config.overwritedefaults.toml index 9b204b1fc..e1834e2e1 100644 --- a/internal/praefect/config/testdata/config.overwritedefaults.toml +++ b/internal/praefect/config/testdata/config.overwritedefaults.toml @@ -11,3 +11,8 @@ election_strategy = "local" read_only_after_failover = false bootstrap_interval = "5s" monitor_interval = "10s" + +[repositories_cleanup] +check_interval = "1s" +run_interval = "4s" +repositories_in_batch = 11 diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 09175ed50..7f2464670 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -71,3 +71,8 @@ sslrootcert = "/path/to/sp/root-cert" error_threshold_window = "20s" write_error_threshold_count = 1500 read_error_threshold_count = 100 + +[repositories_cleanup] +check_interval = "1s" +run_interval = "3s" +repositories_in_batch = 10 diff --git a/internal/praefect/datastore/glsql/testing.go 
b/internal/praefect/datastore/glsql/testing.go index 3ca634bff..5181bf9c7 100644 --- a/internal/praefect/datastore/glsql/testing.go +++ b/internal/praefect/datastore/glsql/testing.go @@ -98,6 +98,8 @@ func (db DB) TruncateAll(t testing.TB) { "storage_repositories", "repositories", "virtual_storages", + "repository_assignments", + "storage_cleanups", ) } diff --git a/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go b/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go new file mode 100644 index 000000000..17d68cade --- /dev/null +++ b/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go @@ -0,0 +1,23 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20210914115710_storage_cleanups_table", + Up: []string{ + `CREATE TABLE storage_cleanups ( + virtual_storage TEXT NOT NULL, + storage TEXT NOT NULL, + last_run TIMESTAMP WITHOUT TIME ZONE, + triggered_at TIMESTAMP WITHOUT TIME ZONE, + PRIMARY KEY (virtual_storage, storage) + )`, + }, + Down: []string{ + `DROP TABLE storage_cleanups`, + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go b/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go new file mode 100644 index 000000000..cc7022738 --- /dev/null +++ b/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go @@ -0,0 +1,19 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20210927083631_repository_path_index", + Up: []string{ + `CREATE INDEX CONCURRENTLY repository_replica_path_index + ON repositories (replica_path, virtual_storage)`, + }, + DisableTransactionUp: true, + Down: []string{ + `DROP INDEX repository_replica_path_index`, + }, + } + + allMigrations = append(allMigrations, m) +} diff 
--git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go new file mode 100644 index 000000000..2c50f2aac --- /dev/null +++ b/internal/praefect/datastore/storage_cleanup.go @@ -0,0 +1,182 @@ +package datastore + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/lib/pq" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" +) + +// RepositoryClusterPath identifies location of the repository in the cluster. +type RepositoryClusterPath struct { + ClusterPath + // RelativeReplicaPath relative path to the repository on the disk. + RelativeReplicaPath string +} + +// NewRepositoryClusterPath initializes and returns RepositoryClusterPath. +func NewRepositoryClusterPath(virtualStorage, storage, relativePath string) RepositoryClusterPath { + return RepositoryClusterPath{ + ClusterPath: ClusterPath{ + VirtualStorage: virtualStorage, + Storage: storage, + }, + RelativeReplicaPath: relativePath, + } +} + +// ClusterPath represents path on the cluster to the storage. +type ClusterPath struct { + // VirtualStorage is the name of the virtual storage. + VirtualStorage string + // Storage is the name of the gitaly storage. + Storage string +} + +// NewStorageCleanup initialises and returns a new instance of the StorageCleanup. +func NewStorageCleanup(db *sql.DB) *StorageCleanup { + return &StorageCleanup{db: db} +} + +// StorageCleanup provides methods on the database for the repository cleanup operation. +type StorageCleanup struct { + db *sql.DB +} + +// Populate adds storage to the set, so it can be acquired afterwards. 
+func (ss *StorageCleanup) Populate(ctx context.Context, virtualStorage, storage string) error { + if _, err := ss.db.ExecContext( + ctx, + ` + INSERT INTO storage_cleanups (virtual_storage, storage) VALUES ($1, $2) + ON CONFLICT (virtual_storage, storage) DO NOTHING`, + virtualStorage, storage, + ); err != nil { + return fmt.Errorf("exec: %w", err) + } + return nil +} + +// AcquireNextStorage picks up the next storage for processing. +// Once acquired no other call to the same method will return the same storage, so it +// works as exclusive lock on that entry. +// Once processing is done the returned function needs to be called to release +// acquired storage. It updates last_run column of the entry on execution. +func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error) { + var entry ClusterPath + if err := ss.db.QueryRowContext( + ctx, + `UPDATE storage_cleanups + SET triggered_at = (NOW() AT TIME ZONE 'UTC') + WHERE (virtual_storage, storage) IN ( + SELECT virtual_storage, storage + FROM storage_cleanups + WHERE + COALESCE(last_run, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $1) + AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $2) + ORDER BY last_run NULLS FIRST, virtual_storage, storage + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING virtual_storage, storage`, + inactive.Milliseconds(), updatePeriod.Milliseconds(), + ).Scan(&entry.VirtualStorage, &entry.Storage); err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return nil, nil, fmt.Errorf("scan: %w", err) + } + return nil, func() error { return nil }, nil + } + + stop := make(chan struct{}, 1) + stopped := make(chan struct{}) + go func() { + trigger := helper.NewTimerTicker(updatePeriod - 100*time.Millisecond) + defer func() { + trigger.Stop() + close(stopped) + }() + + for { + trigger.Reset() + select { + case <-ctx.Done(): + return + case <-stop: + return + case 
<-trigger.C(): + if _, err := ss.db.ExecContext( + ctx, + `UPDATE storage_cleanups + SET triggered_at = NOW() + WHERE virtual_storage = $1 AND storage = $2`, + entry.VirtualStorage, entry.Storage, + ); err != nil { + return + } + } + } + }() + + return &entry, func() error { + // signals health update goroutine to terminate + stop <- struct{}{} + // waits for the health update goroutine to terminate to prevent update + // of the triggered_at after setting it to NULL + <-stopped + + if _, err := ss.db.ExecContext( + ctx, + `UPDATE storage_cleanups + SET last_run = NOW(), triggered_at = NULL + WHERE virtual_storage = $1 AND storage = $2`, + entry.VirtualStorage, entry.Storage, + ); err != nil { + return fmt.Errorf("update storage_cleanups: %w", err) + } + return nil + }, nil +} + +// DoesntExist returns RepositoryClusterPath for each repository that doesn't exist in the database +// by querying repositories and storage_repositories tables. +func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]RepositoryClusterPath, error) { + if len(replicaPaths) == 0 { + return nil, nil + } + + rows, err := ss.db.QueryContext( + ctx, + `SELECT $1 AS virtual_storage, $2 AS storage, UNNEST($3::TEXT[]) AS replica_path + EXCEPT ( + SELECT virtual_storage, storage, replica_path + FROM repositories + JOIN storage_repositories USING (virtual_storage, relative_path) + WHERE virtual_storage = $1 AND storage = $2 AND replica_path = ANY($3) + )`, + virtualStorage, storage, pq.StringArray(replicaPaths), + ) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer func() { _ = rows.Close() }() + + var res []RepositoryClusterPath + for rows.Next() { + var curr RepositoryClusterPath + if err := rows.Scan(&curr.VirtualStorage, &curr.Storage, &curr.RelativeReplicaPath); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + res = append(res, curr) + } + if err := rows.Err(); err != nil { + return nil, 
fmt.Errorf("loop: %w", err) + } + if err := rows.Close(); err != nil { + return nil, fmt.Errorf("close: %w", err) + } + return res, nil +} diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go new file mode 100644 index 000000000..bb8e66d75 --- /dev/null +++ b/internal/praefect/datastore/storage_cleanup_test.go @@ -0,0 +1,283 @@ +package datastore + +import ( + "database/sql" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" +) + +func TestStorageCleanup_Populate(t *testing.T) { + t.Parallel() + ctx, cancel := testhelper.Context() + defer cancel() + db := glsql.NewDB(t) + storageCleanup := NewStorageCleanup(db.DB) + + require.NoError(t, storageCleanup.Populate(ctx, "praefect", "gitaly-1")) + actual := getAllStoragesCleanup(t, db) + single := []storageCleanupRow{{ClusterPath: ClusterPath{VirtualStorage: "praefect", Storage: "gitaly-1"}}} + require.Equal(t, single, actual) + + err := storageCleanup.Populate(ctx, "praefect", "gitaly-1") + require.NoError(t, err, "population of the same data should not generate an error") + actual = getAllStoragesCleanup(t, db) + require.Equal(t, single, actual, "same data should not create additional rows or change existing") + + require.NoError(t, storageCleanup.Populate(ctx, "default", "gitaly-2")) + multiple := append(single, storageCleanupRow{ClusterPath: ClusterPath{VirtualStorage: "default", Storage: "gitaly-2"}}) + actual = getAllStoragesCleanup(t, db) + require.ElementsMatch(t, multiple, actual, "new data should create additional row") +} + +func TestStorageCleanup_AcquireNextStorage(t *testing.T) { + t.Parallel() + ctx, cancel := testhelper.Context() + defer cancel() + db := glsql.NewDB(t) + storageCleanup := NewStorageCleanup(db.DB) + + t.Run("ok", func(t *testing.T) { + db.TruncateAll(t) + require.NoError(t, 
storageCleanup.Populate(ctx, "vs", "g1")) + + clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath) + }) + + t.Run("last_run condition", func(t *testing.T) { + db.TruncateAll(t) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) + // Acquire it to initialize last_run column. + _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + + clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, time.Hour, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + require.Nil(t, clusterPath, "no result expected as there can't be such entries") + }) + + t.Run("sorting based on storage name as no executions done yet", func(t *testing.T) { + db.TruncateAll(t) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2")) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g3")) + + clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath) + }) + + t.Run("sorting based on storage name and last_run", func(t *testing.T) { + db.TruncateAll(t) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) + _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2")) + + clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g2"}, clusterPath) + }) + + t.Run("sorting based on last_run", 
func(t *testing.T) { + db.TruncateAll(t) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2")) + clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath) + clusterPath, release, err = storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g2"}, clusterPath) + + clusterPath, release, err = storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NoError(t, release()) + require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath) + }) + + t.Run("already acquired won't be acquired until released", func(t *testing.T) { + db.TruncateAll(t) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) + _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + + clusterPath, release2, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.Nil(t, clusterPath, clusterPath) + require.NoError(t, release1()) + require.NoError(t, release2()) + + clusterPath, release3, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NotNil(t, clusterPath) + require.NoError(t, release3()) + }) + + t.Run("already acquired won't be acquired until released", func(t *testing.T) { + db.TruncateAll(t) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) + _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + + clusterPath, release2, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.Nil(t, clusterPath, clusterPath) + require.NoError(t, release1()) + 
require.NoError(t, release2()) + + clusterPath, release3, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second) + require.NoError(t, err) + require.NotNil(t, clusterPath) + require.NoError(t, release3()) + }) + + t.Run("acquired for long time triggers update loop", func(t *testing.T) { + db.TruncateAll(t) + require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1")) + start := time.Now().UTC() + _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, 500*time.Millisecond) + require.NoError(t, err) + + // Make sure the triggered_at column has a non NULL value after the record is acquired. + check1 := getAllStoragesCleanup(t, db) + require.Len(t, check1, 1) + require.True(t, check1[0].TriggeredAt.Valid) + require.True(t, check1[0].TriggeredAt.Time.After(start), check1[0].TriggeredAt.Time.String(), start.String()) + + // Check the goroutine running in the background updates triggered_at column periodically. + time.Sleep(time.Second) + + check2 := getAllStoragesCleanup(t, db) + require.Len(t, check2, 1) + require.True(t, check2[0].TriggeredAt.Valid) + require.True(t, check2[0].TriggeredAt.Time.After(check1[0].TriggeredAt.Time), check2[0].TriggeredAt.Time.String(), check1[0].TriggeredAt.Time.String()) + + require.NoError(t, release()) + + // Make sure the triggered_at column has a NULL value after the record is released. 
+ check3 := getAllStoragesCleanup(t, db) + require.Len(t, check3, 1) + require.False(t, check3[0].TriggeredAt.Valid) + }) +} + +func TestStorageCleanup_Exists(t *testing.T) { + t.Parallel() + ctx, cancel := testhelper.Context() + defer cancel() + + db := glsql.NewDB(t) + + repoStore := NewPostgresRepositoryStore(db.DB, nil) + require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "g1", []string{"g2", "g3"}, nil, false, false)) + require.NoError(t, repoStore.CreateRepository(ctx, 1, "vs", "p/2", "g1", []string{"g2", "g3"}, nil, false, false)) + storageCleanup := NewStorageCleanup(db.DB) + + for _, tc := range []struct { + desc string + virtualStorage string + storage string + relativeReplicaPaths []string + out []RepositoryClusterPath + }{ + { + desc: "multiple doesn't exist", + virtualStorage: "vs", + storage: "g1", + relativeReplicaPaths: []string{"p/1", "p/2", "path/x", "path/y"}, + out: []RepositoryClusterPath{ + {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"}, + }, + }, + { + desc: "duplicates", + virtualStorage: "vs", + storage: "g1", + relativeReplicaPaths: []string{"p/1", "path/x", "path/x"}, + out: []RepositoryClusterPath{ + {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, + }, + }, + { + desc: "all exist", + virtualStorage: "vs", + storage: "g1", + relativeReplicaPaths: []string{"p/1", "p/2"}, + out: nil, + }, + { + desc: "all doesn't exist", + virtualStorage: "vs", + storage: "g1", + relativeReplicaPaths: []string{"path/x", "path/y", "path/z"}, + out: []RepositoryClusterPath{ + {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"}, + {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"}, + {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: 
"path/z"}, + }, + }, + { + desc: "doesn't exist because of storage", + virtualStorage: "vs", + storage: "stub", + relativeReplicaPaths: []string{"path/x"}, + out: []RepositoryClusterPath{ + {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "stub"}, RelativeReplicaPath: "path/x"}, + }, + }, + { + desc: "doesn't exist because of virtual storage", + virtualStorage: "stub", + storage: "g1", + relativeReplicaPaths: []string{"path/x"}, + out: []RepositoryClusterPath{ + {ClusterPath: ClusterPath{VirtualStorage: "stub", Storage: "g1"}, RelativeReplicaPath: "path/x"}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + res, err := storageCleanup.DoesntExist(ctx, tc.virtualStorage, tc.storage, tc.relativeReplicaPaths) + require.NoError(t, err) + require.ElementsMatch(t, tc.out, res) + }) + } +} + +type storageCleanupRow struct { + ClusterPath + LastRun sql.NullTime + TriggeredAt sql.NullTime +} + +func getAllStoragesCleanup(t testing.TB, db glsql.DB) []storageCleanupRow { + rows, err := db.Query(`SELECT * FROM storage_cleanups`) + require.NoError(t, err) + defer func() { + require.NoError(t, rows.Close()) + }() + + var res []storageCleanupRow + for rows.Next() { + var dst storageCleanupRow + err := rows.Scan(&dst.VirtualStorage, &dst.Storage, &dst.LastRun, &dst.TriggeredAt) + require.NoError(t, err) + res = append(res, dst) + } + require.NoError(t, rows.Err()) + return res +} diff --git a/internal/praefect/repocleaner/action_log.go b/internal/praefect/repocleaner/action_log.go new file mode 100644 index 000000000..934d69c49 --- /dev/null +++ b/internal/praefect/repocleaner/action_log.go @@ -0,0 +1,33 @@ +package repocleaner + +import ( + "context" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" +) + +// LogWarnAction is an implementation of the Action interface that allows to log a warning message +// for the repositories that are not known for the praefect. 
+type LogWarnAction struct { + logger logrus.FieldLogger +} + +// NewLogWarnAction returns a new instance of the LogWarnAction. +func NewLogWarnAction(logger logrus.FieldLogger) *LogWarnAction { + return &LogWarnAction{ + logger: logger.WithField("component", "repocleaner.log_warn_action"), + } +} + +// Perform logs a warning for each repository that is not known to praefect. +func (al LogWarnAction) Perform(_ context.Context, notExisting []datastore.RepositoryClusterPath) error { + for _, entry := range notExisting { + al.logger.WithFields(logrus.Fields{ + "virtual_storage": entry.VirtualStorage, + "storage": entry.Storage, + "relative_replica_path": entry.RelativeReplicaPath, + }).Warn("repository is not managed by praefect") + } + return nil +} diff --git a/internal/praefect/repocleaner/action_log_test.go b/internal/praefect/repocleaner/action_log_test.go new file mode 100644 index 000000000..1c7af50b2 --- /dev/null +++ b/internal/praefect/repocleaner/action_log_test.go @@ -0,0 +1,48 @@ +package repocleaner + +import ( + "context" + "testing" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" +) + +func TestLogWarnAction_Perform(t *testing.T) { + logger, hook := test.NewNullLogger() + action := NewLogWarnAction(logger) + err := action.Perform(context.TODO(), []datastore.RepositoryClusterPath{ + {ClusterPath: datastore.ClusterPath{VirtualStorage: "vs1", Storage: "g1"}, RelativeReplicaPath: "p/1"}, + {ClusterPath: datastore.ClusterPath{VirtualStorage: "vs2", Storage: "g2"}, RelativeReplicaPath: "p/2"}, + }) + require.NoError(t, err) + require.Len(t, hook.AllEntries(), 2) + + exp := []map[string]interface{}{{ + "Data": logrus.Fields{ + "component": "repocleaner.log_warn_action", + "virtual_storage": "vs1", + "storage": "g1", + "relative_replica_path": "p/1", + }, + "Message": "repository is not managed by praefect", + }, { + "Data": 
logrus.Fields{ + "component": "repocleaner.log_warn_action", + "virtual_storage": "vs2", + "storage": "g2", + "relative_replica_path": "p/2", + }, + "Message": "repository is not managed by praefect", + }} + + require.ElementsMatch(t, exp, []map[string]interface{}{{ + "Data": hook.AllEntries()[0].Data, + "Message": hook.AllEntries()[0].Message, + }, { + "Data": hook.AllEntries()[1].Data, + "Message": hook.AllEntries()[1].Message, + }}) +} diff --git a/internal/praefect/repocleaner/init_test.go b/internal/praefect/repocleaner/init_test.go new file mode 100644 index 000000000..011d4f2a4 --- /dev/null +++ b/internal/praefect/repocleaner/init_test.go @@ -0,0 +1,19 @@ +package repocleaner + +import ( + "os" + "testing" + + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" +) + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) (code int) { + defer testhelper.MustHaveNoChildProcess() + cleanup := testhelper.Configure() + defer cleanup() + return m.Run() +} diff --git a/internal/praefect/repocleaner/repository.go b/internal/praefect/repocleaner/repository.go new file mode 100644 index 000000000..fbf3b841b --- /dev/null +++ b/internal/praefect/repocleaner/repository.go @@ -0,0 +1,202 @@ +package repocleaner + +import ( + "context" + "errors" + "fmt" + "io" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +// StateOwner performs check for the existence of the repositories. +type StateOwner interface { + // DoesntExist returns RepositoryClusterPath for each repository that doesn't exist in the database + // by querying repositories and storage_repositories tables. 
+ DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]datastore.RepositoryClusterPath, error) +} + +// Acquirer acquires storage for processing and no other Acquirer can acquire it again until it is released. +type Acquirer interface { + // Populate adds provided storage into the pool of entries to acquire. + Populate(ctx context.Context, virtualStorage, storage string) error + // AcquireNextStorage acquires next storage based on the inactive time. + AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*datastore.ClusterPath, func() error, error) +} + +// Action is a procedure to be executed on the repositories that don't exist in the praefect database. +type Action interface { + // Perform runs actual action for non-existing repositories. + Perform(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error +} + +// Runner scans healthy gitaly nodes for the repositories, verifies if +// found repositories are known by praefect and runs a special action. +type Runner struct { + cfg Cfg + logger logrus.FieldLogger + healthChecker praefect.HealthChecker + conns praefect.Connections + stateOwner StateOwner + acquirer Acquirer + action Action +} + +// Cfg contains the set of configuration parameters to run Runner. +type Cfg struct { + // RunInterval: the check runs if the previous operation was done at least RunInterval before. + RunInterval time.Duration + // LivenessInterval: an update runs on the locked entity with provided period to signal that entity is in use. + LivenessInterval time.Duration + // RepositoriesInBatch is the number of repositories to pass as a batch for processing. + RepositoriesInBatch int +} + +// NewRunner returns an instance of the Runner. 
+func NewRunner(cfg Cfg, logger logrus.FieldLogger, healthChecker praefect.HealthChecker, conns praefect.Connections, stateOwner StateOwner, acquirer Acquirer, action Action) *Runner { + return &Runner{ + cfg: cfg, + logger: logger.WithField("component", "repocleaner.repository_existence"), + healthChecker: healthChecker, + conns: conns, + stateOwner: stateOwner, + acquirer: acquirer, + action: action, + } +} + +// Run scans healthy gitaly nodes for the repositories, verifies if +// found repositories are known by praefect and runs a special action. +// It runs on each tick of the provided ticker and finishes with context cancellation. +func (gs *Runner) Run(ctx context.Context, ticker helper.Ticker) error { + gs.logger.Info("started") + defer gs.logger.Info("completed") + + defer ticker.Stop() + + for virtualStorage, connByStorage := range gs.conns { + for storage := range connByStorage { + if err := gs.acquirer.Populate(ctx, virtualStorage, storage); err != nil { + return fmt.Errorf("populate database: %w", err) + } + } + } + + var tick helper.Ticker + for { + // We use a local tick variable to run the first cycle + // without wait. All the other iterations are waiting + // for the next tick or context cancellation. 
+ if tick != nil { + tick.Reset() + select { + case <-ctx.Done(): + return ctx.Err() + case <-tick.C(): + } + } else { + tick = ticker + } + + gs.run(ctx) + } +} + +func (gs *Runner) run(ctx context.Context) { + clusterPath, release, err := gs.acquirer.AcquireNextStorage(ctx, gs.cfg.RunInterval, gs.cfg.LivenessInterval) + if err != nil { + gs.logger.WithError(err).Error("unable to acquire next storage to verify") + return + } + + logger := gs.logger + defer func() { + if err := release(); err != nil { + logger.WithError(err).Error("failed to release storage acquired to verify") + } + }() + + if clusterPath == nil { + gs.logger.Debug("no storages to verify") + return + } + + logger = gs.loggerWith(clusterPath.VirtualStorage, clusterPath.Storage) + err = gs.execOnRepositories(ctx, clusterPath.VirtualStorage, clusterPath.Storage, func(paths []datastore.RepositoryClusterPath) { + relativePaths := make([]string, len(paths)) + for i, path := range paths { + relativePaths[i] = path.RelativeReplicaPath + } + notExisting, err := gs.stateOwner.DoesntExist(ctx, clusterPath.VirtualStorage, clusterPath.Storage, relativePaths) + if err != nil { + logger.WithError(err).WithField("repositories", paths).Error("failed to check existence") + return + } + + if err := gs.action.Perform(ctx, notExisting); err != nil { + logger.WithError(err).WithField("existence", notExisting).Error("perform action") + return + } + }) + if err != nil { + logger.WithError(err).Error("failed to exec action on repositories") + return + } +} + +func (gs *Runner) loggerWith(virtualStorage, storage string) logrus.FieldLogger { + return gs.logger.WithFields(logrus.Fields{"virtual_storage": virtualStorage, "storage": storage}) +} + +func (gs *Runner) execOnRepositories(ctx context.Context, virtualStorage, storage string, action func([]datastore.RepositoryClusterPath)) error { + gclient, err := gs.getInternalGitalyClient(virtualStorage, storage) + if err != nil { + return fmt.Errorf("setup gitaly client: %w", 
err) + } + + resp, err := gclient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: storage}) + if err != nil { + return fmt.Errorf("unable to walk repos: %w", err) + } + + batch := make([]datastore.RepositoryClusterPath, 0, gs.cfg.RepositoriesInBatch) + for { + res, err := resp.Recv() + if err != nil { + if !errors.Is(err, io.EOF) { + return fmt.Errorf("failure on walking repos: %w", err) + } + break + } + + batch = append(batch, datastore.RepositoryClusterPath{ + ClusterPath: datastore.ClusterPath{ + VirtualStorage: virtualStorage, + Storage: storage, + }, + RelativeReplicaPath: res.RelativePath, + }) + + if len(batch) == cap(batch) { + action(batch) + batch = batch[:0] + } + } + if len(batch) > 0 { + action(batch) + } + return nil +} + +func (gs *Runner) getInternalGitalyClient(virtualStorage, storage string) (gitalypb.InternalGitalyClient, error) { + conn, found := gs.conns[virtualStorage][storage] + if !found { + return nil, fmt.Errorf("no connection to the gitaly node %q/%q", virtualStorage, storage) + } + return gitalypb.NewInternalGitalyClient(conn), nil +} diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go new file mode 100644 index 000000000..32e7d7ad5 --- /dev/null +++ b/internal/praefect/repocleaner/repository_test.go @@ -0,0 +1,324 @@ +package repocleaner + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + 
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" +) + +func TestRunner_Run(t *testing.T) { + t.Parallel() + + const ( + repo1RelPath = "repo-1.git" + repo2RelPath = "repo-2.git" + repo3RelPath = "repo-3.git" + + storage1 = "gitaly-1" + storage2 = "gitaly-2" + storage3 = "gitaly-3" + + virtualStorage = "praefect" + ) + + g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1)) + g2Cfg := testcfg.Build(t, testcfg.WithStorages(storage2)) + g3Cfg := testcfg.Build(t, testcfg.WithStorages(storage3)) + + g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g2Addr := testserver.RunGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + g3Addr := testserver.RunGitalyServer(t, g3Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + db := glsql.NewDB(t) + var database string + require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database)) + dbConf := glsql.GetDBConfig(t, database) + + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorage, + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + {Storage: g2Cfg.Storages[0].Name, Address: g2Addr}, + {Storage: g3Cfg.Storages[0].Name, Address: g3Addr}, + }, + }, + }, + DB: dbConf, + } + cfg := Cfg{ + RunInterval: time.Duration(1), + LivenessInterval: time.Duration(1), + RepositoriesInBatch: 2, + } + + gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath}) + gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], 
gittest.CloneRepoOpts{RelativePath: repo2RelPath}) + gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo3RelPath}) + + // second gitaly is missing repo-3.git repository + gittest.CloneRepo(t, g2Cfg, g2Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath}) + gittest.CloneRepo(t, g2Cfg, g2Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath}) + + // third gitaly has an extra repo-4.git repository + gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath}) + gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo2RelPath}) + gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo3RelPath}) + gittest.CloneRepo(t, g3Cfg, g3Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: "repo-4.git"}) + + ctx, cancel := testhelper.Context() + defer cancel() + + repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) + for i, set := range []struct { + relativePath string + primary string + secondaries []string + }{ + { + relativePath: repo1RelPath, + primary: storage1, + secondaries: []string{storage3}, + }, + { + relativePath: repo2RelPath, + primary: storage1, + secondaries: []string{storage2, storage3}, + }, + { + relativePath: repo3RelPath, + primary: storage1, + secondaries: []string{storage2, storage3}, + }, + } { + require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, set.secondaries, nil, false, false)) + } + + logger, loggerHook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + entry := logger.WithContext(ctx) + clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil))) + nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker) + require.NoError(t, err) + defer nodeSet.Close() + + 
storageCleanup := datastore.NewStorageCleanup(db.DB) + + var iteration int32 + runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1, storage2, storage3}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{ + PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error { + i := atomic.LoadInt32(&iteration) + switch i { + case 0: + assert.ElementsMatch(t, nil, notExisting) + case 1: + assert.ElementsMatch(t, nil, notExisting) + case 2: + assert.ElementsMatch( + t, + []datastore.RepositoryClusterPath{ + datastore.NewRepositoryClusterPath(virtualStorage, storage2, repo1RelPath), + }, + notExisting, + ) + case 3: + assert.ElementsMatch(t, nil, notExisting) + case 4: + assert.Equal( + t, + []datastore.RepositoryClusterPath{ + datastore.NewRepositoryClusterPath(virtualStorage, storage3, "repo-4.git"), + }, + notExisting, + ) + } + atomic.AddInt32(&iteration, 1) + return nil + }, + }) + + ticker := helper.NewManualTicker() + done := make(chan struct{}) + go func() { + defer close(done) + assert.Equal(t, context.Canceled, runner.Run(ctx, ticker)) + }() + // we need to trigger it 5 times to make sure the 4-th run is fully completed + for i := 0; i < 5; i++ { + ticker.Tick() + } + require.Greater(t, atomic.LoadInt32(&iteration), int32(4)) + require.Len(t, loggerHook.AllEntries(), 1) + require.Equal( + t, + map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "started"}, + map[string]interface{}{"Data": loggerHook.AllEntries()[0].Data, "Message": loggerHook.AllEntries()[0].Message}, + ) + // Terminates the loop. 
+ cancel() + <-done + require.Equal( + t, + map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "completed"}, + map[string]interface{}{"Data": loggerHook.LastEntry().Data, "Message": loggerHook.LastEntry().Message}, + ) +} + +func TestRunner_Run_noAvailableStorages(t *testing.T) { + t.Parallel() + + const ( + repo1RelPath = "repo-1.git" + storage1 = "gitaly-1" + virtualStorage = "praefect" + ) + + g1Cfg := testcfg.Build(t, testcfg.WithStorages(storage1)) + g1Addr := testserver.RunGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + db := glsql.NewDB(t) + var database string + require.NoError(t, db.QueryRow(`SELECT current_database()`).Scan(&database)) + dbConf := glsql.GetDBConfig(t, database) + + conf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorage, + Nodes: []*config.Node{ + {Storage: g1Cfg.Storages[0].Name, Address: g1Addr}, + }, + }, + }, + DB: dbConf, + } + cfg := Cfg{ + RunInterval: time.Minute, + LivenessInterval: time.Second, + RepositoriesInBatch: 2, + } + + gittest.CloneRepo(t, g1Cfg, g1Cfg.Storages[0], gittest.CloneRepoOpts{RelativePath: repo1RelPath}) + + ctx, cancel := testhelper.Context() + defer cancel() + + repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) + for i, set := range []struct { + relativePath string + primary string + }{ + { + relativePath: repo1RelPath, + primary: storage1, + }, + } { + require.NoError(t, repoStore.CreateRepository(ctx, int64(i), conf.VirtualStorages[0].Name, set.relativePath, set.primary, nil, nil, false, false)) + } + + logger := testhelper.NewTestLogger(t) + entry := logger.WithContext(ctx) + clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil))) + nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, 
protoregistry.GitalyProtoPreregistered, nil, clientHandshaker) + require.NoError(t, err) + defer nodeSet.Close() + + storageCleanup := datastore.NewStorageCleanup(db.DB) + startSecond := make(chan struct{}) + releaseFirst := make(chan struct{}) + runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{ + PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error { + assert.Empty(t, notExisting) + // Block execution here until send instance completes its execution. + // It allows us to be sure the picked storage can't be picked once again by + // another instance as well as that it works without problems if there is + // nothing to pick up to process. + close(startSecond) + <-releaseFirst + return nil + }, + }) + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + logger, loggerHook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + runner := NewRunner(cfg, logger, praefect.StaticHealthChecker{virtualStorage: []string{storage1}}, nodeSet.Connections(), storageCleanup, storageCleanup, actionStub{ + PerformMethod: func(ctx context.Context, notExisting []datastore.RepositoryClusterPath) error { + assert.FailNow(t, "should not be triggered as there is no available storages to acquire") + return nil + }, + }) + + ctx, cancel := testhelper.Context() + defer cancel() + ticker := helper.NewManualTicker() + + <-startSecond + go func() { + defer wg.Done() + assert.Equal(t, context.Canceled, runner.Run(ctx, ticker)) + }() + ticker.Tick() + ticker.Tick() + close(releaseFirst) + cancel() + wg.Wait() + + entries := loggerHook.AllEntries() + require.Greater(t, len(entries), 2) + require.Equal( + t, + map[string]interface{}{"Data": logrus.Fields{"component": "repocleaner.repository_existence"}, "Message": "no storages to verify"}, + map[string]interface{}{"Data": loggerHook.AllEntries()[1].Data, "Message": 
loggerHook.AllEntries()[1].Message}, + ) + }() + + ticker := helper.NewManualTicker() + go func() { + defer wg.Done() + assert.Equal(t, context.Canceled, runner.Run(ctx, ticker)) + }() + ticker.Tick() + ticker.Tick() // blocks until first processing cycle is done + cancel() + wg.Wait() +} + +type actionStub struct { + PerformMethod func(ctx context.Context, existence []datastore.RepositoryClusterPath) error +} + +func (as actionStub) Perform(ctx context.Context, existence []datastore.RepositoryClusterPath) error { + if as.PerformMethod != nil { + return as.PerformMethod(ctx, existence) + } + return nil +} |