Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-07-09 11:07:15 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-07-09 11:07:15 +0300
commit9c8b281cd5b8c2b7406f2b1c8db908d33a817f9f (patch)
tree3b55ec16a59cc1685f89c97a94589269fe688e8a
parentfef798978197809e268e5395838b4f4eeb4288e9 (diff)
parent7808a323e6cd39976fc3502992421524b848ead4 (diff)
Merge branch 'smh-transactional-delete' into 'master'
Delete records of all replicas that were deleted transactionally See merge request gitlab-org/gitaly!3654
-rw-r--r--internal/praefect/coordinator.go4
-rw-r--r--internal/praefect/coordinator_test.go2
-rw-r--r--internal/praefect/datastore/repository_store.go33
-rw-r--r--internal/praefect/datastore/repository_store_mock.go6
-rw-r--r--internal/praefect/datastore/repository_store_test.go56
-rw-r--r--internal/praefect/replicator.go6
6 files changed, 69 insertions, 38 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 3ae9863d0..acc926769 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -971,8 +971,8 @@ func (c *Coordinator) newRequestFinalizer(
// If this fails, the repository was already deleted from the primary but we end up still having a record of it in the db.
// Ideally we would delete the record from the db first and schedule the repository for deletion later in order to avoid
// this problem. Client can reattempt this as deleting a repository is idempotent.
- if err := c.rs.DeleteRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary); err != nil {
- if !errors.Is(err, datastore.RepositoryNotExistsError{}) {
+ if err := c.rs.DeleteRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), append(updatedSecondaries, primary)); err != nil {
+ if !errors.Is(err, datastore.ErrNoRowsAffected) {
return fmt.Errorf("delete repository: %w", err)
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 9dc138f74..567029378 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1921,7 +1921,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
requireSuppressedCancellation(t, ctx)
return err
},
- DeleteRepositoryFunc: func(ctx context.Context, _, _, _ string) error {
+ DeleteRepositoryFunc: func(ctx context.Context, _, _ string, _ []string) error {
requireSuppressedCancellation(t, ctx)
return err
},
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index e840bb9d6..7a1702b6d 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -77,6 +77,9 @@ func (err RepositoryExistsError) Error() string {
)
}
+// ErrNoRowsAffected is returned when a query did not perform any changes.
+var ErrNoRowsAffected = errors.New("no rows were affected by the query")
+
// RepositoryStore provides access to repository state.
type RepositoryStore interface {
// GetGeneration gets the repository's generation on a given storage.
@@ -100,10 +103,10 @@ type RepositoryStore interface {
// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
// secondaries are stored as the assigned hosts of the repository.
CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
- // DeleteRepository deletes the repository from the virtual storage and the storage. Returns
- // RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage
- // or the storage.
- DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
+ // DeleteRepository deletes the repository's record from the virtual storage and the storages. Returns
+ // ErrNoRowsAffected when trying to delete a repository which has no record in the virtual storage
+ // or the storages.
+ DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error
// DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage.
DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
// RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well
@@ -355,7 +358,7 @@ FROM (
return err
}
-func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
+func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error {
return rs.delete(ctx, `
WITH repo AS (
DELETE FROM repositories
@@ -366,24 +369,24 @@ WITH repo AS (
DELETE FROM storage_repositories
WHERE virtual_storage = $1
AND relative_path = $2
-AND storage = $3
- `, virtualStorage, relativePath, storage,
+AND storage = ANY($3::text[])
+ `, virtualStorage, relativePath, storages,
)
}
// DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details.
-func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error {
+func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath string, storage string) error {
return rs.delete(ctx, `
DELETE FROM storage_repositories
WHERE virtual_storage = $1
AND relative_path = $2
-AND storage = $3
- `, virtualStorage, relativePath, storage,
+AND storage = ANY($3::text[])
+ `, virtualStorage, relativePath, []string{storage},
)
}
-func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath, storage string) error {
- result, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, storage)
+func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath string, storages []string) error {
+ result, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, pq.StringArray(storages))
if err != nil {
return err
}
@@ -391,11 +394,7 @@ func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualSto
if n, err := result.RowsAffected(); err != nil {
return err
} else if n == 0 {
- return RepositoryNotExistsError{
- virtualStorage: virtualStorage,
- relativePath: relativePath,
- storage: storage,
- }
+ return ErrNoRowsAffected
}
return nil
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 015648c5b..3c5b5f81c 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -10,7 +10,7 @@ type MockRepositoryStore struct {
GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
- DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
+ DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string, storages []string) error
DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
@@ -61,12 +61,12 @@ func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorag
return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments)
}
-func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
+func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error {
if m.DeleteRepositoryFunc == nil {
return nil
}
- return m.DeleteRepositoryFunc(ctx, virtualStorage, relativePath, storage)
+ return m.DeleteRepositoryFunc(ctx, virtualStorage, relativePath, storages)
}
// DeleteReplica runs the mock's DeleteReplicaFunc.
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index c2922b3d6..57e61dac0 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -410,10 +410,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("delete non-existing", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.Equal(t,
- RepositoryNotExistsError{vs, repo, stor},
- rs.DeleteRepository(ctx, vs, repo, stor),
- )
+ require.Equal(t, ErrNoRowsAffected, rs.DeleteRepository(ctx, vs, repo, []string{stor}))
})
t.Run("delete existing", func(t *testing.T) {
@@ -463,9 +460,9 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
},
)
- require.NoError(t, rs.DeleteRepository(ctx, "deleted", "deleted", "deleted"))
- require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage"))
- require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage"))
+ require.NoError(t, rs.DeleteRepository(ctx, "deleted", "deleted", []string{"deleted"}))
+ require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "other-storages-remain", []string{"deleted-storage"}))
+ require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-2", "deleted-repo", []string{"deleted-storage"}))
requireState(t, ctx,
virtualStorageState{
@@ -487,16 +484,49 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
},
)
})
+
+ t.Run("transactional delete", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-1", 0))
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-2", 0))
+ require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-3", 0))
+
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": repositoryRecord{},
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "replica-1": 0,
+ "replica-2": 0,
+ "replica-3": 0,
+ },
+ },
+ },
+ )
+
+ require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "repository-1", []string{"replica-1", "replica-2"}))
+ requireState(t, ctx,
+ virtualStorageState{},
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "replica-3": 0,
+ },
+ },
+ },
+ )
+ })
})
t.Run("DeleteReplica", func(t *testing.T) {
rs, requireState := newStore(t, nil)
t.Run("delete non-existing", func(t *testing.T) {
- require.Equal(t,
- RepositoryNotExistsError{"virtual-storage-1", "relative-path-1", "storage-1"},
- rs.DeleteReplica(ctx, "virtual-storage-1", "relative-path-1", "storage-1"),
- )
+ require.Equal(t, ErrNoRowsAffected, rs.DeleteReplica(ctx, "virtual-storage-1", "relative-path-1", "storage-1"))
})
t.Run("delete existing", func(t *testing.T) {
@@ -700,7 +730,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
})
t.Run("returns not found for deleted repositories", func(t *testing.T) {
- require.NoError(t, rs.DeleteRepository(ctx, vs, repo, "primary"))
+ require.NoError(t, rs.DeleteRepository(ctx, vs, repo, []string{"primary"}))
requireState(t, ctx,
virtualStorageState{},
storageState{
@@ -762,7 +792,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
require.NoError(t, err)
require.True(t, exists)
- require.NoError(t, rs.DeleteRepository(ctx, vs, repo, stor))
+ require.NoError(t, rs.DeleteRepository(ctx, vs, repo, []string{stor}))
exists, err = rs.RepositoryExists(ctx, vs, repo)
require.NoError(t, err)
require.False(t, exists)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index c57fd77b8..5bd48f637 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -163,7 +163,9 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
var deleteFunc func(context.Context, string, string, string) error
switch event.Job.Change {
case datastore.DeleteRepo:
- deleteFunc = dr.rs.DeleteRepository
+ deleteFunc = func(ctx context.Context, virtualStorage, relativePath, storage string) error {
+ return dr.rs.DeleteRepository(ctx, virtualStorage, relativePath, []string{storage})
+ }
case datastore.DeleteReplica:
deleteFunc = dr.rs.DeleteReplica
default:
@@ -173,7 +175,7 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
// If the repository was deleted but this fails, we'll know by the repository not having a record in the virtual
// storage but having one for the storage. We can later retry the deletion.
if err := deleteFunc(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.TargetNodeStorage); err != nil {
- if !errors.Is(err, datastore.RepositoryNotExistsError{}) {
+ if !errors.Is(err, datastore.ErrNoRowsAffected) {
return err
}