diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-07-09 11:07:15 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-07-09 11:07:15 +0300 |
commit | 9c8b281cd5b8c2b7406f2b1c8db908d33a817f9f (patch) | |
tree | 3b55ec16a59cc1685f89c97a94589269fe688e8a | |
parent | fef798978197809e268e5395838b4f4eeb4288e9 (diff) | |
parent | 7808a323e6cd39976fc3502992421524b848ead4 (diff) |
Merge branch 'smh-transactional-delete' into 'master'
Delete records of all replicas that were deleted transactionally
See merge request gitlab-org/gitaly!3654
-rw-r--r-- | internal/praefect/coordinator.go | 4 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 33 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 6 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 56 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 6 |
6 files changed, 69 insertions, 38 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 3ae9863d0..acc926769 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -971,8 +971,8 @@ func (c *Coordinator) newRequestFinalizer( // If this fails, the repository was already deleted from the primary but we end up still having a record of it in the db. // Ideally we would delete the record from the db first and schedule the repository for deletion later in order to avoid // this problem. Client can reattempt this as deleting a repository is idempotent. - if err := c.rs.DeleteRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary); err != nil { - if !errors.Is(err, datastore.RepositoryNotExistsError{}) { + if err := c.rs.DeleteRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), append(updatedSecondaries, primary)); err != nil { + if !errors.Is(err, datastore.ErrNoRowsAffected) { return fmt.Errorf("delete repository: %w", err) } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 9dc138f74..567029378 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1921,7 +1921,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { requireSuppressedCancellation(t, ctx) return err }, - DeleteRepositoryFunc: func(ctx context.Context, _, _, _ string) error { + DeleteRepositoryFunc: func(ctx context.Context, _, _ string, _ []string) error { requireSuppressedCancellation(t, ctx) return err }, diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index e840bb9d6..7a1702b6d 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -77,6 +77,9 @@ func (err RepositoryExistsError) Error() string { ) } +// ErrNoRowsAffected is returned when a query did not perform any changes. +var ErrNoRowsAffected = errors.New("no rows were affected by the query") + // RepositoryStore provides access to repository state. type RepositoryStore interface { // GetGeneration gets the repository's generation on a given storage. @@ -100,10 +103,10 @@ type RepositoryStore interface { // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error - // DeleteRepository deletes the repository from the virtual storage and the storage. Returns - // RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage - // or the storage. - DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error + // DeleteRepository deletes the repository's record from the virtual storage and the storages. Returns + // ErrNoRowsAffected when trying to delete a repository which has no record in the virtual storage + // or the storages. + DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error // DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage. DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error // RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well @@ -355,7 +358,7 @@ FROM ( return err } -func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error { return rs.delete(ctx, ` WITH repo AS ( DELETE FROM repositories @@ -366,24 +369,24 @@ WITH repo AS ( DELETE FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2 -AND storage = $3 - `, virtualStorage, relativePath, storage, +AND storage = ANY($3::text[]) + `, virtualStorage, relativePath, storages, ) } // DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details. -func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath string, storage string) error { return rs.delete(ctx, ` DELETE FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2 -AND storage = $3 - `, virtualStorage, relativePath, storage, +AND storage = ANY($3::text[]) + `, virtualStorage, relativePath, []string{storage}, ) } -func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath, storage string) error { - result, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, storage) +func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath string, storages []string) error { + result, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, pq.StringArray(storages)) if err != nil { return err } @@ -391,11 +394,7 @@ func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualSto if n, err := result.RowsAffected(); err != nil { return err } else if n == 0 { - return RepositoryNotExistsError{ - virtualStorage: virtualStorage, - relativePath: relativePath, - storage: storage, - } + return ErrNoRowsAffected } return nil diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 015648c5b..3c5b5f81c 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -10,7 +10,7 @@ type MockRepositoryStore struct { GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error - DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string, storages []string) error DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) @@ -61,12 +61,12 @@ func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorag return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) } -func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error { if m.DeleteRepositoryFunc == nil { return nil } - return m.DeleteRepositoryFunc(ctx, virtualStorage, relativePath, storage) + return m.DeleteRepositoryFunc(ctx, virtualStorage, relativePath, storages) } // DeleteReplica runs the mock's DeleteReplicaFunc. diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index c2922b3d6..57e61dac0 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -410,10 +410,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("delete non-existing", func(t *testing.T) { rs, _ := newStore(t, nil) - require.Equal(t, - RepositoryNotExistsError{vs, repo, stor}, - rs.DeleteRepository(ctx, vs, repo, stor), - ) + require.Equal(t, ErrNoRowsAffected, rs.DeleteRepository(ctx, vs, repo, []string{stor})) }) t.Run("delete existing", func(t *testing.T) { @@ -463,9 +460,9 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }, ) - require.NoError(t, rs.DeleteRepository(ctx, "deleted", "deleted", "deleted")) - require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage")) - require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage")) + require.NoError(t, rs.DeleteRepository(ctx, "deleted", "deleted", []string{"deleted"})) + require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "other-storages-remain", []string{"deleted-storage"})) + require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-2", "deleted-repo", []string{"deleted-storage"})) requireState(t, ctx, virtualStorageState{ @@ -487,16 +484,49 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }, ) }) + + t.Run("transactional delete", func(t *testing.T) { + rs, requireState := newStore(t, nil) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-1", 0)) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-2", 0)) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-3", 0)) + + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": repositoryRecord{}, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "replica-1": 0, + "replica-2": 0, + "replica-3": 0, + }, + }, + }, + ) + + require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "repository-1", []string{"replica-1", "replica-2"})) + requireState(t, ctx, + virtualStorageState{}, + storageState{ + "virtual-storage-1": { + "repository-1": { + "replica-3": 0, + }, + }, + }, + ) + }) }) t.Run("DeleteReplica", func(t *testing.T) { rs, requireState := newStore(t, nil) t.Run("delete non-existing", func(t *testing.T) { - require.Equal(t, - RepositoryNotExistsError{"virtual-storage-1", "relative-path-1", "storage-1"}, - rs.DeleteReplica(ctx, "virtual-storage-1", "relative-path-1", "storage-1"), - ) + require.Equal(t, ErrNoRowsAffected, rs.DeleteReplica(ctx, "virtual-storage-1", "relative-path-1", "storage-1")) }) t.Run("delete existing", func(t *testing.T) { @@ -700,7 +730,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("returns not found for deleted repositories", func(t *testing.T) { - require.NoError(t, rs.DeleteRepository(ctx, vs, repo, "primary")) + require.NoError(t, rs.DeleteRepository(ctx, vs, repo, []string{"primary"})) requireState(t, ctx, virtualStorageState{}, storageState{ @@ -762,7 +792,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.True(t, exists) - require.NoError(t, rs.DeleteRepository(ctx, vs, repo, stor)) + require.NoError(t, rs.DeleteRepository(ctx, vs, repo, []string{stor})) exists, err = rs.RepositoryExists(ctx, vs, repo) require.NoError(t, err) require.False(t, exists) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index c57fd77b8..5bd48f637 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -163,7 +163,9 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica var deleteFunc func(context.Context, string, string, string) error switch event.Job.Change { case datastore.DeleteRepo: - deleteFunc = dr.rs.DeleteRepository + deleteFunc = func(ctx context.Context, virtualStorage, relativePath, storage string) error { + return dr.rs.DeleteRepository(ctx, virtualStorage, relativePath, []string{storage}) + } case datastore.DeleteReplica: deleteFunc = dr.rs.DeleteReplica default: @@ -173,7 +175,7 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica // If the repository was deleted but this fails, we'll know by the repository not having a record in the virtual // storage but having one for the storage. We can later retry the deletion. if err := deleteFunc(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil { - if !errors.Is(err, datastore.RepositoryNotExistsError{}) { + if !errors.Is(err, datastore.ErrNoRowsAffected) { return err } |