Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-08-10 10:36:03 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-08-10 10:36:03 +0300
commit1189be1011fab3d06e36e888ed0ef4b836030e92 (patch)
tree31c605f0aeda8314dbcd366f7105b560a5041c3f
parentda1d3e9753186a08e80fa473fcee192fc3534431 (diff)
parentef99157c983fc0d457ef65043302e6a03b07c1db (diff)
Merge branch 'jt-intercept-replicate-repository' into 'master'
praefect: Disable object pool replication for `ReplicateRepository` See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6160 Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: Justin Tobler <jtobler@gitlab.com> Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r--internal/cli/praefect/serve.go1
-rw-r--r--internal/featureflag/ff_intercept_replicate_repository.go10
-rw-r--r--internal/gitaly/service/repository/replicate_test.go11
-rw-r--r--internal/grpc/proxy/handler.go42
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/get_object_pool_test.go1
-rw-r--r--internal/praefect/remove_all_test.go1
-rw-r--r--internal/praefect/remove_repository_test.go1
-rw-r--r--internal/praefect/replicate_repository.go58
-rw-r--r--internal/praefect/replicate_repository_test.go209
-rw-r--r--internal/praefect/repository_exists_test.go1
-rw-r--r--internal/praefect/server.go10
-rw-r--r--internal/praefect/server_factory.go4
-rw-r--r--internal/praefect/server_factory_test.go10
-rw-r--r--internal/praefect/testserver.go1
15 files changed, 333 insertions, 29 deletions
diff --git a/internal/cli/praefect/serve.go b/internal/cli/praefect/serve.go
index a36faad2f..3a5e1ea3f 100644
--- a/internal/cli/praefect/serve.go
+++ b/internal/cli/praefect/serve.go
@@ -396,6 +396,7 @@ func server(
srvFactory = praefect.NewServerFactory(
conf,
logger,
+ coordinator,
coordinator.StreamDirector,
nodeManager,
transactionManager,
diff --git a/internal/featureflag/ff_intercept_replicate_repository.go b/internal/featureflag/ff_intercept_replicate_repository.go
new file mode 100644
index 000000000..175ba5ae3
--- /dev/null
+++ b/internal/featureflag/ff_intercept_replicate_repository.go
@@ -0,0 +1,10 @@
+package featureflag
+
+// InterceptReplicateRepository will enable the interception of the ReplicateRepository RPC in
+// Praefect. Interception of this RPC enables Praefect to support object pool replication.
+var InterceptReplicateRepository = NewFeatureFlag(
+ "intercept_replicate_repository",
+ "v16.3.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/5477",
+ false,
+)
diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go
index 152b53196..2762fd3ef 100644
--- a/internal/gitaly/service/repository/replicate_test.go
+++ b/internal/gitaly/service/repository/replicate_test.go
@@ -43,7 +43,8 @@ import (
func TestReplicateRepository(t *testing.T) {
t.Parallel()
- testhelper.NewFeatureSets(featureflag.ReplicateRepositoryObjectPool).Run(t, testReplicateRepository)
+ testhelper.NewFeatureSets(featureflag.ReplicateRepositoryObjectPool, featureflag.InterceptReplicateRepository).
+ Run(t, testReplicateRepository)
}
func testReplicateRepository(t *testing.T, ctx context.Context) {
@@ -383,6 +384,10 @@ func testReplicateRepository(t *testing.T, ctx context.Context) {
},
{
desc: "only target repository is linked to object pool",
+ // Object pool replication is not currently supported by Praefect. Existing object pool
+ // repositories cannot be located because Praefect rewrites repository messages.
+ // Consequently, this test case is executed with Praefect disabled.
+ serverOpts: []testserver.GitalyServerOpt{testserver.WithDisablePraefect()},
setup: func(t *testing.T, cfg config.Cfg) setupData {
sourceProto, _, targetProto, targetPath := setupSourceAndTarget(t, cfg, true)
@@ -441,6 +446,10 @@ func testReplicateRepository(t *testing.T, ctx context.Context) {
},
{
desc: "source and target linked to different object pool",
+ // Object pool replication is not currently supported by Praefect. Existing object pool
+ // repositories cannot be located because Praefect rewrites repository messages.
+ // Consequently, this test case is executed with Praefect disabled.
+ serverOpts: []testserver.GitalyServerOpt{testserver.WithDisablePraefect()},
setup: func(t *testing.T, cfg config.Cfg) setupData {
sourceProto, _, targetProto, targetPath := setupSourceAndTarget(t, cfg, true)
diff --git a/internal/grpc/proxy/handler.go b/internal/grpc/proxy/handler.go
index eccaf00f5..e43e86292 100644
--- a/internal/grpc/proxy/handler.go
+++ b/internal/grpc/proxy/handler.go
@@ -106,24 +106,9 @@ func failDestinationsWithError(params *StreamParameters, err error) {
}
}
-// handler is where the real magic of proxying happens.
-// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
-// forwarding it to a ClientStream established against the relevant ClientConn.
-func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (finalErr error) {
- // little bit of gRPC internals never hurt anyone
- fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
- if !ok {
- return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
- }
-
- peeker := newPeeker(serverStream)
-
- // We require that the director's returned context inherits from the serverStream.Context().
- params, err := s.director(serverStream.Context(), fullMethodName, peeker)
- if err != nil {
- return err
- }
-
+// HandleStream proxies the RPC stream to the destination storages. Only responses from the primary
+// are sent back to the client.
+func HandleStream(serverStream grpc.ServerStream, fullMethodName string, params *StreamParameters) (finalErr error) {
defer func() {
err := params.RequestFinalizer()
if finalErr == nil {
@@ -228,6 +213,27 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina
return nil
}
+// handler is where the real magic of proxying happens.
+// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
+// forwarding it to a ClientStream established against the relevant ClientConn.
+func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error {
+ // little bit of gRPC internals never hurt anyone
+ fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
+ if !ok {
+ return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
+ }
+
+ peeker := newPeeker(serverStream)
+
+ // We require that the director's returned context inherits from the serverStream.Context().
+ params, err := s.director(serverStream.Context(), fullMethodName, peeker)
+ if err != nil {
+ return err
+ }
+
+ return HandleStream(serverStream, fullMethodName, params)
+}
+
func cancelStreams(streams []streamAndDestination) {
for _, stream := range streams {
stream.cancel()
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index a4d07d036..c35881da0 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -159,7 +159,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered)
- srv := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, txMgr, nil, nil, nil, nil, nil, nil, nil)
+ srv := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator, coordinator.StreamDirector, txMgr, nil, nil, nil, nil, nil, nil, nil)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
diff --git a/internal/praefect/get_object_pool_test.go b/internal/praefect/get_object_pool_test.go
index 859bb10f1..2461d0f36 100644
--- a/internal/praefect/get_object_pool_test.go
+++ b/internal/praefect/get_object_pool_test.go
@@ -74,6 +74,7 @@ func TestGetObjectPoolHandler(t *testing.T) {
config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}},
testhelper.NewDiscardingLogEntry(t),
protoregistry.GitalyProtoPreregistered,
+ nil,
func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
return nil, errServedByGitaly
},
diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go
index e875b68ae..1a3350914 100644
--- a/internal/praefect/remove_all_test.go
+++ b/internal/praefect/remove_all_test.go
@@ -71,6 +71,7 @@ func TestRemoveAllHandler(t *testing.T) {
config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}},
testhelper.NewDiscardingLogEntry(t),
protoregistry.GitalyProtoPreregistered,
+ nil,
func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
return nil, errServedByGitaly
},
diff --git a/internal/praefect/remove_repository_test.go b/internal/praefect/remove_repository_test.go
index b07f9d28b..fdafa15c0 100644
--- a/internal/praefect/remove_repository_test.go
+++ b/internal/praefect/remove_repository_test.go
@@ -108,6 +108,7 @@ func TestRemoveRepositoryHandler(t *testing.T) {
config.Config{Failover: config.Failover{ElectionStrategy: electionStrategy}},
testhelper.NewDiscardingLogEntry(t),
protoregistry.GitalyProtoPreregistered,
+ nil,
func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
return nil, errServedByGitaly
},
diff --git a/internal/praefect/replicate_repository.go b/internal/praefect/replicate_repository.go
new file mode 100644
index 000000000..513e1060c
--- /dev/null
+++ b/internal/praefect/replicate_repository.go
@@ -0,0 +1,58 @@
+package praefect
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+// ReplicateRepositoryHandler intercepts ReplicateRepository RPC calls.
+func ReplicateRepositoryHandler(coordinator *Coordinator) grpc.StreamHandler {
+ return func(srv interface{}, stream grpc.ServerStream) error {
+ ctx := stream.Context()
+
+ if featureflag.InterceptReplicateRepository.IsDisabled(ctx) {
+ // Fallback to the default transparent handler to proxy the RPC.
+ return proxy.TransparentHandler(coordinator.StreamDirector)(srv, stream)
+ }
+
+ // Peek the stream to get first request.
+ var req gitalypb.ReplicateRepositoryRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ // Validate required target repository information is present in first request.
+ if err := coordinator.validateTargetRepo(req.GetRepository()); err != nil {
+ return structerr.NewInvalidArgument("%w", err)
+ }
+
+ const fullMethodName = "/gitaly.RepositoryService/ReplicateRepository"
+ mi, err := coordinator.registry.LookupMethod(fullMethodName)
+ if err != nil {
+ return err
+ }
+
+ // Object pool replication is not yet supported by Praefect. Rewrite the request to always
+ // disable object pool replication.
+ req.ReplicateObjectDeduplicationNetworkMembership = false
+
+ // Generate stream parameters used to configure the stream proxy.
+ parameters, err := coordinator.mutatorStreamParameters(ctx, grpcCall{
+ fullMethodName: fullMethodName,
+ methodInfo: mi,
+ msg: &req,
+ targetRepo: req.GetRepository(),
+ })
+ if err != nil {
+ return err
+ }
+
+ // Proxy stream to destination storages.
+ return proxy.HandleStream(stream, fullMethodName, parameters)
+ }
+}
diff --git a/internal/praefect/replicate_repository_test.go b/internal/praefect/replicate_repository_test.go
new file mode 100644
index 000000000..a46e3b397
--- /dev/null
+++ b/internal/praefect/replicate_repository_test.go
@@ -0,0 +1,209 @@
+package praefect
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestReplicateRepositoryHandler(t *testing.T) {
+ t.Parallel()
+ testhelper.NewFeatureSets(featureflag.InterceptReplicateRepository, featureflag.ReplicateRepositoryObjectPool).
+ Run(t, testReplicateRepositoryHandler)
+}
+
+func testReplicateRepositoryHandler(t *testing.T, ctx context.Context) {
+ // Standalone Gitaly node for hosting source repository being replicated from.
+ const sourceStorage = "gitaly-source"
+ sourceCfg := testcfg.Build(t, testcfg.WithStorages(sourceStorage))
+ sourceAddr := testserver.RunGitalyServer(t, sourceCfg, setup.RegisterAll, testserver.WithDisablePraefect())
+ sourceCfg.SocketPath = sourceAddr
+
+ // Gitaly node deployed behind Praefect for hosting target repository being replicated to.
+ const targetStorage = "gitaly-target"
+ targetCfg := testcfg.Build(t, testcfg.WithStorages(targetStorage))
+ testcfg.BuildGitalyHooks(t, targetCfg)
+ testcfg.BuildGitalySSH(t, targetCfg)
+ targetAddr := testserver.RunGitalyServer(t, targetCfg, setup.RegisterAll, testserver.WithDisablePraefect())
+ targetCfg.SocketPath = targetAddr
+
+ // Praefect proxy which ReplicateRepository RPC routes through.
+ const virtualStorage = "virtual-storage"
+ praefectCfg := config.Config{
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ DB: testdb.GetConfig(t, testdb.New(t).Name),
+ Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository},
+ Replication: config.DefaultReplicationConfig(),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: targetStorage, Address: targetAddr},
+ },
+ },
+ },
+ }
+ praefectServer := testserver.StartPraefect(t, praefectCfg)
+
+ clientConn, err := grpc.DialContext(ctx, praefectServer.Address(), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ testhelper.MustClose(t, clientConn)
+ })
+
+ repoClient := gitalypb.NewRepositoryServiceClient(clientConn)
+ objectPoolClient := gitalypb.NewObjectPoolServiceClient(clientConn)
+
+ type setupData struct {
+ source *gitalypb.Repository
+ target *gitalypb.Repository
+ expectedObjectPool *gitalypb.ObjectPool
+ expectedErr error
+ }
+
+ for _, tc := range []struct {
+ desc string
+ setup func(t *testing.T) setupData
+ }{
+ {
+ desc: "source repository is not linked to object pool",
+ setup: func(t *testing.T) setupData {
+ // If the source repository is not linked to an object pool, there is no need to
+ // perform object pool replication.
+ repoProto, _ := gittest.CreateRepository(t, ctx, sourceCfg)
+
+ return setupData{
+ source: repoProto,
+ target: &gitalypb.Repository{
+ StorageName: virtualStorage,
+ RelativePath: gittest.NewRepositoryName(t),
+ },
+ expectedObjectPool: nil,
+ }
+ },
+ },
+ {
+ desc: "source repository is linked to object pool",
+ setup: func(t *testing.T) setupData {
+ // If the source repository is linked to an object pool and the target storage is
+ // Praefect, the ReplicateRepository RPC handler should intercept and disable object
+ // pool replication.
+ sourceProto, _ := gittest.CreateRepository(t, ctx, sourceCfg)
+ poolProto, _ := gittest.CreateObjectPool(t, ctx, sourceCfg, sourceProto, gittest.CreateObjectPoolConfig{
+ LinkRepositoryToObjectPool: true,
+ })
+
+ target := &gitalypb.Repository{
+ StorageName: virtualStorage,
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+
+ // If the RPC is not intercepted, the default behavior RPC handler is invoked. When
+ // object pool replication is also enabled this will lead to an object pool being
+ // created on the target storage. This is problematic because Praefect is not aware
+ // of the object pool.
+ if featureflag.InterceptReplicateRepository.IsDisabled(ctx) && featureflag.ReplicateRepositoryObjectPool.IsEnabled(ctx) {
+ return setupData{
+ source: sourceProto,
+ target: target,
+ expectedObjectPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: virtualStorage,
+ RelativePath: poolProto.Repository.RelativePath,
+ },
+ },
+ }
+ }
+
+ return setupData{
+ source: sourceProto,
+ target: target,
+ expectedObjectPool: nil,
+ }
+ },
+ },
+ {
+ desc: "repository not set",
+ setup: func(t *testing.T) setupData {
+ return setupData{
+ target: nil,
+ expectedErr: structerr.NewInvalidArgument("repository not set"),
+ }
+ },
+ },
+ {
+ desc: "storage not set",
+ setup: func(t *testing.T) setupData {
+ return setupData{
+ target: &gitalypb.Repository{
+ StorageName: "",
+ RelativePath: gittest.NewRepositoryName(t),
+ },
+ expectedErr: structerr.NewInvalidArgument("storage name not set"),
+ }
+ },
+ },
+ {
+ desc: "relative path not set",
+ setup: func(t *testing.T) setupData {
+ return setupData{
+ target: &gitalypb.Repository{
+ StorageName: virtualStorage,
+ RelativePath: "",
+ },
+ expectedErr: structerr.NewInvalidArgument("repository path not set"),
+ }
+ },
+ },
+ {
+ desc: "storage and relative path not set",
+ setup: func(t *testing.T) setupData {
+ return setupData{
+ target: &gitalypb.Repository{
+ StorageName: "",
+ RelativePath: "",
+ },
+ expectedErr: structerr.NewInvalidArgument("repository not set"),
+ }
+ },
+ },
+ } {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ setup := tc.setup(t)
+
+ ctx := testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, sourceCfg))
+ _, err := repoClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
+ Repository: setup.target,
+ Source: setup.source,
+ // Enable object pool replication to validate if Praefect is able to rewrite the
+ // message to disable it.
+ ReplicateObjectDeduplicationNetworkMembership: true,
+ })
+ testhelper.RequireGrpcError(t, setup.expectedErr, err)
+ if setup.expectedErr != nil {
+ return
+ }
+
+ // Check if Praefect disabled object pool replication.
+ resp, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{Repository: setup.target})
+ require.NoError(t, err)
+ require.Equal(t, setup.expectedObjectPool, resp.GetObjectPool())
+ })
+ }
+}
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index 26b1d265c..53f499769 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -89,6 +89,7 @@ func TestRepositoryExistsHandler(t *testing.T) {
config.Config{Failover: config.Failover{ElectionStrategy: electionStrategy}},
testhelper.NewDiscardingLogEntry(t),
protoregistry.GitalyProtoPreregistered,
+ nil,
func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
return nil, errServedByGitaly
},
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 2bcc2bd07..402ca03ea 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -119,6 +119,7 @@ func NewGRPCServer(
conf config.Config,
logger *logrus.Entry,
registry *protoregistry.Registry,
+ coordinator *Coordinator,
director proxy.StreamDirector,
txMgr *transactions.Manager,
rs datastore.RepositoryStore,
@@ -205,10 +206,11 @@ func NewGRPCServer(
if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{
- "RemoveAll": RemoveAllHandler(rs, conns),
- "RemoveRepository": RemoveRepositoryHandler(rs, conns),
- "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs),
- "RepositoryExists": RepositoryExistsHandler(rs),
+ "RemoveAll": RemoveAllHandler(rs, conns),
+ "RemoveRepository": RemoveRepositoryHandler(rs, conns),
+ "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs),
+ "ReplicateRepository": ReplicateRepositoryHandler(coordinator),
+ "RepositoryExists": RepositoryExistsHandler(rs),
})
proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{
"DeleteObjectPool": DeleteObjectPoolHandler(rs, conns),
diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go
index 27faf9257..67bcf89b7 100644
--- a/internal/praefect/server_factory.go
+++ b/internal/praefect/server_factory.go
@@ -22,6 +22,7 @@ import (
func NewServerFactory(
conf config.Config,
logger *logrus.Entry,
+ coordinator *Coordinator,
director proxy.StreamDirector,
nodeMgr nodes.Manager,
txMgr *transactions.Manager,
@@ -38,6 +39,7 @@ func NewServerFactory(
return &ServerFactory{
conf: conf,
logger: logger,
+ coordinator: coordinator,
director: director,
nodeMgr: nodeMgr,
txMgr: txMgr,
@@ -58,6 +60,7 @@ type ServerFactory struct {
mtx sync.Mutex
conf config.Config
logger *logrus.Entry
+ coordinator *Coordinator
director proxy.StreamDirector
nodeMgr nodes.Manager
txMgr *transactions.Manager
@@ -142,6 +145,7 @@ func (s *ServerFactory) createGRPC(creds credentials.TransportCredentials) *grpc
s.conf,
s.logger,
s.registry,
+ s.coordinator,
s.director,
s.txMgr,
s.rs,
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 9700998cd..0c1608cc8 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -178,7 +178,7 @@ func TestServerFactory(t *testing.T) {
}
t.Run("insecure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, "localhost:0")
@@ -210,7 +210,7 @@ func TestServerFactory(t *testing.T) {
})
t.Run("secure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, "localhost:0")
@@ -238,7 +238,7 @@ func TestServerFactory(t *testing.T) {
})
t.Run("stops all listening servers", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
defer praefectServerFactory.Stop()
var healthClients []healthpb.HealthClient
@@ -294,7 +294,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls key path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.KeyPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
@@ -303,7 +303,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls cert path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.CertPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go
index a32c64192..377fc9eb1 100644
--- a/internal/praefect/testserver.go
+++ b/internal/praefect/testserver.go
@@ -251,6 +251,7 @@ func RunPraefectServer(
conf,
opt.WithLogger,
protoregistry.GitalyProtoPreregistered,
+ coordinator,
coordinator.StreamDirector,
opt.WithTxMgr,
opt.WithRepoStore,