Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <proglottis@gmail.com>2021-02-01 04:08:16 +0300
committerJames Fargher <proglottis@gmail.com>2021-02-01 04:08:16 +0300
commita60bf85b0d8ac507539803092ceb1be05f003e7b (patch)
tree0bb88e3e535714b18edb2ec40565df1f09ad166e
parent0d8e8d6873722723e085d94af57778674db5f46f (diff)
Populate primary when creating new repository recordscreate_repo_with_primary
-rw-r--r--cmd/praefect/main.go1
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/coordinator.go15
-rw-r--r--internal/praefect/coordinator_pg_test.go1
-rw-r--r--internal/praefect/coordinator_test.go14
-rw-r--r--internal/praefect/datastore/repository_store.go31
-rw-r--r--internal/praefect/datastore/repository_store_mock.go6
-rw-r--r--internal/praefect/datastore/repository_store_test.go6
-rw-r--r--internal/praefect/helper_test.go1
-rw-r--r--internal/praefect/replicator_test.go1
-rw-r--r--internal/praefect/server_factory_test.go1
-rw-r--r--internal/praefect/server_test.go2
12 files changed, 66 insertions, 15 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index a4692647c..ed94f6fe4 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -361,6 +361,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
transactionManager,
conf,
protoregistry.GitalyProtoPreregistered,
+ healthChecker,
)
repl = praefect.NewReplMgr(
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 37d8b6e61..8642e259b 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -166,7 +166,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
registry, err := protoregistry.New(fd)
require.NoError(t, err)
- coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry)
+ coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry, nodeMgr)
srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 4cdf2d8c2..d86f7ead5 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -215,6 +215,7 @@ type Coordinator struct {
registry *protoregistry.Registry
conf config.Config
votersMetric *prometheus.HistogramVec
+ hc nodes.HealthChecker
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
@@ -225,6 +226,7 @@ func NewCoordinator(
txMgr *transactions.Manager,
conf config.Config,
r *protoregistry.Registry,
+ hc nodes.HealthChecker,
) *Coordinator {
maxVoters := 1
for _, storage := range conf.VirtualStorages {
@@ -248,6 +250,7 @@ func NewCoordinator(
},
[]string{"virtual_storage"},
),
+ hc: hc,
}
return coordinator
@@ -752,7 +755,17 @@ func (c *Coordinator) newRequestFinalizer(
ctxlogrus.Extract(ctx).WithError(err).Info("deleted repository does not have a store entry")
}
case datastore.CreateRepo:
- if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary); err != nil {
+ healthyNodes := c.hc.HealthyNodes()
+
+ var virtualStorages, physicalStorages []string
+ for virtualStorage, nodes := range healthyNodes {
+ for _, node := range nodes {
+ virtualStorages = append(virtualStorages, virtualStorage)
+ physicalStorages = append(physicalStorages, node)
+ }
+ }
+
+ if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary, virtualStorages, physicalStorages); err != nil {
if !errors.Is(err, datastore.RepositoryExistsError{}) {
return fmt.Errorf("create repository: %w", err)
}
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index e00b70660..af76c1ac5 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -170,6 +170,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
fullMethod := "/gitaly.SmartHTTPService/PostReceivePack"
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index f4057e3c3..f5b549d4a 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -98,6 +98,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
transactions.NewManager(conf),
conf,
protoregistry.GitalyProtoPreregistered,
+ nil,
)
frame, err := proto.Marshal(&gitalypb.CleanupRequest{Repository: &gitalypb.Repository{
@@ -168,6 +169,7 @@ func TestStreamDirectorMutator(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{
@@ -280,6 +282,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
fullMethod := "/gitaly.SmartHTTPService/PostReceivePack"
@@ -385,6 +388,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
@@ -476,6 +480,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
t.Run("forwards accessor operations", func(t *testing.T) {
@@ -638,7 +643,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
var createRepositoryCalled int64
rs := datastore.MockRepositoryStore{
- CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, storage string) error {
+ CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error {
atomic.AddInt64(&createRepositoryCalled, 1)
assert.Equal(t, targetRepo.StorageName, virtualStorage)
assert.Equal(t, targetRepo.RelativePath, relativePath)
@@ -654,6 +659,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{
@@ -796,6 +802,7 @@ func TestAbsentCorrelationID(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{
@@ -918,6 +925,7 @@ func TestStreamDirectorStorageScope(t *testing.T) {
nil,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
ctx, cancel := testhelper.Context()
@@ -984,6 +992,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
nil,
config.Config{},
protoregistry.GitalyProtoPreregistered,
+ mgr,
)
frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "", Name: "stub"})
@@ -1013,6 +1022,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
nil,
config.Config{},
protoregistry.GitalyProtoPreregistered,
+ mgr,
)
frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "fake", Name: "stub"})
@@ -1043,6 +1053,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
nil,
config.Config{},
protoregistry.GitalyProtoPreregistered,
+ mgr,
)
fullMethod := "/gitaly.NamespaceService/NamespaceExists"
@@ -1074,6 +1085,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
nil,
config.Config{},
protoregistry.GitalyProtoPreregistered,
+ mgr,
)
fullMethod := "/gitaly.NamespaceService/RemoveNamespace"
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index d3cf1e514..7efe03708 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -99,7 +99,7 @@ type RepositoryStore interface {
GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
// CreateRepository creates the repository for the virtual storage and the storage. Returns
// RepositoryExistsError when trying to create a repository which already has a matching record.
- CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
+ CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error
// DeleteRepository deletes the repository from the virtual storage and the storage. Returns
// RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage
// or the storage.
@@ -297,14 +297,33 @@ AND storage = ANY($3)
//nolint:stylecheck
//nolint:golint
-func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
+func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error {
const q = `
-WITH repo AS (
+WITH healthy_storages AS (
+ SELECT unnest($4::text[]) AS virtual_storage, unnest($5::text[]) AS storage
+), primaries AS (
+ SELECT storage
+ FROM healthy_storages
+ LEFT JOIN storage_repositories USING (virtual_storage, storage)
+ WHERE virtual_storage = $1
+ AND storage_repositories.relative_path = $2
+ AND (
+ -- If assignments exist for the repository, only the assigned storages elected as primary.
+ -- If no assignments exist, any healthy node can be elected as the primary
+ SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = storage_repositories.storage) = 1
+ FROM repository_assignments
+ WHERE repository_assignments.virtual_storage = storage_repositories.virtual_storage
+ AND repository_assignments.relative_path = storage_repositories.relative_path
+ )
+ ORDER BY generation DESC NULLS LAST, random()
+ LIMIT 1
+), repo AS (
INSERT INTO repositories (
virtual_storage,
relative_path,
- generation
- ) VALUES ($1, $2, 0)
+ generation,
+ "primary"
+ ) VALUES ($1, $2, 0, (SELECT storage FROM primaries))
)
INSERT INTO storage_repositories (
virtual_storage,
@@ -315,7 +334,7 @@ INSERT INTO storage_repositories (
VALUES ($1, $2, $3, 0)
`
- _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage)
+ _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, pq.StringArray(healthyVirtualStorages), pq.StringArray(healthyPhysicalStorages))
var pqerr *pq.Error
if errors.As(err, &pqerr) && pqerr.Code.Name() == "unique_violation" {
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 5809be6ee..6f805d5cd 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -9,7 +9,7 @@ type MockRepositoryStore struct {
IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
- CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
+ CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error
DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
@@ -52,12 +52,12 @@ func (m MockRepositoryStore) SetGeneration(ctx context.Context, virtualStorage,
//nolint:stylecheck
//nolint:golint
-func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
+func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error {
if m.CreateRepositoryFunc == nil {
return nil
}
- return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, storage)
+ return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, storage, healthyVirtualStorages, healthyPhysicalStorages)
}
func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index e8d9019d9..c453d6c1d 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -289,7 +289,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("create", func(t *testing.T) {
rs, requireState := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor))
+ require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor}))
requireState(t, ctx,
virtualStorageState{
@@ -310,10 +310,10 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) {
t.Run("conflict", func(t *testing.T) {
rs, _ := newStore(t, nil)
- require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor))
+ require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor}))
require.Equal(t,
RepositoryExistsError{vs, repo, stor},
- rs.CreateRepository(ctx, vs, repo, stor),
+ rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor}),
)
})
})
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index d5ffebf93..32c7aa2c8 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -229,6 +229,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withTxMgr,
conf,
opt.withAnnotations,
+ opt.withNodeMgr,
)
// TODO: run a replmgr for EVERY virtual storage
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 39bcb96eb..d2d9ee2f8 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -360,6 +360,7 @@ func TestPropagateReplicationJob(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index f8ff2823a..9ed86e349 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -96,6 +96,7 @@ func TestServerFactory(t *testing.T) {
txMgr,
conf,
registry,
+ nodeMgr,
)
checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index cad10cc6b..ae4232069 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -769,6 +769,7 @@ func TestProxyWrites(t *testing.T) {
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
+ nodeMgr,
)
server := grpc.NewServer(
@@ -928,6 +929,7 @@ func TestErrorThreshold(t *testing.T) {
nil,
conf,
registry,
+ nodeMgr,
)
server := grpc.NewServer(