diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-07-28 18:45:13 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-07-29 13:58:25 +0300 |
commit | 3de81e8508aae4fd71d9f326977cb42ffed8a6cb (patch) | |
tree | 1efdf29882884ebea7aaa8fd6d3aa1618e3c0130 | |
parent | 4a4ea7c0489b0eb0f7a65c65e5b15c35e3d16f14 (diff) |
Use a separate query for SetAuthoritativeStorage from IncrementGeneration
SetAuthoritativeStorage RPCs purpose is to set one of the replicas to the
latest generation as a way to accept data loss. Currently it uses
IncrementGeneration to do so. The behavior should be different though.
SetAuthoritativeStorage needs to modify the generation even if the targeted
replica is not on the latest generation as its purpose is to set it to the
latest one. IncrementGeneration on the other hand is used to acknowledge
mutator RPCs. It should not allow incrementing generation of replicas which
are not on the latest generation as that would effectively mean jumping over
some writes and losing them. This can currently happen if the primary changes
while there is an inflight write to the old primary. In preparation for fixing
this, this commit gives the RPC its own query so it can behave correctly even
after the behavior of IncrementGeneration is altered.
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 32 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 10 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 48 | ||||
-rw-r--r-- | internal/praefect/service/info/server.go | 13 |
4 files changed, 96 insertions, 7 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 278f4e3ea..c98377367 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -103,6 +103,8 @@ type RepositoryStore interface { // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error + // SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one. + SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error // DeleteRepository deletes the repository's record from the virtual storage and the storages. Returns // ErrNoRowsAffected when trying to delete a repository which has no record in the virtual storage // or the storages. @@ -240,6 +242,36 @@ ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET return err } +// SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one. +func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error { + result, err := rs.db.ExecContext(ctx, ` +WITH updated_repository AS ( + UPDATE repositories + SET generation = generation + 1 + WHERE virtual_storage = $1 + AND relative_path = $2 + RETURNING virtual_storage, relative_path, generation +) + +INSERT INTO storage_repositories +SELECT virtual_storage, relative_path, $3, generation +FROM updated_repository +ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE + SET generation = EXCLUDED.generation + `, virtualStorage, relativePath, storage) + if err != nil { + return fmt.Errorf("exec: %w", err) + } + + if rowsAffected, err := result.RowsAffected(); err != nil { + return fmt.Errorf("rows affected: %w", err) + } else if rowsAffected == 0 { + return commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) + } + + return nil +} + func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) { const q = ` SELECT storage, generation diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 876dcec73..684085e5d 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -10,6 +10,7 @@ type MockRepositoryStore struct { GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error + SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string, storages []string) error DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error @@ -61,6 +62,15 @@ func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorag return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) } +// SetAuthoritativeReplica calls the mocked function. If no mock has been provided, it returns a nil error. +func (m MockRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error { + if m.SetAuthoritativeReplicaFunc == nil { + return nil + } + + return m.SetAuthoritativeReplicaFunc(ctx, virtualStorage, relativePath, storage) +} + func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error { if m.DeleteRepositoryFunc == nil { return nil diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index ad5058aca..a3286e0dd 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -241,6 +241,54 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) }) + t.Run("SetAuthoritativeReplica", func(t *testing.T) { + rs, requireState := newStore(t, nil) + + t.Run("fails when repository doesnt exist", func(t *testing.T) { + require.Equal(t, + commonerr.NewRepositoryNotFoundError(vs, repo), + rs.SetAuthoritativeReplica(ctx, vs, repo, stor), + ) + }) + + t.Run("sets the given replica as the latest", func(t *testing.T) { + require.NoError(t, rs.SetGeneration(ctx, vs, repo, "storage-1", 0)) + require.NoError(t, rs.SetGeneration(ctx, vs, repo, "storage-2", 0)) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": repositoryRecord{}, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "storage-1": 0, + "storage-2": 0, + }, + }, + }, + ) + + require.NoError(t, rs.SetAuthoritativeReplica(ctx, vs, repo, "storage-1")) + requireState(t, ctx, + virtualStorageState{ + "virtual-storage-1": { + "repository-1": repositoryRecord{}, + }, + }, + storageState{ + "virtual-storage-1": { + "repository-1": { + "storage-1": 1, + "storage-2": 0, + }, + }, + }, + ) + }) + }) + t.Run("GetGeneration", func(t *testing.T) { rs, _ := newStore(t, nil) diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index fec5396a8..3e32d9135 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -2,8 +2,10 @@ package info import ( "context" + "errors" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service" @@ -79,14 +81,11 @@ func (s *Server) SetAuthoritativeStorage(ctx context.Context, req *gitalypb.SetA return nil, helper.ErrInvalidArgumentf("unknown authoritative storage: %q", req.AuthoritativeStorage) } - exists, err := s.rs.RepositoryExists(ctx, req.VirtualStorage, req.RelativePath) - if err != nil { - return nil, err - } else if !exists { - return nil, helper.ErrInvalidArgumentf("repository %q does not exist on virtual storage %q", req.RelativePath, req.VirtualStorage) - } + if err := s.rs.SetAuthoritativeReplica(ctx, req.VirtualStorage, req.RelativePath, req.AuthoritativeStorage); err != nil { + if errors.As(err, &commonerr.RepositoryNotFoundError{}) { + return nil, helper.ErrInvalidArgumentf("repository %q does not exist on virtual storage %q", req.RelativePath, req.VirtualStorage) + } - if err := s.rs.IncrementGeneration(ctx, req.VirtualStorage, req.RelativePath, req.AuthoritativeStorage, nil); err != nil { return nil, helper.ErrInternal(err) } |