diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-08-10 10:36:03 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-08-10 10:36:03 +0300 |
commit | 1189be1011fab3d06e36e888ed0ef4b836030e92 (patch) | |
tree | 31c605f0aeda8314dbcd366f7105b560a5041c3f | |
parent | da1d3e9753186a08e80fa473fcee192fc3534431 (diff) | |
parent | ef99157c983fc0d457ef65043302e6a03b07c1db (diff) |
Merge branch 'jt-intercept-replicate-repository' into 'master'
praefect: Disable object pool replication for `ReplicateRepository`
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6160
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Justin Tobler <jtobler@gitlab.com>
-rw-r--r-- | internal/cli/praefect/serve.go | 1 | ||||
-rw-r--r-- | internal/featureflag/ff_intercept_replicate_repository.go | 10 | ||||
-rw-r--r-- | internal/gitaly/service/repository/replicate_test.go | 11 | ||||
-rw-r--r-- | internal/grpc/proxy/handler.go | 42 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/get_object_pool_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/remove_all_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/remove_repository_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/replicate_repository.go | 58 | ||||
-rw-r--r-- | internal/praefect/replicate_repository_test.go | 209 | ||||
-rw-r--r-- | internal/praefect/repository_exists_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/server.go | 10 | ||||
-rw-r--r-- | internal/praefect/server_factory.go | 4 | ||||
-rw-r--r-- | internal/praefect/server_factory_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/testserver.go | 1 |
15 files changed, 333 insertions, 29 deletions
diff --git a/internal/cli/praefect/serve.go b/internal/cli/praefect/serve.go index a36faad2f..3a5e1ea3f 100644 --- a/internal/cli/praefect/serve.go +++ b/internal/cli/praefect/serve.go @@ -396,6 +396,7 @@ func server( srvFactory = praefect.NewServerFactory( conf, logger, + coordinator, coordinator.StreamDirector, nodeManager, transactionManager, diff --git a/internal/featureflag/ff_intercept_replicate_repository.go b/internal/featureflag/ff_intercept_replicate_repository.go new file mode 100644 index 000000000..175ba5ae3 --- /dev/null +++ b/internal/featureflag/ff_intercept_replicate_repository.go @@ -0,0 +1,10 @@ +package featureflag + +// InterceptReplicateRepository will enable the interception of the ReplicateRepository RPC in +// Praefect. Interception of this RPC enables Praefect to support object pool replication. +var InterceptReplicateRepository = NewFeatureFlag( + "intercept_replicate_repository", + "v16.3.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/5477", + false, +) diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 152b53196..2762fd3ef 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -43,7 +43,8 @@ import ( func TestReplicateRepository(t *testing.T) { t.Parallel() - testhelper.NewFeatureSets(featureflag.ReplicateRepositoryObjectPool).Run(t, testReplicateRepository) + testhelper.NewFeatureSets(featureflag.ReplicateRepositoryObjectPool, featureflag.InterceptReplicateRepository). + Run(t, testReplicateRepository) } func testReplicateRepository(t *testing.T, ctx context.Context) { @@ -383,6 +384,10 @@ func testReplicateRepository(t *testing.T, ctx context.Context) { }, { desc: "only target repository is linked to object pool", + // Object pool replication is not currently supported by Praefect. Existing object pool + // repositories cannot be located because Praefect rewrites repository messages. + // Consequently, this test case is executed with Praefect disabled. + serverOpts: []testserver.GitalyServerOpt{testserver.WithDisablePraefect()}, setup: func(t *testing.T, cfg config.Cfg) setupData { sourceProto, _, targetProto, targetPath := setupSourceAndTarget(t, cfg, true) @@ -441,6 +446,10 @@ func testReplicateRepository(t *testing.T, ctx context.Context) { }, { desc: "source and target linked to different object pool", + // Object pool replication is not currently supported by Praefect. Existing object pool + // repositories cannot be located because Praefect rewrites repository messages. + // Consequently, this test case is executed with Praefect disabled. + serverOpts: []testserver.GitalyServerOpt{testserver.WithDisablePraefect()}, setup: func(t *testing.T, cfg config.Cfg) setupData { sourceProto, _, targetProto, targetPath := setupSourceAndTarget(t, cfg, true) diff --git a/internal/grpc/proxy/handler.go b/internal/grpc/proxy/handler.go index eccaf00f5..e43e86292 100644 --- a/internal/grpc/proxy/handler.go +++ b/internal/grpc/proxy/handler.go @@ -106,24 +106,9 @@ func failDestinationsWithError(params *StreamParameters, err error) { } } -// handler is where the real magic of proxying happens. -// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire, -// forwarding it to a ClientStream established against the relevant ClientConn. -func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (finalErr error) { - // little bit of gRPC internals never hurt anyone - fullMethodName, ok := grpc.MethodFromServerStream(serverStream) - if !ok { - return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") - } - - peeker := newPeeker(serverStream) - - // We require that the director's returned context inherits from the serverStream.Context(). - params, err := s.director(serverStream.Context(), fullMethodName, peeker) - if err != nil { - return err - } - +// HandleStream proxies the RPC stream to the destination storages. Only responses from the primary +// are sent back to the client. +func HandleStream(serverStream grpc.ServerStream, fullMethodName string, params *StreamParameters) (finalErr error) { defer func() { err := params.RequestFinalizer() if finalErr == nil { @@ -228,6 +213,27 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina return nil } +// handler is where the real magic of proxying happens. +// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire, +// forwarding it to a ClientStream established against the relevant ClientConn. +func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error { + // little bit of gRPC internals never hurt anyone + fullMethodName, ok := grpc.MethodFromServerStream(serverStream) + if !ok { + return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") + } + + peeker := newPeeker(serverStream) + + // We require that the director's returned context inherits from the serverStream.Context(). + params, err := s.director(serverStream.Context(), fullMethodName, peeker) + if err != nil { + return err + } + + return HandleStream(serverStream, fullMethodName, params) +} + func cancelStreams(streams []streamAndDestination) { for _, stream := range streams { stream.cancel() diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index a4d07d036..c35881da0 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -159,7 +159,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered) - srv := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, txMgr, nil, nil, nil, nil, nil, nil, nil) + srv := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator, coordinator.StreamDirector, txMgr, nil, nil, nil, nil, nil, nil, nil) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t) diff --git a/internal/praefect/get_object_pool_test.go b/internal/praefect/get_object_pool_test.go index 859bb10f1..2461d0f36 100644 --- a/internal/praefect/get_object_pool_test.go +++ b/internal/praefect/get_object_pool_test.go @@ -74,6 +74,7 @@ func TestGetObjectPoolHandler(t *testing.T) { config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}, testhelper.NewDiscardingLogEntry(t), protoregistry.GitalyProtoPreregistered, + nil, func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { return nil, errServedByGitaly }, diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go index e875b68ae..1a3350914 100644 --- a/internal/praefect/remove_all_test.go +++ b/internal/praefect/remove_all_test.go @@ -71,6 +71,7 @@ func TestRemoveAllHandler(t *testing.T) { config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}, testhelper.NewDiscardingLogEntry(t), protoregistry.GitalyProtoPreregistered, + nil, func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { return nil, errServedByGitaly }, diff --git a/internal/praefect/remove_repository_test.go b/internal/praefect/remove_repository_test.go index b07f9d28b..fdafa15c0 100644 --- a/internal/praefect/remove_repository_test.go +++ b/internal/praefect/remove_repository_test.go @@ -108,6 +108,7 @@ func TestRemoveRepositoryHandler(t *testing.T) { config.Config{Failover: config.Failover{ElectionStrategy: electionStrategy}}, testhelper.NewDiscardingLogEntry(t), protoregistry.GitalyProtoPreregistered, + nil, func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { return nil, errServedByGitaly }, diff --git a/internal/praefect/replicate_repository.go b/internal/praefect/replicate_repository.go new file mode 100644 index 000000000..513e1060c --- /dev/null +++ b/internal/praefect/replicate_repository.go @@ -0,0 +1,58 @@ +package praefect + +import ( + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" +) + +// ReplicateRepositoryHandler intercepts ReplicateRepository RPC calls. +func ReplicateRepositoryHandler(coordinator *Coordinator) grpc.StreamHandler { + return func(srv interface{}, stream grpc.ServerStream) error { + ctx := stream.Context() + + if featureflag.InterceptReplicateRepository.IsDisabled(ctx) { + // Fallback to the default transparent handler to proxy the RPC. + return proxy.TransparentHandler(coordinator.StreamDirector)(srv, stream) + } + + // Peek the stream to get first request. + var req gitalypb.ReplicateRepositoryRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + // Validate required target repository information is present in first request. + if err := coordinator.validateTargetRepo(req.GetRepository()); err != nil { + return structerr.NewInvalidArgument("%w", err) + } + + const fullMethodName = "/gitaly.RepositoryService/ReplicateRepository" + mi, err := coordinator.registry.LookupMethod(fullMethodName) + if err != nil { + return err + } + + // Object pool replication is not yet supported by Praefect. Rewrite the request to always + // disable object pool replication. + req.ReplicateObjectDeduplicationNetworkMembership = false + + // Generate stream parameters used to configure the stream proxy. + parameters, err := coordinator.mutatorStreamParameters(ctx, grpcCall{ + fullMethodName: fullMethodName, + methodInfo: mi, + msg: &req, + targetRepo: req.GetRepository(), + }) + if err != nil { + return err + } + + // Proxy stream to destination storages. + return proxy.HandleStream(stream, fullMethodName, parameters) + } +} diff --git a/internal/praefect/replicate_repository_test.go b/internal/praefect/replicate_repository_test.go new file mode 100644 index 000000000..a46e3b397 --- /dev/null +++ b/internal/praefect/replicate_repository_test.go @@ -0,0 +1,209 @@ +package praefect + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestReplicateRepositoryHandler(t *testing.T) { + t.Parallel() + testhelper.NewFeatureSets(featureflag.InterceptReplicateRepository, featureflag.ReplicateRepositoryObjectPool). + Run(t, testReplicateRepositoryHandler) +} + +func testReplicateRepositoryHandler(t *testing.T, ctx context.Context) { + // Standalone Gitaly node for hosting source repository being replicated from. + const sourceStorage = "gitaly-source" + sourceCfg := testcfg.Build(t, testcfg.WithStorages(sourceStorage)) + sourceAddr := testserver.RunGitalyServer(t, sourceCfg, setup.RegisterAll, testserver.WithDisablePraefect()) + sourceCfg.SocketPath = sourceAddr + + // Gitaly node deployed behind Praefect for hosting target repository being replicated to. + const targetStorage = "gitaly-target" + targetCfg := testcfg.Build(t, testcfg.WithStorages(targetStorage)) + testcfg.BuildGitalyHooks(t, targetCfg) + testcfg.BuildGitalySSH(t, targetCfg) + targetAddr := testserver.RunGitalyServer(t, targetCfg, setup.RegisterAll, testserver.WithDisablePraefect()) + targetCfg.SocketPath = targetAddr + + // Praefect proxy which ReplicateRepository RPC routes through. + const virtualStorage = "virtual-storage" + praefectCfg := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + DB: testdb.GetConfig(t, testdb.New(t).Name), + Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}, + Replication: config.DefaultReplicationConfig(), + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorage, + Nodes: []*config.Node{ + {Storage: targetStorage, Address: targetAddr}, + }, + }, + }, + } + praefectServer := testserver.StartPraefect(t, praefectCfg) + + clientConn, err := grpc.DialContext(ctx, praefectServer.Address(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + t.Cleanup(func() { + testhelper.MustClose(t, clientConn) + }) + + repoClient := gitalypb.NewRepositoryServiceClient(clientConn) + objectPoolClient := gitalypb.NewObjectPoolServiceClient(clientConn) + + type setupData struct { + source *gitalypb.Repository + target *gitalypb.Repository + expectedObjectPool *gitalypb.ObjectPool + expectedErr error + } + + for _, tc := range []struct { + desc string + setup func(t *testing.T) setupData + }{ + { + desc: "source repository is not linked to object pool", + setup: func(t *testing.T) setupData { + // If the source repository is not linked to an object pool, there is no need to + // perform object pool replication. + repoProto, _ := gittest.CreateRepository(t, ctx, sourceCfg) + + return setupData{ + source: repoProto, + target: &gitalypb.Repository{ + StorageName: virtualStorage, + RelativePath: gittest.NewRepositoryName(t), + }, + expectedObjectPool: nil, + } + }, + }, + { + desc: "source repository is linked to object pool", + setup: func(t *testing.T) setupData { + // If the source repository is linked to an object pool and the target storage is + // Praefect, the ReplicateRepository RPC handler should intercept and disable object + // pool replication. + sourceProto, _ := gittest.CreateRepository(t, ctx, sourceCfg) + poolProto, _ := gittest.CreateObjectPool(t, ctx, sourceCfg, sourceProto, gittest.CreateObjectPoolConfig{ + LinkRepositoryToObjectPool: true, + }) + + target := &gitalypb.Repository{ + StorageName: virtualStorage, + RelativePath: gittest.NewRepositoryName(t), + } + + // If the RPC is not intercepted, the default behavior RPC handler is invoked. When + // object pool replication is also enabled this will lead to an object pool being + // created on the target storage. This is problematic because Praefect is not aware + // of the object pool. + if featureflag.InterceptReplicateRepository.IsDisabled(ctx) && featureflag.ReplicateRepositoryObjectPool.IsEnabled(ctx) { + return setupData{ + source: sourceProto, + target: target, + expectedObjectPool: &gitalypb.ObjectPool{ + Repository: &gitalypb.Repository{ + StorageName: virtualStorage, + RelativePath: poolProto.Repository.RelativePath, + }, + }, + } + } + + return setupData{ + source: sourceProto, + target: target, + expectedObjectPool: nil, + } + }, + }, + { + desc: "repository not set", + setup: func(t *testing.T) setupData { + return setupData{ + target: nil, + expectedErr: structerr.NewInvalidArgument("repository not set"), + } + }, + }, + { + desc: "storage not set", + setup: func(t *testing.T) setupData { + return setupData{ + target: &gitalypb.Repository{ + StorageName: "", + RelativePath: gittest.NewRepositoryName(t), + }, + expectedErr: structerr.NewInvalidArgument("storage name not set"), + } + }, + }, + { + desc: "relative path not set", + setup: func(t *testing.T) setupData { + return setupData{ + target: &gitalypb.Repository{ + StorageName: virtualStorage, + RelativePath: "", + }, + expectedErr: structerr.NewInvalidArgument("repository path not set"), + } + }, + }, + { + desc: "storage and relative path not set", + setup: func(t *testing.T) setupData { + return setupData{ + target: &gitalypb.Repository{ + StorageName: "", + RelativePath: "", + }, + expectedErr: structerr.NewInvalidArgument("repository not set"), + } + }, + }, + } { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + setup := tc.setup(t) + + ctx := testhelper.MergeOutgoingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, sourceCfg)) + _, err := repoClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ + Repository: setup.target, + Source: setup.source, + // Enable object pool replication to validate if Praefect is able to rewrite the + // message to disable it. + ReplicateObjectDeduplicationNetworkMembership: true, + }) + testhelper.RequireGrpcError(t, setup.expectedErr, err) + if setup.expectedErr != nil { + return + } + + // Check if Praefect disabled object pool replication. + resp, err := objectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{Repository: setup.target}) + require.NoError(t, err) + require.Equal(t, setup.expectedObjectPool, resp.GetObjectPool()) + }) + } +} diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 26b1d265c..53f499769 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -89,6 +89,7 @@ func TestRepositoryExistsHandler(t *testing.T) { config.Config{Failover: config.Failover{ElectionStrategy: electionStrategy}}, testhelper.NewDiscardingLogEntry(t), protoregistry.GitalyProtoPreregistered, + nil, func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { return nil, errServedByGitaly }, diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 2bcc2bd07..402ca03ea 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -119,6 +119,7 @@ func NewGRPCServer( conf config.Config, logger *logrus.Entry, registry *protoregistry.Registry, + coordinator *Coordinator, director proxy.StreamDirector, txMgr *transactions.Manager, rs datastore.RepositoryStore, @@ -205,10 +206,11 @@ func NewGRPCServer( if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ - "RemoveAll": RemoveAllHandler(rs, conns), - "RemoveRepository": RemoveRepositoryHandler(rs, conns), - "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs), - "RepositoryExists": RepositoryExistsHandler(rs), + "RemoveAll": RemoveAllHandler(rs, conns), + "RemoveRepository": RemoveRepositoryHandler(rs, conns), + "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs), + "ReplicateRepository": ReplicateRepositoryHandler(coordinator), + "RepositoryExists": RepositoryExistsHandler(rs), }) proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{ "DeleteObjectPool": DeleteObjectPoolHandler(rs, conns), diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go index 27faf9257..67bcf89b7 100644 --- a/internal/praefect/server_factory.go +++ b/internal/praefect/server_factory.go @@ -22,6 +22,7 @@ import ( func NewServerFactory( conf config.Config, logger *logrus.Entry, + coordinator *Coordinator, director proxy.StreamDirector, nodeMgr nodes.Manager, txMgr *transactions.Manager, @@ -38,6 +39,7 @@ func NewServerFactory( return &ServerFactory{ conf: conf, logger: logger, + coordinator: coordinator, director: director, nodeMgr: nodeMgr, txMgr: txMgr, @@ -58,6 +60,7 @@ type ServerFactory struct { mtx sync.Mutex conf config.Config logger *logrus.Entry + coordinator *Coordinator director proxy.StreamDirector nodeMgr nodes.Manager txMgr *transactions.Manager @@ -142,6 +145,7 @@ func (s *ServerFactory) createGRPC(creds credentials.TransportCredentials) *grpc s.conf, s.logger, s.registry, + s.coordinator, s.director, s.txMgr, s.rs, diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 9700998cd..0c1608cc8 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -178,7 +178,7 @@ func TestServerFactory(t *testing.T) { } t.Run("insecure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) + praefectServerFactory := NewServerFactory(conf, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, "localhost:0") @@ -210,7 +210,7 @@ func TestServerFactory(t *testing.T) { }) t.Run("secure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) + praefectServerFactory := NewServerFactory(conf, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, "localhost:0") @@ -238,7 +238,7 @@ func TestServerFactory(t *testing.T) { }) t.Run("stops all listening servers", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) + praefectServerFactory := NewServerFactory(conf, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) defer praefectServerFactory.Stop() var healthClients []healthpb.HealthClient @@ -294,7 +294,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls key path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.KeyPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") @@ -303,7 +303,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls cert path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.CertPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, router, registry, nil, nil, nil) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go index a32c64192..377fc9eb1 100644 --- a/internal/praefect/testserver.go +++ b/internal/praefect/testserver.go @@ -251,6 +251,7 @@ func RunPraefectServer( conf, opt.WithLogger, protoregistry.GitalyProtoPreregistered, + coordinator, coordinator.StreamDirector, opt.WithTxMgr, opt.WithRepoStore, |