diff options
author | James Fargher <proglottis@gmail.com> | 2021-02-01 04:08:16 +0300 |
---|---|---|
committer | James Fargher <proglottis@gmail.com> | 2021-02-01 04:08:16 +0300 |
commit | a60bf85b0d8ac507539803092ceb1be05f003e7b (patch) | |
tree | 0bb88e3e535714b18edb2ec40565df1f09ad166e | |
parent | 0d8e8d6873722723e085d94af57778674db5f46f (diff) |
Populate primary when creating new repository recordscreate_repo_with_primary
-rw-r--r-- | cmd/praefect/main.go | 1 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 15 | ||||
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 14 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 31 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 6 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/server_factory_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 2 |
12 files changed, 66 insertions, 15 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index a4692647c..ed94f6fe4 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -361,6 +361,7 @@ func run(cfgs []starter.Config, conf config.Config) error { transactionManager, conf, protoregistry.GitalyProtoPreregistered, + healthChecker, ) repl = praefect.NewReplMgr( diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 37d8b6e61..8642e259b 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -166,7 +166,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, registry, err := protoregistry.New(fd) require.NoError(t, err) - coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry) + coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry, nodeMgr) srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 4cdf2d8c2..d86f7ead5 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -215,6 +215,7 @@ type Coordinator struct { registry *protoregistry.Registry conf config.Config votersMetric *prometheus.HistogramVec + hc nodes.HealthChecker } // NewCoordinator returns a new Coordinator that utilizes the provided logger @@ -225,6 +226,7 @@ func NewCoordinator( txMgr *transactions.Manager, conf config.Config, r *protoregistry.Registry, + hc nodes.HealthChecker, ) *Coordinator { maxVoters := 1 for _, storage := range conf.VirtualStorages { @@ -248,6 +250,7 @@ func NewCoordinator( }, []string{"virtual_storage"}, ), + hc: hc, } return coordinator @@ -752,7 +755,17 @@ func (c *Coordinator) newRequestFinalizer( ctxlogrus.Extract(ctx).WithError(err).Info("deleted repository does not have a store entry") } case datastore.CreateRepo: - if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary); err != nil { + healthyNodes := c.hc.HealthyNodes() + + var virtualStorages, physicalStorages []string + for virtualStorage, nodes := range healthyNodes { + for _, node := range nodes { + virtualStorages = append(virtualStorages, virtualStorage) + physicalStorages = append(physicalStorages, node) + } + } + + if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary, virtualStorages, physicalStorages); err != nil { if !errors.Is(err, datastore.RepositoryExistsError{}) { return fmt.Errorf("create repository: %w", err) } diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index e00b70660..af76c1ac5 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -170,6 +170,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index f4057e3c3..f5b549d4a 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -98,6 +98,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { transactions.NewManager(conf), conf, protoregistry.GitalyProtoPreregistered, + nil, ) frame, err := proto.Marshal(&gitalypb.CleanupRequest{Repository: &gitalypb.Repository{ @@ -168,6 +169,7 @@ func TestStreamDirectorMutator(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{ @@ -280,6 +282,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" @@ -385,6 +388,7 @@ func TestStreamDirectorAccessor(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) @@ -476,6 +480,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) t.Run("forwards accessor operations", func(t *testing.T) { @@ -638,7 +643,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { var createRepositoryCalled int64 rs := datastore.MockRepositoryStore{ - CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, storage string) error { + CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error { atomic.AddInt64(&createRepositoryCalled, 1) assert.Equal(t, targetRepo.StorageName, virtualStorage) assert.Equal(t, targetRepo.RelativePath, relativePath) @@ -654,6 +659,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{ @@ -796,6 +802,7 @@ func TestAbsentCorrelationID(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{ @@ -918,6 +925,7 @@ func TestStreamDirectorStorageScope(t *testing.T) { nil, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) ctx, cancel := testhelper.Context() @@ -984,6 +992,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { nil, config.Config{}, protoregistry.GitalyProtoPreregistered, + mgr, ) frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "", Name: "stub"}) @@ -1013,6 +1022,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { nil, config.Config{}, protoregistry.GitalyProtoPreregistered, + mgr, ) frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "fake", Name: "stub"}) @@ -1043,6 +1053,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { nil, config.Config{}, protoregistry.GitalyProtoPreregistered, + mgr, ) fullMethod := "/gitaly.NamespaceService/NamespaceExists" @@ -1074,6 +1085,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { nil, config.Config{}, protoregistry.GitalyProtoPreregistered, + mgr, ) fullMethod := "/gitaly.NamespaceService/RemoveNamespace" diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index d3cf1e514..7efe03708 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -99,7 +99,7 @@ type RepositoryStore interface { GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) // CreateRepository creates the repository for the virtual storage and the storage. Returns // RepositoryExistsError when trying to create a repository which already has a matching record. - CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error + CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error // DeleteRepository deletes the repository from the virtual storage and the storage. Returns // RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage // or the storage. @@ -297,14 +297,33 @@ AND storage = ANY($3) //nolint:stylecheck //nolint:golint -func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error { const q = ` -WITH repo AS ( +WITH healthy_storages AS ( + SELECT unnest($4::text[]) AS virtual_storage, unnest($5::text[]) AS storage +), primaries AS ( + SELECT storage + FROM healthy_storages + LEFT JOIN storage_repositories USING (virtual_storage, storage) + WHERE virtual_storage = $1 + AND storage_repositories.relative_path = $2 + AND ( + -- If assignments exist for the repository, only the assigned storages elected as primary. + -- If no assignments exist, any healthy node can be elected as the primary + SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = storage_repositories.storage) = 1 + FROM repository_assignments + WHERE repository_assignments.virtual_storage = storage_repositories.virtual_storage + AND repository_assignments.relative_path = storage_repositories.relative_path + ) + ORDER BY generation DESC NULLS LAST, random() + LIMIT 1 +), repo AS ( INSERT INTO repositories ( virtual_storage, relative_path, - generation - ) VALUES ($1, $2, 0) + generation, + "primary" + ) VALUES ($1, $2, 0, (SELECT storage FROM primaries)) ) INSERT INTO storage_repositories ( virtual_storage, @@ -315,7 +334,7 @@ INSERT INTO storage_repositories ( VALUES ($1, $2, $3, 0) ` - _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage) + _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, pq.StringArray(healthyVirtualStorages), pq.StringArray(healthyPhysicalStorages)) var pqerr *pq.Error if errors.As(err, &pqerr) && pqerr.Code.Name() == "unique_violation" { diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 5809be6ee..6f805d5cd 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -9,7 +9,7 @@ type MockRepositoryStore struct { IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error - CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error + CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) @@ -52,12 +52,12 @@ func (m MockRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, //nolint:stylecheck //nolint:golint -func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error { if m.CreateRepositoryFunc == nil { return nil } - return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, storage) + return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, storage, healthyVirtualStorages, healthyPhysicalStorages) } func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index e8d9019d9..c453d6c1d 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -289,7 +289,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("create", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor)) + require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor})) requireState(t, ctx, virtualStorageState{ @@ -310,10 +310,10 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("conflict", func(t *testing.T) { rs, _ := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor)) + require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor})) require.Equal(t, RepositoryExistsError{vs, repo, stor}, - rs.CreateRepository(ctx, vs, repo, stor), + rs.CreateRepository(ctx, vs, repo, stor, []string{vs}, []string{stor}), ) }) }) diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index d5ffebf93..32c7aa2c8 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -229,6 +229,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp opt.withTxMgr, conf, opt.withAnnotations, + opt.withNodeMgr, ) // TODO: run a replmgr for EVERY virtual storage diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 39bcb96eb..d2d9ee2f8 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -360,6 +360,7 @@ func TestPropagateReplicationJob(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr)) diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index f8ff2823a..9ed86e349 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -96,6 +96,7 @@ func TestServerFactory(t *testing.T) { txMgr, conf, registry, + nodeMgr, ) checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient { diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index cad10cc6b..ae4232069 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -769,6 +769,7 @@ func TestProxyWrites(t *testing.T) { txMgr, conf, protoregistry.GitalyProtoPreregistered, + nodeMgr, ) server := grpc.NewServer( @@ -928,6 +929,7 @@ func TestErrorThreshold(t *testing.T) { nil, conf, registry, + nodeMgr, ) server := grpc.NewServer( |