gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2021-09-01 12:05:31 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2021-09-30 00:09:54 +0300
commit    36bd7e0f2f90d6a3ad379e084314fdea681b7118 (patch)
tree      5016546e3fbcc5740e375e80adfebdb7658b19d4 /internal
parent    50974202df9317fcbd1be3cb856572830e06828f (diff)
repoclean: Introduce new storage_cleanups table
The problem of repositories missing from the praefect database becomes more pressing as more customers migrate to the cluster setup. To verify the state of the cluster, a new background job needs to be implemented. As a first step, it should iterate over all repositories on the gitaly storages and check whether each of them exists in the praefect database.

Because a cluster usually runs multiple praefect instances for HA, the scan has to be coordinated between them. That is why the new storage_cleanups table is introduced: it tracks the gitaly storages available for scanning. Coordination is based on acquiring and "locking" rows (by writing to a column), so that an acquired row is not visible to other praefect instances while execution is in progress.

The change also includes a method to verify the existence of repositories in the praefect database.

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/3719
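For orientation, below is a minimal sketch of how a praefect background job could wire the new datastore methods together. Only NewStorageCleanup, Populate, AcquireNextStorage and DoesntExist come from this change; the cleanupjob package, runOnce and listReplicaPaths are illustrative assumptions, not part of the commit.

// Sketch only: wires together the methods introduced in this commit.
package cleanupjob

import (
	"context"
	"database/sql"
	"log"
	"time"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
)

// listReplicaPaths is a stand-in for walking a gitaly storage on disk; it is
// not part of this change.
func listReplicaPaths(ctx context.Context, virtualStorage, storage string) ([]string, error) {
	return nil, nil
}

// runOnce registers the known storage and processes at most one acquired entry.
func runOnce(ctx context.Context, db *sql.DB) error {
	cleanup := datastore.NewStorageCleanup(db)

	// Every praefect registers the storages it knows about; repeated calls
	// with the same pair are a no-op thanks to ON CONFLICT DO NOTHING.
	if err := cleanup.Populate(ctx, "praefect", "gitaly-1"); err != nil {
		return err
	}

	// Acquire the next storage that has not been scanned within the last
	// 24 hours. While held, the row is invisible to other praefect instances.
	entry, release, err := cleanup.AcquireNextStorage(ctx, 24*time.Hour, time.Minute)
	if err != nil {
		return err
	}
	defer func() {
		// Sets last_run and clears triggered_at so the row can be acquired again.
		if err := release(); err != nil {
			log.Printf("release storage: %v", err)
		}
	}()
	if entry == nil {
		return nil // nothing is due for a scan right now
	}

	paths, err := listReplicaPaths(ctx, entry.VirtualStorage, entry.Storage)
	if err != nil {
		return err
	}

	// Repositories that exist on disk but are unknown to the praefect database.
	untracked, err := cleanup.DoesntExist(ctx, entry.VirtualStorage, entry.Storage, paths)
	if err != nil {
		return err
	}
	for _, repo := range untracked {
		log.Printf("untracked repository: %s/%s/%s",
			repo.VirtualStorage, repo.Storage, repo.RelativeReplicaPath)
	}
	return nil
}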
Diffstat (limited to 'internal')
-rw-r--r--  internal/praefect/datastore/glsql/testing.go                                       1
-rw-r--r--  internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go   23
-rw-r--r--  internal/praefect/datastore/migrations/20210927083631_repository_path_index.go    19
-rw-r--r--  internal/praefect/datastore/storage_cleanup.go                                    182
-rw-r--r--  internal/praefect/datastore/storage_cleanup_test.go                               283
5 files changed, 508 insertions, 0 deletions
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index a6c9dd19e..d47b8cd1a 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -97,6 +97,7 @@ func (db DB) TruncateAll(t testing.TB) {
"repositories",
"virtual_storages",
"repository_assignments",
+ "storage_cleanups",
)
}
diff --git a/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go b/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go
new file mode 100644
index 000000000..17d68cade
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20210914115710_storage_cleanups_table.go
@@ -0,0 +1,23 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20210914115710_storage_cleanups_table",
+ Up: []string{
+ `CREATE TABLE storage_cleanups (
+ virtual_storage TEXT NOT NULL,
+ storage TEXT NOT NULL,
+ last_run TIMESTAMP WITHOUT TIME ZONE,
+ triggered_at TIMESTAMP WITHOUT TIME ZONE,
+ PRIMARY KEY (virtual_storage, storage)
+ )`,
+ },
+ Down: []string{
+ `DROP TABLE storage_cleanups`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go b/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go
new file mode 100644
index 000000000..cc7022738
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20210927083631_repository_path_index.go
@@ -0,0 +1,19 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20210927083631_repository_path_index",
+ Up: []string{
+ `CREATE INDEX CONCURRENTLY repository_replica_path_index
+ ON repositories (replica_path, virtual_storage)`,
+ },
+ DisableTransactionUp: true,
+ Down: []string{
+ `DROP INDEX repository_replica_path_index`,
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
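A note on DisableTransactionUp above: PostgreSQL cannot build an index CONCURRENTLY inside a transaction block, and sql-migrate wraps migrations in a transaction by default, which is why the flag is required. A minimal reproduction of the failure, where the connection string is a placeholder assumption:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

func main() {
	// Assumed DSN for illustration only.
	db, err := sql.Open("postgres", "postgres://localhost/praefect_test?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer func() { _ = db.Close() }()

	tx, err := db.Begin()
	if err != nil {
		panic(err)
	}
	defer func() { _ = tx.Rollback() }()

	_, err = tx.Exec(`CREATE INDEX CONCURRENTLY repository_replica_path_index
		ON repositories (replica_path, virtual_storage)`)
	// Expected: "CREATE INDEX CONCURRENTLY cannot run inside a transaction block"
	fmt.Println(err)
}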
diff --git a/internal/praefect/datastore/storage_cleanup.go b/internal/praefect/datastore/storage_cleanup.go
new file mode 100644
index 000000000..2c50f2aac
--- /dev/null
+++ b/internal/praefect/datastore/storage_cleanup.go
@@ -0,0 +1,182 @@
+package datastore
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/lib/pq"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+)
+
+// RepositoryClusterPath identifies the location of a repository in the cluster.
+type RepositoryClusterPath struct {
+ ClusterPath
+ // RelativeReplicaPath is the relative path to the repository on disk.
+ RelativeReplicaPath string
+}
+
+// NewRepositoryClusterPath initializes and returns RepositoryClusterPath.
+func NewRepositoryClusterPath(virtualStorage, storage, relativePath string) RepositoryClusterPath {
+ return RepositoryClusterPath{
+ ClusterPath: ClusterPath{
+ VirtualStorage: virtualStorage,
+ Storage: storage,
+ },
+ RelativeReplicaPath: relativePath,
+ }
+}
+
+// ClusterPath represents the path to the storage on the cluster.
+type ClusterPath struct {
+ // VirtualStorage is the name of the virtual storage.
+ VirtualStorage string
+ // Storage is the name of the gitaly storage.
+ Storage string
+}
+
+// NewStorageCleanup initializes and returns a new instance of StorageCleanup.
+func NewStorageCleanup(db *sql.DB) *StorageCleanup {
+ return &StorageCleanup{db: db}
+}
+
+// StorageCleanup provides methods on the database for the repository cleanup operation.
+type StorageCleanup struct {
+ db *sql.DB
+}
+
+// Populate adds the storage to the set so that it can be acquired afterwards.
+func (ss *StorageCleanup) Populate(ctx context.Context, virtualStorage, storage string) error {
+ if _, err := ss.db.ExecContext(
+ ctx,
+ `
+ INSERT INTO storage_cleanups (virtual_storage, storage) VALUES ($1, $2)
+ ON CONFLICT (virtual_storage, storage) DO NOTHING`,
+ virtualStorage, storage,
+ ); err != nil {
+ return fmt.Errorf("exec: %w", err)
+ }
+ return nil
+}
+
+// AcquireNextStorage picks up the next storage for processing.
+// Once acquired, no other call to the same method will return the same storage, so it
+// works as an exclusive lock on that entry.
+// Once processing is done, the returned function needs to be called to release the
+// acquired storage. It updates the last_run column of the entry on execution.
+func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error) {
+ var entry ClusterPath
+ if err := ss.db.QueryRowContext(
+ ctx,
+ `UPDATE storage_cleanups
+ SET triggered_at = (NOW() AT TIME ZONE 'UTC')
+ WHERE (virtual_storage, storage) IN (
+ SELECT virtual_storage, storage
+ FROM storage_cleanups
+ WHERE
+ COALESCE(last_run, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $1)
+ AND COALESCE(triggered_at, TO_TIMESTAMP(0)) <= (NOW() - INTERVAL '1 MILLISECOND' * $2)
+ ORDER BY last_run NULLS FIRST, virtual_storage, storage
+ LIMIT 1
+ FOR UPDATE SKIP LOCKED
+ )
+ RETURNING virtual_storage, storage`,
+ inactive.Milliseconds(), updatePeriod.Milliseconds(),
+ ).Scan(&entry.VirtualStorage, &entry.Storage); err != nil {
+ if !errors.Is(err, sql.ErrNoRows) {
+ return nil, nil, fmt.Errorf("scan: %w", err)
+ }
+ return nil, func() error { return nil }, nil
+ }
+
+ stop := make(chan struct{}, 1)
+ stopped := make(chan struct{})
+ go func() {
+ trigger := helper.NewTimerTicker(updatePeriod - 100*time.Millisecond)
+ defer func() {
+ trigger.Stop()
+ close(stopped)
+ }()
+
+ for {
+ trigger.Reset()
+ select {
+ case <-ctx.Done():
+ return
+ case <-stop:
+ return
+ case <-trigger.C():
+ if _, err := ss.db.ExecContext(
+ ctx,
+ `UPDATE storage_cleanups
+ SET triggered_at = NOW()
+ WHERE virtual_storage = $1 AND storage = $2`,
+ entry.VirtualStorage, entry.Storage,
+ ); err != nil {
+ return
+ }
+ }
+ }
+ }()
+
+ return &entry, func() error {
+ // Signal the triggered_at update goroutine to terminate.
+ stop <- struct{}{}
+ // Wait for the goroutine to terminate so that triggered_at is not updated
+ // again after it has been set to NULL.
+ <-stopped
+
+ if _, err := ss.db.ExecContext(
+ ctx,
+ `UPDATE storage_cleanups
+ SET last_run = NOW(), triggered_at = NULL
+ WHERE virtual_storage = $1 AND storage = $2`,
+ entry.VirtualStorage, entry.Storage,
+ ); err != nil {
+ return fmt.Errorf("update storage_cleanups: %w", err)
+ }
+ return nil
+ }, nil
+}
+
+// DoesntExist returns a RepositoryClusterPath for each repository that doesn't exist in the database,
+// determined by querying the repositories and storage_repositories tables.
+func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]RepositoryClusterPath, error) {
+ if len(replicaPaths) == 0 {
+ return nil, nil
+ }
+
+ rows, err := ss.db.QueryContext(
+ ctx,
+ `SELECT $1 AS virtual_storage, $2 AS storage, UNNEST($3::TEXT[]) AS replica_path
+ EXCEPT (
+ SELECT virtual_storage, storage, replica_path
+ FROM repositories
+ JOIN storage_repositories USING (virtual_storage, relative_path)
+ WHERE virtual_storage = $1 AND storage = $2 AND replica_path = ANY($3)
+ )`,
+ virtualStorage, storage, pq.StringArray(replicaPaths),
+ )
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer func() { _ = rows.Close() }()
+
+ var res []RepositoryClusterPath
+ for rows.Next() {
+ var curr RepositoryClusterPath
+ if err := rows.Scan(&curr.VirtualStorage, &curr.Storage, &curr.RelativeReplicaPath); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+ res = append(res, curr)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("loop: %w", err)
+ }
+ if err := rows.Close(); err != nil {
+ return nil, fmt.Errorf("close: %w", err)
+ }
+ return res, nil
+}
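To make the exclusive-lock contract of AcquireNextStorage concrete, here is a short sketch of two praefect instances sharing the same database. db and ctx are assumed to come from the surrounding setup; only the acquire/release calls are part of this change.

// Two praefect instances backed by the same Postgres database.
a := datastore.NewStorageCleanup(db)
b := datastore.NewStorageCleanup(db)

entryA, releaseA, err := a.AcquireNextStorage(ctx, 0, time.Minute)
if err != nil {
	return err
}
// entryA points at the only registered storage; its triggered_at is set and
// kept fresh by a background goroutine roughly every updatePeriod.

entryB, releaseB, err := b.AcquireNextStorage(ctx, 0, time.Minute)
if err != nil {
	return err
}
// entryB == nil: the acquired row is filtered out by the triggered_at
// condition, and FOR UPDATE SKIP LOCKED prevents two instances from grabbing
// the same row at the same instant. releaseB is a no-op in this case.
log.Printf("instance A acquired %v, instance B acquired %v", entryA, entryB)

_ = releaseB()
// Releasing sets last_run and clears triggered_at, so the row becomes
// acquirable again (subject to the inactive duration).
return releaseA()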
diff --git a/internal/praefect/datastore/storage_cleanup_test.go b/internal/praefect/datastore/storage_cleanup_test.go
new file mode 100644
index 000000000..bb8e66d75
--- /dev/null
+++ b/internal/praefect/datastore/storage_cleanup_test.go
@@ -0,0 +1,283 @@
+package datastore
+
+import (
+ "database/sql"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+)
+
+func TestStorageCleanup_Populate(t *testing.T) {
+ t.Parallel()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ db := glsql.NewDB(t)
+ storageCleanup := NewStorageCleanup(db.DB)
+
+ require.NoError(t, storageCleanup.Populate(ctx, "praefect", "gitaly-1"))
+ actual := getAllStoragesCleanup(t, db)
+ single := []storageCleanupRow{{ClusterPath: ClusterPath{VirtualStorage: "praefect", Storage: "gitaly-1"}}}
+ require.Equal(t, single, actual)
+
+ err := storageCleanup.Populate(ctx, "praefect", "gitaly-1")
+ require.NoError(t, err, "population of the same data should not generate an error")
+ actual = getAllStoragesCleanup(t, db)
+ require.Equal(t, single, actual, "same data should not create additional rows or change existing")
+
+ require.NoError(t, storageCleanup.Populate(ctx, "default", "gitaly-2"))
+ multiple := append(single, storageCleanupRow{ClusterPath: ClusterPath{VirtualStorage: "default", Storage: "gitaly-2"}})
+ actual = getAllStoragesCleanup(t, db)
+ require.ElementsMatch(t, multiple, actual, "new data should create additional row")
+}
+
+func TestStorageCleanup_AcquireNextStorage(t *testing.T) {
+ t.Parallel()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ db := glsql.NewDB(t)
+ storageCleanup := NewStorageCleanup(db.DB)
+
+ t.Run("ok", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath)
+ })
+
+ t.Run("last_run condition", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ // Acquire it to initialize last_run column.
+ _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, time.Hour, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Nil(t, clusterPath, "no result expected as there can't be such entries")
+ })
+
+ t.Run("sorting based on storage name as no executions done yet", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2"))
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g3"))
+
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath)
+ })
+
+ t.Run("sorting based on storage name and last_run", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2"))
+
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g2"}, clusterPath)
+ })
+
+ t.Run("sorting based on last_run", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g2"))
+ clusterPath, release, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath)
+ clusterPath, release, err = storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g2"}, clusterPath)
+
+ clusterPath, release, err = storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NoError(t, release())
+ require.Equal(t, &ClusterPath{VirtualStorage: "vs", Storage: "g1"}, clusterPath)
+ })
+
+ t.Run("already acquired won't be acquired until released", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+
+ clusterPath, release2, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.Nil(t, clusterPath, clusterPath)
+ require.NoError(t, release1())
+ require.NoError(t, release2())
+
+ clusterPath, release3, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NotNil(t, clusterPath)
+ require.NoError(t, release3())
+ })
+
+ t.Run("already acquired won't be acquired until released", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ _, release1, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+
+ clusterPath, release2, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.Nil(t, clusterPath, clusterPath)
+ require.NoError(t, release1())
+ require.NoError(t, release2())
+
+ clusterPath, release3, err := storageCleanup.AcquireNextStorage(ctx, 0, time.Second)
+ require.NoError(t, err)
+ require.NotNil(t, clusterPath)
+ require.NoError(t, release3())
+ })
+
+ t.Run("acquired for long time triggers update loop", func(t *testing.T) {
+ db.TruncateAll(t)
+ require.NoError(t, storageCleanup.Populate(ctx, "vs", "g1"))
+ start := time.Now().UTC()
+ _, release, err := storageCleanup.AcquireNextStorage(ctx, 0, 500*time.Millisecond)
+ require.NoError(t, err)
+
+ // Make sure the triggered_at column has a non-NULL value after the record is acquired.
+ check1 := getAllStoragesCleanup(t, db)
+ require.Len(t, check1, 1)
+ require.True(t, check1[0].TriggeredAt.Valid)
+ require.True(t, check1[0].TriggeredAt.Time.After(start), check1[0].TriggeredAt.Time.String(), start.String())
+
+ // Check that the background goroutine updates the triggered_at column periodically.
+ time.Sleep(time.Second)
+
+ check2 := getAllStoragesCleanup(t, db)
+ require.Len(t, check2, 1)
+ require.True(t, check2[0].TriggeredAt.Valid)
+ require.True(t, check2[0].TriggeredAt.Time.After(check1[0].TriggeredAt.Time), check2[0].TriggeredAt.Time.String(), check1[0].TriggeredAt.Time.String())
+
+ require.NoError(t, release())
+
+ // Make sure the triggered_at column has a NULL value after the record is released.
+ check3 := getAllStoragesCleanup(t, db)
+ require.Len(t, check3, 1)
+ require.False(t, check3[0].TriggeredAt.Valid)
+ })
+}
+
+func TestStorageCleanup_Exists(t *testing.T) {
+ t.Parallel()
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := glsql.NewDB(t)
+
+ repoStore := NewPostgresRepositoryStore(db.DB, nil)
+ require.NoError(t, repoStore.CreateRepository(ctx, 0, "vs", "p/1", "g1", []string{"g2", "g3"}, nil, false, false))
+ require.NoError(t, repoStore.CreateRepository(ctx, 1, "vs", "p/2", "g1", []string{"g2", "g3"}, nil, false, false))
+ storageCleanup := NewStorageCleanup(db.DB)
+
+ for _, tc := range []struct {
+ desc string
+ virtualStorage string
+ storage string
+ relativeReplicaPaths []string
+ out []RepositoryClusterPath
+ }{
+ {
+ desc: "multiple doesn't exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "p/2", "path/x", "path/y"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"},
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"},
+ },
+ },
+ {
+ desc: "duplicates",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "path/x", "path/x"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"},
+ },
+ },
+ {
+ desc: "all exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"p/1", "p/2"},
+ out: nil,
+ },
+ {
+ desc: "all don't exist",
+ virtualStorage: "vs",
+ storage: "g1",
+ relativeReplicaPaths: []string{"path/x", "path/y", "path/z"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/x"},
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/y"},
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "g1"}, RelativeReplicaPath: "path/z"},
+ },
+ },
+ {
+ desc: "doesn't exist because of storage",
+ virtualStorage: "vs",
+ storage: "stub",
+ relativeReplicaPaths: []string{"path/x"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "vs", Storage: "stub"}, RelativeReplicaPath: "path/x"},
+ },
+ },
+ {
+ desc: "doesn't exist because of virtual storage",
+ virtualStorage: "stub",
+ storage: "g1",
+ relativeReplicaPaths: []string{"path/x"},
+ out: []RepositoryClusterPath{
+ {ClusterPath: ClusterPath{VirtualStorage: "stub", Storage: "g1"}, RelativeReplicaPath: "path/x"},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ res, err := storageCleanup.DoesntExist(ctx, tc.virtualStorage, tc.storage, tc.relativeReplicaPaths)
+ require.NoError(t, err)
+ require.ElementsMatch(t, tc.out, res)
+ })
+ }
+}
+
+type storageCleanupRow struct {
+ ClusterPath
+ LastRun sql.NullTime
+ TriggeredAt sql.NullTime
+}
+
+func getAllStoragesCleanup(t testing.TB, db glsql.DB) []storageCleanupRow {
+ rows, err := db.Query(`SELECT * FROM storage_cleanups`)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, rows.Close())
+ }()
+
+ var res []storageCleanupRow
+ for rows.Next() {
+ var dst storageCleanupRow
+ err := rows.Scan(&dst.VirtualStorage, &dst.Storage, &dst.LastRun, &dst.TriggeredAt)
+ require.NoError(t, err)
+ res = append(res, dst)
+ }
+ require.NoError(t, rows.Err())
+ return res
+}