Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-07-28 18:45:13 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-07-29 13:58:25 +0300
commit3de81e8508aae4fd71d9f326977cb42ffed8a6cb (patch)
tree1efdf29882884ebea7aaa8fd6d3aa1618e3c0130
parent4a4ea7c0489b0eb0f7a65c65e5b15c35e3d16f14 (diff)
Use a separate query for SetAuthoritativeStorage from IncrementGeneration
SetAuthoritativeStorage RPCs purpose is to set one of the replicas to the latest generation as a way to accept data loss. Currently it uses IncrementGeneration to do so. The behavior should be different though. SetAuthoritativeStorage needs to modify the generation even if the targeted replica is not on the latest generation as its purpose is to set it to the latest one. IncrementGeneration on the other hand is used to acknowledge mutator RPCs. It should not allow incrementing generation of replicas which are not on the latest generation as that would effectively mean jumping over some writes and losing them. This can currently happen if the primary changes while there is an inflight write to the old primary. In preparation for fixing this, this commit gives the RPC its own query so it can behave correctly even after the behavior of IncrementGeneration is altered.
-rw-r--r--internal/praefect/datastore/repository_store.go32
-rw-r--r--internal/praefect/datastore/repository_store_mock.go10
-rw-r--r--internal/praefect/datastore/repository_store_test.go48
-rw-r--r--internal/praefect/service/info/server.go13
4 files changed, 96 insertions, 7 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 278f4e3ea..c98377367 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -103,6 +103,8 @@ type RepositoryStore interface {
// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
// secondaries are stored as the assigned hosts of the repository.
CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
+ // SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one.
+ SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
// DeleteRepository deletes the repository's record from the virtual storage and the storages. Returns
// ErrNoRowsAffected when trying to delete a repository which has no record in the virtual storage
// or the storages.
@@ -240,6 +242,36 @@ ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET
return err
}
+// SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one.
+func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error {
+ result, err := rs.db.ExecContext(ctx, `
+WITH updated_repository AS (
+ UPDATE repositories
+ SET generation = generation + 1
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+ RETURNING virtual_storage, relative_path, generation
+)
+
+INSERT INTO storage_repositories
+SELECT virtual_storage, relative_path, $3, generation
+FROM updated_repository
+ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE
+ SET generation = EXCLUDED.generation
+ `, virtualStorage, relativePath, storage)
+ if err != nil {
+ return fmt.Errorf("exec: %w", err)
+ }
+
+ if rowsAffected, err := result.RowsAffected(); err != nil {
+ return fmt.Errorf("rows affected: %w", err)
+ } else if rowsAffected == 0 {
+ return commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
+ }
+
+ return nil
+}
+
func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) {
const q = `
SELECT storage, generation
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 876dcec73..684085e5d 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -10,6 +10,7 @@ type MockRepositoryStore struct {
GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
+ SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string, storages []string) error
DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
@@ -61,6 +62,15 @@ func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorag
return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments)
}
+// SetAuthoritativeReplica calls the mocked function. If no mock has been provided, it returns a nil error.
+func (m MockRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error {
+ if m.SetAuthoritativeReplicaFunc == nil {
+ return nil
+ }
+
+ return m.SetAuthoritativeReplicaFunc(ctx, virtualStorage, relativePath, storage)
+}
+
func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error {
if m.DeleteRepositoryFunc == nil {
return nil
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index ad5058aca..a3286e0dd 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -241,6 +241,54 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
})
})
+ t.Run("SetAuthoritativeReplica", func(t *testing.T) {
+ rs, requireState := newStore(t, nil)
+
+ t.Run("fails when repository doesnt exist", func(t *testing.T) {
+ require.Equal(t,
+ commonerr.NewRepositoryNotFoundError(vs, repo),
+ rs.SetAuthoritativeReplica(ctx, vs, repo, stor),
+ )
+ })
+
+ t.Run("sets the given replica as the latest", func(t *testing.T) {
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "storage-1", 0))
+ require.NoError(t, rs.SetGeneration(ctx, vs, repo, "storage-2", 0))
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": repositoryRecord{},
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "storage-1": 0,
+ "storage-2": 0,
+ },
+ },
+ },
+ )
+
+ require.NoError(t, rs.SetAuthoritativeReplica(ctx, vs, repo, "storage-1"))
+ requireState(t, ctx,
+ virtualStorageState{
+ "virtual-storage-1": {
+ "repository-1": repositoryRecord{},
+ },
+ },
+ storageState{
+ "virtual-storage-1": {
+ "repository-1": {
+ "storage-1": 1,
+ "storage-2": 0,
+ },
+ },
+ },
+ )
+ })
+ })
+
t.Run("GetGeneration", func(t *testing.T) {
rs, _ := newStore(t, nil)
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index fec5396a8..3e32d9135 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -2,8 +2,10 @@ package info
import (
"context"
+ "errors"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service"
@@ -79,14 +81,11 @@ func (s *Server) SetAuthoritativeStorage(ctx context.Context, req *gitalypb.SetA
return nil, helper.ErrInvalidArgumentf("unknown authoritative storage: %q", req.AuthoritativeStorage)
}
- exists, err := s.rs.RepositoryExists(ctx, req.VirtualStorage, req.RelativePath)
- if err != nil {
- return nil, err
- } else if !exists {
- return nil, helper.ErrInvalidArgumentf("repository %q does not exist on virtual storage %q", req.RelativePath, req.VirtualStorage)
- }
+ if err := s.rs.SetAuthoritativeReplica(ctx, req.VirtualStorage, req.RelativePath, req.AuthoritativeStorage); err != nil {
+ if errors.As(err, &commonerr.RepositoryNotFoundError{}) {
+ return nil, helper.ErrInvalidArgumentf("repository %q does not exist on virtual storage %q", req.RelativePath, req.VirtualStorage)
+ }
- if err := s.rs.IncrementGeneration(ctx, req.VirtualStorage, req.RelativePath, req.AuthoritativeStorage, nil); err != nil {
return nil, helper.ErrInternal(err)
}