Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/praefect/main.go5
-rw-r--r--cmd/praefect/subcmd_accept_dataloss_test.go2
-rw-r--r--cmd/praefect/subcmd_dataloss_test.go2
-rw-r--r--cmd/praefect/subcmd_set_replication_factor_test.go2
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/helper_test.go2
-rw-r--r--internal/praefect/replicator_test.go2
-rw-r--r--internal/praefect/repository_exists_test.go2
-rw-r--r--internal/praefect/server.go21
-rw-r--r--internal/praefect/server_factory.go33
-rw-r--r--internal/praefect/server_factory_test.go10
-rw-r--r--internal/praefect/service/connections.go9
-rw-r--r--internal/praefect/service/info/consistencycheck_test.go4
-rw-r--r--internal/praefect/service/info/replication_factor.go9
-rw-r--r--internal/praefect/service/info/server.go57
15 files changed, 112 insertions, 50 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 0e229b8fb..4daafd172 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -304,6 +304,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
healthChecker praefect.HealthChecker
nodeSet praefect.NodeSet
router praefect.Router
+ primaryGetter praefect.PrimaryGetter
)
if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
nodeSet, err = praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker)
@@ -327,6 +328,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
}()
+ primaryGetter = elector
assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())
router = praefect.NewPerRepositoryRouter(
@@ -342,6 +344,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
healthChecker = praefect.HealthChecker(nodeManager)
nodeSet = praefect.NodeSetFromNodeManager(nodeManager)
router = praefect.NewNodeManagerRouter(nodeManager, rs)
+ primaryGetter = nodeManager
nodeManager.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration())
}
@@ -383,6 +386,8 @@ func run(cfgs []starter.Config, conf config.Config) error {
rs,
assignmentStore,
protoregistry.GitalyProtoPreregistered,
+ nodeSet.Connections(),
+ primaryGetter,
)
)
metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl)
diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go
index 7927df9dc..3c3ae6259 100644
--- a/cmd/praefect/subcmd_accept_dataloss_test.go
+++ b/cmd/praefect/subcmd_accept_dataloss_test.go
@@ -61,7 +61,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) {
},
}
- ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs, nil))})
+ ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(nil, conf, q, rs, nil, nil, nil))})
defer clean()
conf.SocketPath = ln.Addr().String()
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index 14ae43a57..3c67783d5 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -109,7 +109,7 @@ func TestDatalossSubcommand(t *testing.T) {
require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "repository-2", "gitaly-3", 0))
ln, clean := listenAndServe(t, []svcRegistrar{
- registerPraefectInfoServer(info.NewServer(nil, cfg, nil, gs, nil))})
+ registerPraefectInfoServer(info.NewServer(nil, cfg, nil, gs, nil, nil, nil))})
defer clean()
for _, tc := range []struct {
desc string
diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go
index 090da41ee..e830450e5 100644
--- a/cmd/praefect/subcmd_set_replication_factor_test.go
+++ b/cmd/praefect/subcmd_set_replication_factor_test.go
@@ -93,7 +93,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) {
)
ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(
- info.NewServer(nil, config.Config{}, nil, nil, store),
+ info.NewServer(nil, config.Config{}, nil, nil, store, nil, nil),
)})
defer clean()
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 37d8b6e61..f094d9ad4 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -168,7 +168,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry)
- srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil)
+ srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil, nil, nil)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 030ba8d89..ebeb2b071 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -176,7 +176,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
NodeSetFromNodeManager(opt.withNodeMgr),
)
- prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, opt.withRepoStore, nil)
+ prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, opt.withRepoStore, nil, nil, nil)
listener, port := listenAvailPort(t)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 0b2aef714..82e7bfc57 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -363,7 +363,7 @@ func TestPropagateReplicationJob(t *testing.T) {
replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
- prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil)
+ prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, nil, nil)
listener, port := listenAvailPort(t)
ctx, cancel := testhelper.Context()
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index 516850f59..5eea4bd84 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -101,6 +101,8 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) {
nil,
rs,
nil,
+ nil,
+ nil,
)
defer srv.Stop()
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 0ad23f242..41e11fb33 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -23,6 +23,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/server"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/transaction"
@@ -47,7 +48,9 @@ func NewGRPCServer(
txMgr *transactions.Manager,
queue datastore.ReplicationEventQueue,
rs datastore.RepositoryStore,
- rfs info.ReplicationFactorSetter,
+ assignmentStore AssignmentStore,
+ conns Connections,
+ primaryGetter PrimaryGetter,
grpcOpts ...grpc.ServerOption,
) *grpc.Server {
ctxTagOpts := []grpc_ctxtags.Option{
@@ -103,7 +106,7 @@ func NewGRPCServer(
warnDupeAddrs(logger, conf)
srv := grpc.NewServer(grpcOpts...)
- registerServices(srv, nodeMgr, txMgr, conf, queue, rs, rfs)
+ registerServices(srv, nodeMgr, txMgr, conf, queue, rs, assignmentStore, service.Connections(conns), primaryGetter)
return srv
}
@@ -115,10 +118,20 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
}
// registerServices registers services praefect needs to handle RPCs on its own.
-func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs info.ReplicationFactorSetter) {
+func registerServices(
+ srv *grpc.Server,
+ nm nodes.Manager,
+ tm *transactions.Manager,
+ conf config.Config,
+ queue datastore.ReplicationEventQueue,
+ rs datastore.RepositoryStore,
+ assignmentStore AssignmentStore,
+ conns service.Connections,
+ primaryGetter info.PrimaryGetter,
+) {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm))
- gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs, rfs))
+ gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, rs, assignmentStore, conns, primaryGetter))
gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm))
healthpb.RegisterHealthServer(srv, health.NewServer())
diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go
index b8b42cd4f..3bf494556 100644
--- a/internal/praefect/server_factory.go
+++ b/internal/praefect/server_factory.go
@@ -12,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -27,19 +26,23 @@ func NewServerFactory(
txMgr *transactions.Manager,
queue datastore.ReplicationEventQueue,
rs datastore.RepositoryStore,
- rfs info.ReplicationFactorSetter,
+ assignmentStore AssignmentStore,
registry *protoregistry.Registry,
+ conns Connections,
+ primaryGetter PrimaryGetter,
) *ServerFactory {
return &ServerFactory{
- conf: conf,
- logger: logger,
- director: director,
- nodeMgr: nodeMgr,
- txMgr: txMgr,
- queue: queue,
- rs: rs,
- rfs: rfs,
- registry: registry,
+ conf: conf,
+ logger: logger,
+ director: director,
+ nodeMgr: nodeMgr,
+ txMgr: txMgr,
+ queue: queue,
+ rs: rs,
+ assignmentStore: assignmentStore,
+ registry: registry,
+ conns: conns,
+ primaryGetter: primaryGetter,
}
}
@@ -53,9 +56,11 @@ type ServerFactory struct {
txMgr *transactions.Manager
queue datastore.ReplicationEventQueue
rs datastore.RepositoryStore
- rfs info.ReplicationFactorSetter
+ assignmentStore AssignmentStore
registry *protoregistry.Registry
secure, insecure []*grpc.Server
+ conns Connections
+ primaryGetter PrimaryGetter
}
// Serve starts serving on the provided listener with newly created grpc.Server
@@ -124,7 +129,9 @@ func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server {
s.txMgr,
s.queue,
s.rs,
- s.rfs,
+ s.assignmentStore,
+ s.conns,
+ s.primaryGetter,
grpcOpts...,
)
}
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 312d542c8..2ab625909 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -133,7 +133,7 @@ func TestServerFactory(t *testing.T) {
}
t.Run("insecure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, "localhost:0")
@@ -162,7 +162,7 @@ func TestServerFactory(t *testing.T) {
})
t.Run("secure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, "localhost:0")
@@ -204,7 +204,7 @@ func TestServerFactory(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
defer praefectServerFactory.Stop()
// start with tcp address
@@ -272,7 +272,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls key path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.KeyPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
@@ -281,7 +281,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls cert path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.CertPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
diff --git a/internal/praefect/service/connections.go b/internal/praefect/service/connections.go
new file mode 100644
index 000000000..38d72b73f
--- /dev/null
+++ b/internal/praefect/service/connections.go
@@ -0,0 +1,9 @@
+package service
+
+import "google.golang.org/grpc"
+
+// Connections contains connections to Gitaly nodes keyed by virtual storage and storage
+//
+// This duplicates the praefect.Connections type as it is not possible to import anything from `praefect`
+// to `info` or `server` packages due to cyclic dependencies.
+type Connections map[string]map[string]*grpc.ClientConn
diff --git a/internal/praefect/service/info/consistencycheck_test.go b/internal/praefect/service/info/consistencycheck_test.go
index 2a77e7d0f..8d2c34dee 100644
--- a/internal/praefect/service/info/consistencycheck_test.go
+++ b/internal/praefect/service/info/consistencycheck_test.go
@@ -35,7 +35,7 @@ func TestServer_ConsistencyCheck_repositorySpecificPrimariesUnsupported(t *testi
NewServer(nil,
config.Config{
Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository},
- }, nil, nil, nil,
+ }, nil, nil, nil, nil, nil,
).ConsistencyCheck(nil, nil),
)
}
@@ -148,7 +148,7 @@ func TestServer_ConsistencyCheck(t *testing.T) {
praefectSrv := grpc.NewServer()
defer praefectSrv.Stop()
- gitalypb.RegisterPraefectInfoServiceServer(praefectSrv, NewServer(nm, conf, queue, nil, nil))
+ gitalypb.RegisterPraefectInfoServiceServer(praefectSrv, NewServer(nm, conf, queue, nil, nil, nil, nil))
go praefectSrv.Serve(praefectListener)
praefectConn, err := client.Dial("unix://"+praefectAddr, nil)
diff --git a/internal/praefect/service/info/replication_factor.go b/internal/praefect/service/info/replication_factor.go
index 622c76a83..b3fcf4e8f 100644
--- a/internal/praefect/service/info/replication_factor.go
+++ b/internal/praefect/service/info/replication_factor.go
@@ -10,13 +10,6 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-// ReplicationFactorSetter sets a repository's replication factor
-type ReplicationFactorSetter interface {
- // SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met.
- // Please see the protobuf documentation of the method for details.
- SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
-}
-
func (s *Server) SetReplicationFactor(ctx context.Context, req *gitalypb.SetReplicationFactorRequest) (*gitalypb.SetReplicationFactorResponse, error) {
resp, err := s.setReplicationFactor(ctx, req)
if err != nil {
@@ -32,7 +25,7 @@ func (s *Server) SetReplicationFactor(ctx context.Context, req *gitalypb.SetRepl
}
func (s *Server) setReplicationFactor(ctx context.Context, req *gitalypb.SetReplicationFactorRequest) (*gitalypb.SetReplicationFactorResponse, error) {
- storages, err := s.rfs.SetReplicationFactor(ctx, req.VirtualStorage, req.RelativePath, int(req.ReplicationFactor))
+ storages, err := s.assignmentStore.SetReplicationFactor(ctx, req.VirtualStorage, req.RelativePath, int(req.ReplicationFactor))
if err != nil {
return nil, fmt.Errorf("set replication factor: %w", err)
}
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 9cb67efa3..e54ea267a 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -7,27 +7,60 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
+// AssignmentStore is an interface for getting repository host node assignments.
+//
+// This duplicates the praefect.AssignmentGetter type as it is not possible to import anything from
+// `praefect` to `info` packages due to cyclic dependencies.
+type AssignmentStore interface {
+ // GetHostAssignments returns the names of the storages assigned to host the repository.
+ // The primary node must always be assigned.
+ GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error)
+ // SetReplicationFactor sets a repository's replication factor and returns the current assignments.
+ SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
+}
+
+// PrimaryGetter is an interface for getting a primary of a repository.
+//
+// This duplicates the praefect.PrimaryGetter type as it is not possible to import anything from
+// `praefect` to `info` packages due to cyclic dependencies.
+type PrimaryGetter interface {
+ // GetPrimary returns the primary storage for a given repository.
+ GetPrimary(ctx context.Context, virtualStorage string, relativePath string) (string, error)
+}
+
// Server is a InfoService server
type Server struct {
- nodeMgr nodes.Manager
- conf config.Config
- queue datastore.ReplicationEventQueue
- rs datastore.RepositoryStore
-
- rfs ReplicationFactorSetter
+ nodeMgr nodes.Manager
+ conf config.Config
+ queue datastore.ReplicationEventQueue
+ rs datastore.RepositoryStore
+ assignmentStore AssignmentStore
+ conns service.Connections
+ primaryGetter PrimaryGetter
}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, rfs ReplicationFactorSetter) gitalypb.PraefectInfoServiceServer {
+func NewServer(
+ nodeMgr nodes.Manager,
+ conf config.Config,
+ queue datastore.ReplicationEventQueue,
+ rs datastore.RepositoryStore,
+ assignmentStore AssignmentStore,
+ conns service.Connections,
+ primaryGetter PrimaryGetter,
+) gitalypb.PraefectInfoServiceServer {
return &Server{
- nodeMgr: nodeMgr,
- conf: conf,
- queue: queue,
- rs: rs,
- rfs: rfs,
+ nodeMgr: nodeMgr,
+ conf: conf,
+ queue: queue,
+ rs: rs,
+ assignmentStore: assignmentStore,
+ conns: conns,
+ primaryGetter: primaryGetter,
}
}