diff options
-rw-r--r-- | cmd/praefect/main.go | 5 | ||||
-rw-r--r-- | cmd/praefect/subcmd_accept_dataloss_test.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dataloss_test.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_set_replication_factor_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/repository_exists_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/server.go | 21 | ||||
-rw-r--r-- | internal/praefect/server_factory.go | 33 | ||||
-rw-r--r-- | internal/praefect/server_factory_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/service/connections.go | 9 | ||||
-rw-r--r-- | internal/praefect/service/info/consistencycheck_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/service/info/replication_factor.go | 9 | ||||
-rw-r--r-- | internal/praefect/service/info/server.go | 57 |
15 files changed, 112 insertions, 50 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 0e229b8fb..4daafd172 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -304,6 +304,7 @@ func run(cfgs []starter.Config, conf config.Config) error { healthChecker praefect.HealthChecker nodeSet praefect.NodeSet router praefect.Router + primaryGetter praefect.PrimaryGetter ) if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { nodeSet, err = praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker) @@ -327,6 +328,7 @@ func run(cfgs []starter.Config, conf config.Config) error { } }() + primaryGetter = elector assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames()) router = praefect.NewPerRepositoryRouter( @@ -342,6 +344,7 @@ func run(cfgs []starter.Config, conf config.Config) error { healthChecker = praefect.HealthChecker(nodeManager) nodeSet = praefect.NodeSetFromNodeManager(nodeManager) router = praefect.NewNodeManagerRouter(nodeManager, rs) + primaryGetter = nodeManager nodeManager.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration()) } @@ -383,6 +386,8 @@ func run(cfgs []starter.Config, conf config.Config) error { rs, assignmentStore, protoregistry.GitalyProtoPreregistered, + nodeSet.Connections(), + primaryGetter, ) ) metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go index 7927df9dc..3c3ae6259 100644 --- a/cmd/praefect/subcmd_accept_dataloss_test.go +++ b/cmd/praefect/subcmd_accept_dataloss_test.go @@ -61,7 +61,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { }, } - ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs, nil))}) + ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs, nil, nil, nil))}) defer clean() conf.SocketPath = ln.Addr().String() diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index 14ae43a57..3c67783d5 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -109,7 +109,7 @@ func TestDatalossSubcommand(t *testing.T) { require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0)) ln, clean := listenAndServe(t, []svcRegistrar{ - registerPraefectInfoServer(info.NewServer(nil, cfg, nil, gs, nil))}) + registerPraefectInfoServer(info.NewServer(nil, cfg, nil, gs, nil, nil, nil))}) defer clean() for _, tc := range []struct { desc string diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go index 090da41ee..e830450e5 100644 --- a/cmd/praefect/subcmd_set_replication_factor_test.go +++ b/cmd/praefect/subcmd_set_replication_factor_test.go @@ -93,7 +93,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { ) ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer( - info.NewServer(nil, config.Config{}, nil, nil, store), + info.NewServer(nil, config.Config{}, nil, nil, store, nil, nil), )}) defer clean() diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 37d8b6e61..f094d9ad4 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -168,7 +168,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry) - srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil) + srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil, nil, nil) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t) diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 030ba8d89..ebeb2b071 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -176,7 +176,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp NodeSetFromNodeManager(opt.withNodeMgr), ) - prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, opt.withRepoStore, nil) + prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, opt.withRepoStore, nil, nil, nil) listener, port := listenAvailPort(t) diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 0b2aef714..82e7bfc57 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -363,7 +363,7 @@ func TestPropagateReplicationJob(t *testing.T) { replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr)) - prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil) + prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, nil, nil) listener, port := listenAvailPort(t) ctx, cancel := testhelper.Context() diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 516850f59..5eea4bd84 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -101,6 +101,8 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) { nil, rs, nil, + nil, + nil, ) defer srv.Stop() diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 0ad23f242..41e11fb33 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -23,6 +23,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/server" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/transaction" @@ -47,7 +48,9 @@ func NewGRPCServer( txMgr *transactions.Manager, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, - rfs info.ReplicationFactorSetter, + assignmentStore AssignmentStore, + conns Connections, + primaryGetter PrimaryGetter, grpcOpts ...grpc.ServerOption, ) *grpc.Server { ctxTagOpts := []grpc_ctxtags.Option{ @@ -103,7 +106,7 @@ func NewGRPCServer( warnDupeAddrs(logger, conf) srv := grpc.NewServer(grpcOpts...) - registerServices(srv, nodeMgr, txMgr, conf, queue, rs, rfs) + registerServices(srv, nodeMgr, txMgr, conf, queue, rs, assignmentStore, service.Connections(conns), primaryGetter) return srv } @@ -115,10 +118,20 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { } // registerServices registers services praefect needs to handle RPCs on its own. -func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs info.ReplicationFactorSetter) { +func registerServices( + srv *grpc.Server, + nm nodes.Manager, + tm *transactions.Manager, + conf config.Config, + queue datastore.ReplicationEventQueue, + rs datastore.RepositoryStore, + assignmentStore AssignmentStore, + conns service.Connections, + primaryGetter info.PrimaryGetter, +) { // ServerServiceServer is necessary for the ServerInfo RPC gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm)) - gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs, rfs)) + gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs, assignmentStore, conns, primaryGetter)) gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm)) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go index b8b42cd4f..3bf494556 100644 --- a/internal/praefect/server_factory.go +++ b/internal/praefect/server_factory.go @@ -12,7 +12,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" - "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -27,19 +26,23 @@ func NewServerFactory( txMgr *transactions.Manager, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, - rfs info.ReplicationFactorSetter, + assignmentStore AssignmentStore, registry *protoregistry.Registry, + conns Connections, + primaryGetter PrimaryGetter, ) *ServerFactory { return &ServerFactory{ - conf: conf, - logger: logger, - director: director, - nodeMgr: nodeMgr, - txMgr: txMgr, - queue: queue, - rs: rs, - rfs: rfs, - registry: registry, + conf: conf, + logger: logger, + director: director, + nodeMgr: nodeMgr, + txMgr: txMgr, + queue: queue, + rs: rs, + assignmentStore: assignmentStore, + registry: registry, + conns: conns, + primaryGetter: primaryGetter, } } @@ -53,9 +56,11 @@ type ServerFactory struct { txMgr *transactions.Manager queue datastore.ReplicationEventQueue rs datastore.RepositoryStore - rfs info.ReplicationFactorSetter + assignmentStore AssignmentStore registry *protoregistry.Registry secure, insecure []*grpc.Server + conns Connections + primaryGetter PrimaryGetter } // Serve starts serving on the provided listener with newly created grpc.Server @@ -124,7 +129,9 @@ func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server { s.txMgr, s.queue, s.rs, - s.rfs, + s.assignmentStore, + s.conns, + s.primaryGetter, grpcOpts..., ) } diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 312d542c8..2ab625909 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -133,7 +133,7 @@ func TestServerFactory(t *testing.T) { } t.Run("insecure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, "localhost:0") @@ -162,7 +162,7 @@ func TestServerFactory(t *testing.T) { }) t.Run("secure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, "localhost:0") @@ -204,7 +204,7 @@ func TestServerFactory(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) defer praefectServerFactory.Stop() // start with tcp address @@ -272,7 +272,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls key path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.KeyPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") @@ -281,7 +281,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls cert path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.CertPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") diff --git a/internal/praefect/service/connections.go b/internal/praefect/service/connections.go new file mode 100644 index 000000000..38d72b73f --- /dev/null +++ b/internal/praefect/service/connections.go @@ -0,0 +1,9 @@ +package service + +import "google.golang.org/grpc" + +// Connections contains connections to Gitaly nodes keyed by virtual storage and storage +// +// This duplicates the praefect.Connections type as it is not possible to import anything from `praefect` +// to `info` or `server` packages due to cyclic dependencies. +type Connections map[string]map[string]*grpc.ClientConn diff --git a/internal/praefect/service/info/consistencycheck_test.go b/internal/praefect/service/info/consistencycheck_test.go index 2a77e7d0f..8d2c34dee 100644 --- a/internal/praefect/service/info/consistencycheck_test.go +++ b/internal/praefect/service/info/consistencycheck_test.go @@ -35,7 +35,7 @@ func TestServer_ConsistencyCheck_repositorySpecificPrimariesUnsupported(t *testi NewServer(nil, config.Config{ Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}, - }, nil, nil, nil, + }, nil, nil, nil, nil, nil, ).ConsistencyCheck(nil, nil), ) } @@ -148,7 +148,7 @@ func TestServer_ConsistencyCheck(t *testing.T) { praefectSrv := grpc.NewServer() defer praefectSrv.Stop() - gitalypb.RegisterPraefectInfoServiceServer(praefectSrv, NewServer(nm, conf, queue, nil, nil)) + gitalypb.RegisterPraefectInfoServiceServer(praefectSrv, NewServer(nm, conf, queue, nil, nil, nil, nil)) go praefectSrv.Serve(praefectListener) praefectConn, err := client.Dial("unix://"+praefectAddr, nil) diff --git a/internal/praefect/service/info/replication_factor.go b/internal/praefect/service/info/replication_factor.go index 622c76a83..b3fcf4e8f 100644 --- a/internal/praefect/service/info/replication_factor.go +++ b/internal/praefect/service/info/replication_factor.go @@ -10,13 +10,6 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) -// ReplicationFactorSetter sets a repository's replication factor -type ReplicationFactorSetter interface { - // SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met. - // Please see the protobuf documentation of the method for details. - SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error) -} - func (s *Server) SetReplicationFactor(ctx context.Context, req *gitalypb.SetReplicationFactorRequest) (*gitalypb.SetReplicationFactorResponse, error) { resp, err := s.setReplicationFactor(ctx, req) if err != nil { @@ -32,7 +25,7 @@ func (s *Server) SetReplicationFactor(ctx context.Context, req *gitalypb.SetRepl } func (s *Server) setReplicationFactor(ctx context.Context, req *gitalypb.SetReplicationFactorRequest) (*gitalypb.SetReplicationFactorResponse, error) { - storages, err := s.rfs.SetReplicationFactor(ctx, req.VirtualStorage, req.RelativePath, int(req.ReplicationFactor)) + storages, err := s.assignmentStore.SetReplicationFactor(ctx, req.VirtualStorage, req.RelativePath, int(req.ReplicationFactor)) if err != nil { return nil, fmt.Errorf("set replication factor: %w", err) } diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index 9cb67efa3..e54ea267a 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -7,27 +7,60 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) +// AssignmentStore is an interface for getting repository host node assignments. +// +// This duplicates the praefect.AssignmentGetter type as it is not possible to import anything from +// `praefect` to `info` packages due to cyclic dependencies. +type AssignmentStore interface { + // GetHostAssignments returns the names of the storages assigned to host the repository. + // The primary node must always be assigned. + GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error) + // SetReplicationFactor sets a repository's replication factor and returns the current assignments. + SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error) +} + +// PrimaryGetter is an interface for getting a primary of a repository. +// +// This duplicates the praefect.PrimaryGetter type as it is not possible to import anything from +// `praefect` to `info` packages due to cyclic dependencies. +type PrimaryGetter interface { + // GetPrimary returns the primary storage for a given repository. + GetPrimary(ctx context.Context, virtualStorage string, relativePath string) (string, error) +} + // Server is a InfoService server type Server struct { - nodeMgr nodes.Manager - conf config.Config - queue datastore.ReplicationEventQueue - rs datastore.RepositoryStore - - rfs ReplicationFactorSetter + nodeMgr nodes.Manager + conf config.Config + queue datastore.ReplicationEventQueue + rs datastore.RepositoryStore + assignmentStore AssignmentStore + conns service.Connections + primaryGetter PrimaryGetter } // NewServer creates a new instance of a grpc InfoServiceServer -func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs ReplicationFactorSetter) gitalypb.PraefectInfoServiceServer { +func NewServer( + nodeMgr nodes.Manager, + conf config.Config, + queue datastore.ReplicationEventQueue, + rs datastore.RepositoryStore, + assignmentStore AssignmentStore, + conns service.Connections, + primaryGetter PrimaryGetter, +) gitalypb.PraefectInfoServiceServer { return &Server{ - nodeMgr: nodeMgr, - conf: conf, - queue: queue, - rs: rs, - rfs: rfs, + nodeMgr: nodeMgr, + conf: conf, + queue: queue, + rs: rs, + assignmentStore: assignmentStore, + conns: conns, + primaryGetter: primaryGetter, } } |