diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-07-08 11:47:23 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-07-08 12:27:10 +0300 |
commit | 7808a323e6cd39976fc3502992421524b848ead4 (patch) | |
tree | d56aad95fe609a5f13d64bd8f4bcfdbf28b2cb22 | |
parent | c824e3e73d0e916a3605da38a492e4ed30c2a430 (diff) |
Delete records of all replicas that were deleted transactionally
We recently added transaction support for repository deletions to alleviate
races that occur when quickly deleting a repository and creating it again.
As the deletes were not transactional, a deletion job to a secondary would be
scheduled and could be applied to the newly created repository if it is created
again on the secondary that is pending the deletion job. While most of the other
parts of making deletes transactional are in place, we don't currently delete
the database records of all of the replicas that were deleted transactionally,
only the primary's. This then leaves some stale state in the database that can
affect the newly created repository. This commit fixes this by deleting the
database records of all replicas that participated in the delete transaction.
-rw-r--r-- | internal/praefect/coordinator.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 22 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 6 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 48 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 4 |
6 files changed, 61 insertions, 23 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index aad66ce63..acc926769 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -971,7 +971,7 @@ func (c *Coordinator) newRequestFinalizer( // If this fails, the repository was already deleted from the primary but we end up still having a record of it in the db. // Ideally we would delete the record from the db first and schedule the repository for deletion later in order to avoid // this problem. Client can reattempt this as deleting a repository is idempotent. - if err := c.rs.DeleteRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary); err != nil { + if err := c.rs.DeleteRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), append(updatedSecondaries, primary)); err != nil { if !errors.Is(err, datastore.ErrNoRowsAffected) { return fmt.Errorf("delete repository: %w", err) } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 9dc138f74..567029378 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1921,7 +1921,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { requireSuppressedCancellation(t, ctx) return err }, - DeleteRepositoryFunc: func(ctx context.Context, _, _, _ string) error { + DeleteRepositoryFunc: func(ctx context.Context, _, _ string, _ []string) error { requireSuppressedCancellation(t, ctx) return err }, diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 7dc5bf6d9..7a1702b6d 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -103,10 +103,10 @@ type RepositoryStore interface { // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error - // DeleteRepository deletes the repository from the virtual storage and the storage. Returns + // DeleteRepository deletes the repository's record from the virtual storage and the storages. Returns // ErrNoRowsAffected when trying to delete a repository which has no record in the virtual storage - // or the storage. - DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error + // or the storages. + DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error // DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage. DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error // RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well @@ -358,7 +358,7 @@ FROM ( return err } -func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error { return rs.delete(ctx, ` WITH repo AS ( DELETE FROM repositories @@ -369,24 +369,24 @@ WITH repo AS ( DELETE FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2 -AND storage = $3 - `, virtualStorage, relativePath, storage, +AND storage = ANY($3::text[]) + `, virtualStorage, relativePath, storages, ) } // DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details. -func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath string, storage string) error { return rs.delete(ctx, ` DELETE FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2 -AND storage = $3 - `, virtualStorage, relativePath, storage, +AND storage = ANY($3::text[]) + `, virtualStorage, relativePath, []string{storage}, ) } -func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath, storage string) error { - result, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, storage) +func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath string, storages []string) error { + result, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, pq.StringArray(storages)) if err != nil { return err } diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 015648c5b..3c5b5f81c 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -10,7 +10,7 @@ type MockRepositoryStore struct { GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error - DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string, storages []string) error DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) @@ -61,12 +61,12 @@ func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorag return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) } -func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error { if m.DeleteRepositoryFunc == nil { return nil } - return m.DeleteRepositoryFunc(ctx, virtualStorage, relativePath, storage) + return m.DeleteRepositoryFunc(ctx, virtualStorage, relativePath, storages) } // DeleteReplica runs the mock's DeleteReplicaFunc. diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index cf5dc95e7..57e61dac0 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -410,7 +410,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("delete non-existing", func(t *testing.T) { rs, _ := newStore(t, nil) - require.Equal(t, ErrNoRowsAffected, rs.DeleteRepository(ctx, vs, repo, stor)) + require.Equal(t, ErrNoRowsAffected, rs.DeleteRepository(ctx, vs, repo, []string{stor})) }) t.Run("delete existing", func(t *testing.T) { @@ -460,9 +460,9 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }, ) - require.NoError(t, rs.DeleteRepository(ctx, "deleted", "deleted", "deleted")) - require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage")) - require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage")) + require.NoError(t, rs.DeleteRepository(ctx, "deleted", "deleted", []string{"deleted"})) + require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "other-storages-remain", []string{"deleted-storage"})) + require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-2", "deleted-repo", []string{"deleted-storage"})) requireState(t, ctx, virtualStorageState{ @@ -484,6 +484,42 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }, ) }) + + t.Run("transactional delete", func(t *testing.T) { + rs, requireState := newStore(t, nil) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-1", 0)) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-2", 0)) + require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "repository-1", "replica-3", 0)) + + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": repositoryRecord{}, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "replica-1": 0, + "replica-2": 0, + "replica-3": 0, + }, + }, + }, + ) + + require.NoError(t, rs.DeleteRepository(ctx, "virtual-storage-1", "repository-1", []string{"replica-1", "replica-2"})) + requireState(t, ctx, + virtualStorageState{}, + storageState{ + "virtual-storage-1": { + "repository-1": { + "replica-3": 0, + }, + }, + }, + ) + }) }) t.Run("DeleteReplica", func(t *testing.T) { @@ -694,7 +730,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("returns not found for deleted repositories", func(t *testing.T) { - require.NoError(t, rs.DeleteRepository(ctx, vs, repo, "primary")) + require.NoError(t, rs.DeleteRepository(ctx, vs, repo, []string{"primary"})) requireState(t, ctx, virtualStorageState{}, storageState{ @@ -756,7 +792,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.True(t, exists) - require.NoError(t, rs.DeleteRepository(ctx, vs, repo, stor)) + require.NoError(t, rs.DeleteRepository(ctx, vs, repo, []string{stor})) exists, err = rs.RepositoryExists(ctx, vs, repo) require.NoError(t, err) require.False(t, exists) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 7e9e6799f..5bd48f637 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -163,7 +163,9 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica var deleteFunc func(context.Context, string, string, string) error switch event.Job.Change { case datastore.DeleteRepo: - deleteFunc = dr.rs.DeleteRepository + deleteFunc = func(ctx context.Context, virtualStorage, relativePath, storage string) error { + return dr.rs.DeleteRepository(ctx, virtualStorage, relativePath, []string{storage}) + } case datastore.DeleteReplica: deleteFunc = dr.rs.DeleteReplica default: |