Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-03-26 11:32:21 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-04-01 12:50:52 +0300
commit9c08dc0188e1a00866c9c1361bfd67862bec01d7 (patch)
tree409b059fcffbef061ecf7d3414339b39ca8d766a
parent54f0de751b48b23424640cd851a8222b5025f6bc (diff)
Inject PrimaryGetter and Connection in Praefect's Info server
This commit wires up a PrimaryGetter and Connections into Praefect's Info server as they'll be needed when converting the methods to support variable replication factor and repository specific primaries. ReplicationFactorSetter is also converted into AssignmentStore so the methods can acccess assignments later.
-rw-r--r--cmd/praefect/main.go5
-rw-r--r--cmd/praefect/subcmd_accept_dataloss_test.go2
-rw-r--r--cmd/praefect/subcmd_dataloss_test.go2
-rw-r--r--cmd/praefect/subcmd_set_replication_factor_test.go2
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/helper_test.go2
-rw-r--r--internal/praefect/replicator_test.go2
-rw-r--r--internal/praefect/repository_exists_test.go2
-rw-r--r--internal/praefect/server.go21
-rw-r--r--internal/praefect/server_factory.go33
-rw-r--r--internal/praefect/server_factory_test.go10
-rw-r--r--internal/praefect/service/connections.go9
-rw-r--r--internal/praefect/service/info/consistencycheck_test.go4
-rw-r--r--internal/praefect/service/info/replication_factor.go9
-rw-r--r--internal/praefect/service/info/server.go57
15 files changed, 112 insertions, 50 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 0e229b8fb..4daafd172 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -304,6 +304,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
healthChecker praefect.HealthChecker
nodeSet praefect.NodeSet
router praefect.Router
+ primaryGetter praefect.PrimaryGetter
)
if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
nodeSet, err = praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker)
@@ -327,6 +328,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
}()
+ primaryGetter = elector
assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())
router = praefect.NewPerRepositoryRouter(
@@ -342,6 +344,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
healthChecker = praefect.HealthChecker(nodeManager)
nodeSet = praefect.NodeSetFromNodeManager(nodeManager)
router = praefect.NewNodeManagerRouter(nodeManager, rs)
+ primaryGetter = nodeManager
nodeManager.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration())
}
@@ -383,6 +386,8 @@ func run(cfgs []starter.Config, conf config.Config) error {
rs,
assignmentStore,
protoregistry.GitalyProtoPreregistered,
+ nodeSet.Connections(),
+ primaryGetter,
)
)
metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl)
diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go
index 7927df9dc..3c3ae6259 100644
--- a/cmd/praefect/subcmd_accept_dataloss_test.go
+++ b/cmd/praefect/subcmd_accept_dataloss_test.go
@@ -61,7 +61,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) {
},
}
- ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs, nil))})
+ ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs, nil, nil, nil))})
defer clean()
conf.SocketPath = ln.Addr().String()
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index 14ae43a57..3c67783d5 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -109,7 +109,7 @@ func TestDatalossSubcommand(t *testing.T) {
require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0))
ln, clean := listenAndServe(t, []svcRegistrar{
- registerPraefectInfoServer(info.NewServer(nil, cfg, nil, gs, nil))})
+ registerPraefectInfoServer(info.NewServer(nil, cfg, nil, gs, nil, nil, nil))})
defer clean()
for _, tc := range []struct {
desc string
diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go
index 090da41ee..e830450e5 100644
--- a/cmd/praefect/subcmd_set_replication_factor_test.go
+++ b/cmd/praefect/subcmd_set_replication_factor_test.go
@@ -93,7 +93,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) {
)
ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(
- info.NewServer(nil, config.Config{}, nil, nil, store),
+ info.NewServer(nil, config.Config{}, nil, nil, store, nil, nil),
)})
defer clean()
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 37d8b6e61..f094d9ad4 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -168,7 +168,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry)
- srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil)
+ srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil, nil, nil)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 030ba8d89..ebeb2b071 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -176,7 +176,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
NodeSetFromNodeManager(opt.withNodeMgr),
)
- prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, opt.withRepoStore, nil)
+ prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, opt.withRepoStore, nil, nil, nil)
listener, port := listenAvailPort(t)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 0b2aef714..82e7bfc57 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -363,7 +363,7 @@ func TestPropagateReplicationJob(t *testing.T) {
replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
- prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil)
+ prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, nil, nil)
listener, port := listenAvailPort(t)
ctx, cancel := testhelper.Context()
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index 516850f59..5eea4bd84 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -101,6 +101,8 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) {
nil,
rs,
nil,
+ nil,
+ nil,
)
defer srv.Stop()
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 0ad23f242..41e11fb33 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -23,6 +23,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/server"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/transaction"
@@ -47,7 +48,9 @@ func NewGRPCServer(
txMgr *transactions.Manager,
queue datastore.ReplicationEventQueue,
rs datastore.RepositoryStore,
- rfs info.ReplicationFactorSetter,
+ assignmentStore AssignmentStore,
+ conns Connections,
+ primaryGetter PrimaryGetter,
grpcOpts ...grpc.ServerOption,
) *grpc.Server {
ctxTagOpts := []grpc_ctxtags.Option{
@@ -103,7 +106,7 @@ func NewGRPCServer(
warnDupeAddrs(logger, conf)
srv := grpc.NewServer(grpcOpts...)
- registerServices(srv, nodeMgr, txMgr, conf, queue, rs, rfs)
+ registerServices(srv, nodeMgr, txMgr, conf, queue, rs, assignmentStore, service.Connections(conns), primaryGetter)
return srv
}
@@ -115,10 +118,20 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
}
// registerServices registers services praefect needs to handle RPCs on its own.
-func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs info.ReplicationFactorSetter) {
+func registerServices(
+ srv *grpc.Server,
+ nm nodes.Manager,
+ tm *transactions.Manager,
+ conf config.Config,
+ queue datastore.ReplicationEventQueue,
+ rs datastore.RepositoryStore,
+ assignmentStore AssignmentStore,
+ conns service.Connections,
+ primaryGetter info.PrimaryGetter,
+) {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm))
- gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs, rfs))
+ gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs, assignmentStore, conns, primaryGetter))
gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm))
healthpb.RegisterHealthServer(srv, health.NewServer())
diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go
index b8b42cd4f..3bf494556 100644
--- a/internal/praefect/server_factory.go
+++ b/internal/praefect/server_factory.go
@@ -12,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -27,19 +26,23 @@ func NewServerFactory(
txMgr *transactions.Manager,
queue datastore.ReplicationEventQueue,
rs datastore.RepositoryStore,
- rfs info.ReplicationFactorSetter,
+ assignmentStore AssignmentStore,
registry *protoregistry.Registry,
+ conns Connections,
+ primaryGetter PrimaryGetter,
) *ServerFactory {
return &ServerFactory{
- conf: conf,
- logger: logger,
- director: director,
- nodeMgr: nodeMgr,
- txMgr: txMgr,
- queue: queue,
- rs: rs,
- rfs: rfs,
- registry: registry,
+ conf: conf,
+ logger: logger,
+ director: director,
+ nodeMgr: nodeMgr,
+ txMgr: txMgr,
+ queue: queue,
+ rs: rs,
+ assignmentStore: assignmentStore,
+ registry: registry,
+ conns: conns,
+ primaryGetter: primaryGetter,
}
}
@@ -53,9 +56,11 @@ type ServerFactory struct {
txMgr *transactions.Manager
queue datastore.ReplicationEventQueue
rs datastore.RepositoryStore
- rfs info.ReplicationFactorSetter
+ assignmentStore AssignmentStore
registry *protoregistry.Registry
secure, insecure []*grpc.Server
+ conns Connections
+ primaryGetter PrimaryGetter
}
// Serve starts serving on the provided listener with newly created grpc.Server
@@ -124,7 +129,9 @@ func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server {
s.txMgr,
s.queue,
s.rs,
- s.rfs,
+ s.assignmentStore,
+ s.conns,
+ s.primaryGetter,
grpcOpts...,
)
}
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 312d542c8..2ab625909 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -133,7 +133,7 @@ func TestServerFactory(t *testing.T) {
}
t.Run("insecure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, "localhost:0")
@@ -162,7 +162,7 @@ func TestServerFactory(t *testing.T) {
})
t.Run("secure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, "localhost:0")
@@ -204,7 +204,7 @@ func TestServerFactory(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
defer praefectServerFactory.Stop()
// start with tcp address
@@ -272,7 +272,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls key path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.KeyPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
@@ -281,7 +281,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls cert path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.CertPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
diff --git a/internal/praefect/service/connections.go b/internal/praefect/service/connections.go
new file mode 100644
index 000000000..38d72b73f
--- /dev/null
+++ b/internal/praefect/service/connections.go
@@ -0,0 +1,9 @@
+package service
+
+import "google.golang.org/grpc"
+
+// Connections contains connections to Gitaly nodes keyed by virtual storage and storage
+//
+// This duplicates the praefect.Connections type as it is not possible to import anything from `praefect`
+// to `info` or `server` packages due to cyclic dependencies.
+type Connections map[string]map[string]*grpc.ClientConn
diff --git a/internal/praefect/service/info/consistencycheck_test.go b/internal/praefect/service/info/consistencycheck_test.go
index 2a77e7d0f..8d2c34dee 100644
--- a/internal/praefect/service/info/consistencycheck_test.go
+++ b/internal/praefect/service/info/consistencycheck_test.go
@@ -35,7 +35,7 @@ func TestServer_ConsistencyCheck_repositorySpecificPrimariesUnsupported(t *testi
NewServer(nil,
config.Config{
Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository},
- }, nil, nil, nil,
+ }, nil, nil, nil, nil, nil,
).ConsistencyCheck(nil, nil),
)
}
@@ -148,7 +148,7 @@ func TestServer_ConsistencyCheck(t *testing.T) {
praefectSrv := grpc.NewServer()
defer praefectSrv.Stop()
- gitalypb.RegisterPraefectInfoServiceServer(praefectSrv, NewServer(nm, conf, queue, nil, nil))
+ gitalypb.RegisterPraefectInfoServiceServer(praefectSrv, NewServer(nm, conf, queue, nil, nil, nil, nil))
go praefectSrv.Serve(praefectListener)
praefectConn, err := client.Dial("unix://"+praefectAddr, nil)
diff --git a/internal/praefect/service/info/replication_factor.go b/internal/praefect/service/info/replication_factor.go
index 622c76a83..b3fcf4e8f 100644
--- a/internal/praefect/service/info/replication_factor.go
+++ b/internal/praefect/service/info/replication_factor.go
@@ -10,13 +10,6 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-// ReplicationFactorSetter sets a repository's replication factor
-type ReplicationFactorSetter interface {
- // SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met.
- // Please see the protobuf documentation of the method for details.
- SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
-}
-
func (s *Server) SetReplicationFactor(ctx context.Context, req *gitalypb.SetReplicationFactorRequest) (*gitalypb.SetReplicationFactorResponse, error) {
resp, err := s.setReplicationFactor(ctx, req)
if err != nil {
@@ -32,7 +25,7 @@ func (s *Server) SetReplicationFactor(ctx context.Context, req *gitalypb.SetRepl
}
func (s *Server) setReplicationFactor(ctx context.Context, req *gitalypb.SetReplicationFactorRequest) (*gitalypb.SetReplicationFactorResponse, error) {
- storages, err := s.rfs.SetReplicationFactor(ctx, req.VirtualStorage, req.RelativePath, int(req.ReplicationFactor))
+ storages, err := s.assignmentStore.SetReplicationFactor(ctx, req.VirtualStorage, req.RelativePath, int(req.ReplicationFactor))
if err != nil {
return nil, fmt.Errorf("set replication factor: %w", err)
}
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 9cb67efa3..e54ea267a 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -7,27 +7,60 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
+// AssignmentStore is an interface for getting repository host node assignments.
+//
+// This duplicates the praefect.AssignmentGetter type as it is not possible to import anything from
+// `praefect` to `info` packages due to cyclic dependencies.
+type AssignmentStore interface {
+ // GetHostAssignments returns the names of the storages assigned to host the repository.
+ // The primary node must always be assigned.
+ GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error)
+ // SetReplicationFactor sets a repository's replication factor and returns the current assignments.
+ SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
+}
+
+// PrimaryGetter is an interface for getting a primary of a repository.
+//
+// This duplicates the praefect.PrimaryGetter type as it is not possible to import anything from
+// `praefect` to `info` packages due to cyclic dependencies.
+type PrimaryGetter interface {
+ // GetPrimary returns the primary storage for a given repository.
+ GetPrimary(ctx context.Context, virtualStorage string, relativePath string) (string, error)
+}
+
// Server is a InfoService server
type Server struct {
- nodeMgr nodes.Manager
- conf config.Config
- queue datastore.ReplicationEventQueue
- rs datastore.RepositoryStore
-
- rfs ReplicationFactorSetter
+ nodeMgr nodes.Manager
+ conf config.Config
+ queue datastore.ReplicationEventQueue
+ rs datastore.RepositoryStore
+ assignmentStore AssignmentStore
+ conns service.Connections
+ primaryGetter PrimaryGetter
}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs ReplicationFactorSetter) gitalypb.PraefectInfoServiceServer {
+func NewServer(
+ nodeMgr nodes.Manager,
+ conf config.Config,
+ queue datastore.ReplicationEventQueue,
+ rs datastore.RepositoryStore,
+ assignmentStore AssignmentStore,
+ conns service.Connections,
+ primaryGetter PrimaryGetter,
+) gitalypb.PraefectInfoServiceServer {
return &Server{
- nodeMgr: nodeMgr,
- conf: conf,
- queue: queue,
- rs: rs,
- rfs: rfs,
+ nodeMgr: nodeMgr,
+ conf: conf,
+ queue: queue,
+ rs: rs,
+ assignmentStore: assignmentStore,
+ conns: conns,
+ primaryGetter: primaryGetter,
}
}