diff options
author | Will Chandler <wchandler@gitlab.com> | 2023-01-18 18:49:27 +0300 |
---|---|---|
committer | Will Chandler <wchandler@gitlab.com> | 2023-01-18 18:49:27 +0300 |
commit | 77470925231663b462bb498b5f59624dfcee29c0 (patch) | |
tree | 980ab2ac688cb874fab804e9925b513f6d45380c | |
parent | 39bb9d29c7fbc9248a351b011874b747ced03826 (diff) | |
parent | 931cebe924aa94a3bd68dff127b95cfb692e5574 (diff) |
Merge branch 'smh-remove-conn-ff' into 'master'
Use Yamux configuration in Praefect by default
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5215
Merged-by: Will Chandler <wchandler@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
26 files changed, 137 insertions, 388 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index e19e8ac45..540d7e982 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -69,7 +69,6 @@ import ( "strings" "time" - "github.com/hashicorp/yamux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -80,7 +79,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/log" - "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" @@ -312,15 +310,15 @@ func run( transactionManager := transactions.NewManager(conf) sidechannelRegistry := sidechannel.NewRegistry() - newClientHandshaker := func(config func(*yamux.Config)) backchannel.ClientHandshaker { - return backchannel.NewClientHandshakerWithYamuxConfig( - logger, - praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry), - config, - ) - } + backchannelCfg := backchannel.DefaultConfiguration() + backchannelCfg.AcceptBacklog = conf.Yamux.AcceptBacklog + backchannelCfg.MaximumStreamWindowSizeBytes = conf.Yamux.MaximumStreamWindowSizeBytes + clientHandshaker := backchannel.NewClientHandshaker( + logger, + praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry), + backchannelCfg, + ) - clientHandshaker := newClientHandshaker(nil) assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames()) var ( nodeManager nodes.Manager @@ -330,73 +328,46 @@ func run( primaryGetter praefect.PrimaryGetter ) if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { - dialNodes := func(clientHandshaker backchannel.ClientHandshaker) (praefect.NodeSet, error) { - return praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry) - } - - nodeSet, err = dialNodes(clientHandshaker) + nodeSet, err = praefect.DialNodes( + ctx, + conf.VirtualStorages, + protoregistry.GitalyProtoPreregistered, + errTracker, + clientHandshaker, + sidechannelRegistry, + ) if err != nil { return fmt.Errorf("dial nodes: %w", err) } defer nodeSet.Close() - // Dial a second set of connections to each storage with a different Yamux config - // so we can use a feature flag to gradually test the new configuration. - nodeSetWithCustomConfig, err := dialNodes(newClientHandshaker(func(cfg *yamux.Config) { - cfg.AcceptBacklog = conf.Yamux.AcceptBacklog - cfg.MaxStreamWindowSize = conf.Yamux.MaximumStreamWindowSizeBytes - })) - if err != nil { - return fmt.Errorf("dial nodes reduced window: %w", err) - } - defer nodeSetWithCustomConfig.Close() - - runHealthManager := func(db glsql.Querier, nodeSet praefect.NodeSet) *nodes.HealthManager { - hm := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients()) - go func() { - if err := hm.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil { - logger.WithError(err).Error("health manager exited") - } - }() - - return hm - } - - nodeSetHealthManager := runHealthManager(db, nodeSet) - healthChecker = nodeSetHealthManager + healthManager := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients()) + go func() { + if err := healthManager.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil { + logger.WithError(err).Error("health manager exited") + } + }() - nodeSetWithCustomConfigHealthManager := runHealthManager(nil, nodeSetWithCustomConfig) + healthChecker = healthManager // Wait for the first health check to complete so the Praefect doesn't start serving RPC // before the router is ready with the health status of the nodes. - <-nodeSetHealthManager.Updated() - <-nodeSetWithCustomConfigHealthManager.Updated() + <-healthManager.Updated() elector := nodes.NewPerRepositoryElector(db) primaryGetter = elector assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames()) - newPerRepositoryRouter := func(healthManager *nodes.HealthManager, nodeSet praefect.NodeSet) *praefect.PerRepositoryRouter { - return praefect.NewPerRepositoryRouter( - nodeSet.Connections(), - elector, - healthManager, - praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), - csg, - assignmentStore, - rs, - conf.DefaultReplicationFactors(), - ) - } - - // Configure a router that uses a feature flag to switch between two different sets of connections. - // This way we can toggle the feature flag to use a different set of connections for proxied - // RPC calls. - router = praefect.NewFeatureFlaggedRouter( - featureflag.PraefectUseYamuxConfigurationForGitaly, - newPerRepositoryRouter(nodeSetWithCustomConfigHealthManager, nodeSetWithCustomConfig), - newPerRepositoryRouter(nodeSetHealthManager, nodeSet), + router = praefect.NewPerRepositoryRouter( + nodeSet.Connections(), + elector, + healthManager, + praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), + csg, + assignmentStore, + rs, + conf.DefaultReplicationFactors(), ) if conf.BackgroundVerification.VerificationInterval > 0 { @@ -405,7 +376,7 @@ func run( logger, db, nodeSet.Connections(), - nodeSetHealthManager, + healthManager, conf.BackgroundVerification.VerificationInterval.Duration(), conf.BackgroundVerification.DeleteInvalidRecords, ) diff --git a/internal/backchannel/backchannel.go b/internal/backchannel/backchannel.go index a7da22f15..990e3e5fb 100644 --- a/internal/backchannel/backchannel.go +++ b/internal/backchannel/backchannel.go @@ -40,26 +40,17 @@ import ( var magicBytes = []byte("backchannel") // muxConfig returns a new config to use with the multiplexing session. -func muxConfig(logger io.Writer, extra func(*yamux.Config)) *yamux.Config { - cfg := yamux.DefaultConfig() - cfg.LogOutput = logger - // The server only accepts a single stream from the client, which is the client's gRPC stream. - // The backchannel server should only receive a single stream from the server. As such, we can - // limit maximum pending streams to 1 as there should never be more streams waiting. - cfg.AcceptBacklog = 1 +func muxConfig(logger io.Writer, cfg Configuration) *yamux.Config { + yamuxCfg := yamux.DefaultConfig() + yamuxCfg.LogOutput = logger // gRPC is already configured to send keep alives so we don't need yamux to do this for us. // gRPC is a better choice as it sends the keep alives also to non-multiplexed connections. - cfg.EnableKeepAlive = false - // MaxStreamWindowSize configures the maximum receive buffer size for each stream. The sender - // is allowed to send the configured amount of bytes without receiving an acknowledgement from the - // receiver. This is can have a big impact on throughput as the latency increases, as the sender - // can't proceed sending without receiving an acknowledgement back. - cfg.MaxStreamWindowSize = 16 * 1024 * 1024 + yamuxCfg.EnableKeepAlive = false + yamuxCfg.AcceptBacklog = cfg.AcceptBacklog + yamuxCfg.MaxStreamWindowSize = cfg.MaximumStreamWindowSizeBytes + yamuxCfg.StreamCloseTimeout = cfg.StreamCloseTimeout - if extra != nil { - extra(cfg) - } - return cfg + return yamuxCfg } // connCloser wraps a net.Conn and calls the provided close function instead when Close diff --git a/internal/backchannel/backchannel_example_test.go b/internal/backchannel/backchannel_example_test.go index 32fec8879..e4d6da59c 100644 --- a/internal/backchannel/backchannel_example_test.go +++ b/internal/backchannel/backchannel_example_test.go @@ -117,7 +117,7 @@ func invokeWithMuxedClient(logger *logrus.Entry, address string) error { fmt.Println("Praefect responding via backchannel") return stream.SendMsg(&gitalypb.VoteTransactionResponse{}) })) - }) + }, backchannel.DefaultConfiguration()) return invokeWithOpts(address, grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials()))) } diff --git a/internal/backchannel/backchannel_test.go b/internal/backchannel/backchannel_test.go index 534b63962..ee89405a5 100644 --- a/internal/backchannel/backchannel_test.go +++ b/internal/backchannel/backchannel_test.go @@ -121,7 +121,7 @@ func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) { }) return srv - }) + }, DefaultConfiguration()) <-start client, err := grpc.Dial(ln.Addr().String(), @@ -182,7 +182,7 @@ func TestHandshaker_idempotentClose(t *testing.T) { stopCalled++ }, } - }) + }, DefaultConfiguration()) closeServer := make(chan struct{}) serverClosed := make(chan struct{}) @@ -276,7 +276,7 @@ func Benchmark(b *testing.B) { opts := []grpc.DialOption{grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())} if tc.multiplexed { - clientHandshaker := NewClientHandshaker(newLogger(), func() Server { return grpc.NewServer() }) + clientHandshaker := NewClientHandshaker(newLogger(), func() Server { return grpc.NewServer() }, DefaultConfiguration()) opts = []grpc.DialOption{ grpc.WithBlock(), grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())), diff --git a/internal/backchannel/client.go b/internal/backchannel/client.go index 031cca58b..c4526b7cc 100644 --- a/internal/backchannel/client.go +++ b/internal/backchannel/client.go @@ -24,40 +24,55 @@ type Server interface { // a backchannel closes. type ServerFactory func() Server +// Configuration sets contains configuration for the backchannel's Yamux session. +type Configuration struct { + // MaximumStreamWindowSizeBytes sets the maximum window size in bytes used for yamux streams. + // Higher value can increase throughput at the cost of more memory usage. + MaximumStreamWindowSizeBytes uint32 + // AcceptBacklog sets the maximum number of stream openings in-flight before further openings + // block. + AcceptBacklog int + // StreamCloseTimeout is the maximum time that a stream will allowed to + // be in a half-closed state when `Close` is called before forcibly + // closing the connection. + StreamCloseTimeout time.Duration +} + +// DefaultConfiguration returns the default configuration. +func DefaultConfiguration() Configuration { + defaults := yamux.DefaultConfig() + return Configuration{ + MaximumStreamWindowSizeBytes: defaults.MaxStreamWindowSize, + AcceptBacklog: defaults.AcceptBacklog, + StreamCloseTimeout: defaults.StreamCloseTimeout, + } +} + // ClientHandshaker implements the client side handshake of the multiplexed connection. type ClientHandshaker struct { logger *logrus.Entry serverFactory ServerFactory - yamuxConfig func(*yamux.Config) + cfg Configuration } // NewClientHandshaker returns a new client side implementation of the backchannel. The provided // logger is used to log multiplexing errors. -func NewClientHandshaker(logger *logrus.Entry, serverFactory ServerFactory) ClientHandshaker { - return NewClientHandshakerWithYamuxConfig(logger, serverFactory, nil) -} - -// NewClientHandshakerWithYamuxConfig returns a new client side implementation of the -// backchannel. The provided logger is used to log multiplexing errors. -// For each connection that we accept, the yamuxConfig callback is -// invoked to allow yamux configuration overrides. If yamuxConfig is nil -// it is ignored. -func NewClientHandshakerWithYamuxConfig(logger *logrus.Entry, serverFactory ServerFactory, yamuxConfig func(*yamux.Config)) ClientHandshaker { - return ClientHandshaker{logger: logger, serverFactory: serverFactory, yamuxConfig: yamuxConfig} +func NewClientHandshaker(logger *logrus.Entry, serverFactory ServerFactory, cfg Configuration) ClientHandshaker { + return ClientHandshaker{logger: logger, serverFactory: serverFactory, cfg: cfg} } // ClientHandshake returns TransportCredentials that perform the client side multiplexing handshake and // start the backchannel Server on the established connections. The transport credentials are used to intiliaze the // connection prior to the multiplexing. func (ch ClientHandshaker) ClientHandshake(tc credentials.TransportCredentials) credentials.TransportCredentials { - return clientHandshake{TransportCredentials: tc, serverFactory: ch.serverFactory, logger: ch.logger, yamuxConfig: ch.yamuxConfig} + return clientHandshake{TransportCredentials: tc, serverFactory: ch.serverFactory, logger: ch.logger, cfg: ch.cfg} } type clientHandshake struct { credentials.TransportCredentials serverFactory ServerFactory logger *logrus.Entry - yamuxConfig func(*yamux.Config) + cfg Configuration } func (ch clientHandshake) ClientHandshake(ctx context.Context, serverName string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) { @@ -103,7 +118,7 @@ func (ch clientHandshake) serve(ctx context.Context, conn net.Conn) (net.Conn, e logger := ch.logger.WriterLevel(logrus.ErrorLevel) // Initiate the multiplexing session. - muxSession, err := yamux.Client(conn, muxConfig(logger, ch.yamuxConfig)) + muxSession, err := yamux.Client(conn, muxConfig(logger, ch.cfg)) if err != nil { logger.Close() return nil, fmt.Errorf("open multiplexing session: %w", err) diff --git a/internal/backchannel/server.go b/internal/backchannel/server.go index 9b7e58bda..a6fe677ee 100644 --- a/internal/backchannel/server.go +++ b/internal/backchannel/server.go @@ -102,7 +102,14 @@ func (s *ServerHandshaker) Handshake(conn net.Conn, authInfo credentials.AuthInf logger := s.logger.WriterLevel(logrus.ErrorLevel) // Open the server side of the multiplexing session. - muxSession, err := yamux.Server(conn, muxConfig(logger, nil)) + // + // Gitaly is using custom settings with a lower accept backlog and higher receive + // buffer size than Praefect and the clients. We should eventually strive to match + // the settings here to avoid Gitaly from buffering too much. + cfg := DefaultConfiguration() + cfg.AcceptBacklog = 1 + cfg.MaximumStreamWindowSizeBytes = 16 * 1024 * 1024 + muxSession, err := yamux.Server(conn, muxConfig(logger, cfg)) if err != nil { logger.Close() return nil, nil, fmt.Errorf("create multiplexing session: %w", err) diff --git a/internal/gitaly/client/dial_test.go b/internal/gitaly/client/dial_test.go index 2ff52e9b8..c7c97a10f 100644 --- a/internal/gitaly/client/dial_test.go +++ b/internal/gitaly/client/dial_test.go @@ -57,7 +57,7 @@ func TestDial(t *testing.T) { }) t.Run("muxed conn", func(t *testing.T) { - handshaker := backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }) + handshaker := backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }, backchannel.DefaultConfiguration()) nonMuxedConn, err := Dial(ctx, "tcp://"+ln.Addr().String(), nil, handshaker) require.NoError(t, err) defer func() { require.NoError(t, nonMuxedConn.Close()) }() diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index c94fa8281..72fcc9646 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -181,6 +181,7 @@ func TestUserCreateBranch_Transactions(t *testing.T) { gitalypb.RegisterRefTransactionServer(srv, transactionServer) return srv }, + backchannel.DefaultConfiguration(), ), ) @@ -835,6 +836,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) { gitalypb.RegisterRefTransactionServer(srv, transactionServer) return srv }, + backchannel.DefaultConfiguration(), ), ) diff --git a/internal/gitaly/service/operations/tags_test.go b/internal/gitaly/service/operations/tags_test.go index 7dee225ed..b7eeae6cc 100644 --- a/internal/gitaly/service/operations/tags_test.go +++ b/internal/gitaly/service/operations/tags_test.go @@ -425,6 +425,7 @@ func TestUserCreateTag_transactional(t *testing.T) { gitalypb.RegisterRefTransactionServer(srv, transactionServer) return srv }, + backchannel.DefaultConfiguration(), ), ) diff --git a/internal/gitaly/service/repository/apply_gitattributes_test.go b/internal/gitaly/service/repository/apply_gitattributes_test.go index d1c048021..a168f08b1 100644 --- a/internal/gitaly/service/repository/apply_gitattributes_test.go +++ b/internal/gitaly/service/repository/apply_gitattributes_test.go @@ -111,11 +111,15 @@ func TestApplyGitattributes_transactional(t *testing.T) { logger := testhelper.NewDiscardingLogEntry(t) client := newMuxedRepositoryClient(t, ctx, cfg, "unix://"+cfg.InternalSocketPath(), - backchannel.NewClientHandshaker(logger, func() backchannel.Server { - srv := grpc.NewServer() - gitalypb.RegisterRefTransactionServer(srv, transactionServer) - return srv - }), + backchannel.NewClientHandshaker( + logger, + func() backchannel.Server { + srv := grpc.NewServer() + gitalypb.RegisterRefTransactionServer(srv, transactionServer) + return srv + }, + backchannel.DefaultConfiguration(), + ), ) for _, tc := range []struct { diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 7d2bdf319..f0d3f5a33 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -237,6 +237,7 @@ func TestReplicateRepositoryTransactional(t *testing.T) { gitalypb.RegisterRefTransactionServer(srv, &txServer) return srv }, + backchannel.DefaultConfiguration(), )) // The first invocation creates the repository via a snapshot given that it doesn't yet diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go index c841d6883..e5b709a6c 100644 --- a/internal/gitaly/service/smarthttp/testhelper_test.go +++ b/internal/gitaly/service/smarthttp/testhelper_test.go @@ -82,7 +82,7 @@ func newMuxedSmartHTTPClient(t *testing.T, ctx context.Context, serverSocketPath ctx, serverSocketPath, []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))}, - backchannel.NewClientHandshaker(testhelper.NewDiscardingLogEntry(t), serverFactory), + backchannel.NewClientHandshaker(testhelper.NewDiscardingLogEntry(t), serverFactory, backchannel.DefaultConfiguration()), ) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, conn.Close()) }) diff --git a/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go b/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go deleted file mode 100644 index 59eb95b0e..000000000 --- a/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go +++ /dev/null @@ -1,10 +0,0 @@ -package featureflag - -// PraefectUseYamuxConfigurationForGitaly switches the RPCs to be proxied over -// connections that are set up with custom Yamux configuration. -var PraefectUseYamuxConfigurationForGitaly = NewFeatureFlag( - "praefect_use_yamux_configuration_for_gitaly", - "v15.7.0", - "https://gitlab.com/gitlab-org/gitaly/-/issues/4644", - false, -) diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index c42ba8550..7575e7054 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -82,6 +82,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { transaction.NewServer(txManager), sidechannelRegistry, ), + backchannel.DefaultConfiguration(), ), sidechannelRegistry, ) diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go index 0743bf520..21cdfc9fe 100644 --- a/internal/praefect/nodes/health_manager.go +++ b/internal/praefect/nodes/health_manager.go @@ -58,9 +58,6 @@ type HealthManager struct { // NewHealthManager returns a new health manager that monitors which nodes in the cluster // are healthy. -// -// If db is nil, the HealthManager checks the connection health normally but doesn't persist -// any information about the nodes in the database. func NewHealthManager( log logrus.FieldLogger, db glsql.Querier, @@ -141,13 +138,10 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages hm.locallyHealthy.Store(locallyHealthy) - if hm.db != nil { - // Database is nil only when an alternative set of connections is being tested behind a feature flag - // and we do not want to affect the consensus in the database, just the routing decisions. - ctx, cancel := hm.databaseTimeout(ctx) - defer cancel() + ctx, cancel := hm.databaseTimeout(ctx) + defer cancel() - if _, err := hm.db.ExecContext(ctx, ` + if _, err := hm.db.ExecContext(ctx, ` INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END FROM ( @@ -161,13 +155,12 @@ ON CONFLICT (praefect_name, shard_name, node_name) last_contact_attempt_at = NOW(), last_seen_active_at = COALESCE(EXCLUDED.last_seen_active_at, node_status.last_seen_active_at) `, - hm.praefectName, - virtualStorages, - physicalStorages, - healthy, - ); err != nil { - return fmt.Errorf("update checks: %w", err) - } + hm.praefectName, + virtualStorages, + physicalStorages, + healthy, + ); err != nil { + return fmt.Errorf("update checks: %w", err) } if hm.firstUpdate { diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go index 15b191cda..f53297854 100644 --- a/internal/praefect/nodes/health_manager_test.go +++ b/internal/praefect/nodes/health_manager_test.go @@ -49,31 +49,6 @@ func getHealthConsensus(t *testing.T, ctx context.Context, db glsql.Querier) map return consensus } -func TestHealthManagerWithoutDatabase(t *testing.T) { - t.Parallel() - - hm := NewHealthManager(testhelper.NewDiscardingLogger(t), nil, "ignored", HealthClients{ - "virtual-storage": { - "healthy-storage": mockHealthClient{ - CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { - return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil - }, - }, - "unhealthy-storage": mockHealthClient{ - CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { - return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil - }, - }, - }, - }) - hm.handleError = func(err error) error { return err } - - runCtx, cancelRun := context.WithCancel(testhelper.Context(t)) - require.Equal(t, context.Canceled, hm.Run(runCtx, helper.NewCountTicker(1, cancelRun))) - require.Equal(t, map[string][]string{"virtual-storage": {"healthy-storage"}}, hm.HealthyNodes()) - <-hm.Updated() -} - func TestHealthManager(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go index adc79cb75..25fde7a2e 100644 --- a/internal/praefect/nodes/sql_elector_test.go +++ b/internal/praefect/nodes/sql_elector_test.go @@ -478,7 +478,7 @@ func TestConnectionMultiplexing(t *testing.T) { promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, - backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }), + backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }, backchannel.DefaultConfiguration()), nil, ) require.NoError(t, err) diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go index 6e8cc892c..5a70bb4b6 100644 --- a/internal/praefect/repocleaner/repository_test.go +++ b/internal/praefect/repocleaner/repository_test.go @@ -168,7 +168,7 @@ func TestRunner_Run(t *testing.T) { logger.SetLevel(logrus.DebugLevel) entry := logger.WithContext(ctx) - clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil)) + clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil), backchannel.DefaultConfiguration()) nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, nil) require.NoError(t, err) defer nodeSet.Close() @@ -277,7 +277,7 @@ func TestRunner_Run_noAvailableStorages(t *testing.T) { logger := testhelper.NewDiscardingLogger(t) entry := logger.WithContext(ctx) - clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil)) + clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil), backchannel.DefaultConfiguration()) nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, nil) require.NoError(t, err) defer nodeSet.Close() diff --git a/internal/praefect/router.go b/internal/praefect/router.go index 025a76638..139bd62f9 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -5,7 +5,6 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "google.golang.org/grpc" ) @@ -145,51 +144,3 @@ type Router interface { // never be replicated. RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) } - -type featureFlaggedRouter struct { - flag featureflag.FeatureFlag - enabled Router - disabled Router -} - -// NewFeatureFlaggedRouter returns a Router that decides switches router implementation depending -// on a feature flag's status. -func NewFeatureFlaggedRouter(flag featureflag.FeatureFlag, enabledRouter, disabledRouter Router) Router { - return featureFlaggedRouter{ - flag: flag, - enabled: enabledRouter, - disabled: disabledRouter, - } -} - -func (r featureFlaggedRouter) router(ctx context.Context) Router { - if r.flag.IsEnabled(ctx) { - return r.enabled - } - - return r.disabled -} - -func (r featureFlaggedRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) { - return r.router(ctx).RouteStorageAccessor(ctx, virtualStorage) -} - -func (r featureFlaggedRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) { - return r.router(ctx).RouteStorageMutator(ctx, virtualStorage) -} - -func (r featureFlaggedRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) { - return r.router(ctx).RouteRepositoryAccessor(ctx, virtualStorage, relativePath, forcePrimary) -} - -func (r featureFlaggedRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { - return r.router(ctx).RouteRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath) -} - -func (r featureFlaggedRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { - return r.router(ctx).RouteRepositoryCreation(ctx, virtualStorage, relativePath, additionalRepoRelativePath) -} - -func (r featureFlaggedRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) { - return r.router(ctx).RouteRepositoryMaintenance(ctx, virtualStorage, relativePath) -} diff --git a/internal/praefect/router_test.go b/internal/praefect/router_test.go deleted file mode 100644 index c3c507aa1..000000000 --- a/internal/praefect/router_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package praefect - -import ( - "context" - "errors" - "math/rand" - "net" - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" - "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -func TestNewFeatureFlaggedRouter(t *testing.T) { - ctx := testhelper.Context(t) - flag := featureflag.FeatureFlag{Name: "test_flag"} - - enabledConn, disabledConn, enabledRouter, disabledRouter := setupFeatureFlaggedRouters(t, ctx) - router := NewFeatureFlaggedRouter(flag, enabledRouter, disabledRouter) - - for _, tc := range []struct { - desc string - flagEnabled bool - expectedConn *grpc.ClientConn - }{ - { - desc: "flag disabled", - flagEnabled: false, - expectedConn: disabledConn, - }, - { - desc: "flag enabled", - flagEnabled: true, - expectedConn: enabledConn, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - ctx = featureflag.ContextWithFeatureFlag(ctx, flag, tc.flagEnabled) - - t.Run("RouteStorageAccessor", func(t *testing.T) { - route, err := router.RouteStorageAccessor(ctx, "virtual-storage") - require.NoError(t, err) - require.Equal(t, tc.expectedConn.Target(), route.Connection.Target()) - }) - - t.Run("RouteStorageMutator", func(t *testing.T) { - _, err := router.RouteStorageMutator(ctx, "virtual-storage") - require.Equal(t, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter"), err) - }) - - t.Run("RouteRepositoryAccessor", func(t *testing.T) { - route, err := router.RouteRepositoryAccessor(ctx, "virtual-storage", "relative-path", false) - require.NoError(t, err) - require.Equal(t, tc.expectedConn.Target(), route.Node.Connection.Target()) - }) - - t.Run("RouteRepositoryMutator", func(t *testing.T) { - route, err := router.RouteRepositoryMutator(ctx, "virtual-storage", "relative-path", "") - require.NoError(t, err) - require.Equal(t, tc.expectedConn.Target(), route.Primary.Connection.Target()) - }) - - t.Run("RouteRepositoryCreation", func(t *testing.T) { - route, err := router.RouteRepositoryCreation(ctx, "virtual-storage", "relative-path-new", "") - require.NoError(t, err) - require.Equal(t, tc.expectedConn.Target(), route.Primary.Connection.Target()) - }) - - t.Run("RouteRepositoryMaintenance", func(t *testing.T) { - route, err := router.RouteRepositoryMaintenance(ctx, "virtual-storage", "relative-path") - require.NoError(t, err) - require.Equal(t, tc.expectedConn.Target(), route.Nodes[0].Connection.Target()) - }) - }) - } -} - -func setupFeatureFlaggedRouters(t *testing.T, ctx context.Context) (*grpc.ClientConn, *grpc.ClientConn, *PerRepositoryRouter, *PerRepositoryRouter) { - db := testdb.New(t) - tx := db.Begin(t) - t.Cleanup(func() { tx.Rollback(t) }) - configuredStorages := map[string][]string{"virtual-storage": {"storage"}} - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredStorages}) - - repositoryStore := datastore.NewPostgresRepositoryStore(tx, configuredStorages) - require.NoError(t, repositoryStore.CreateRepository( - ctx, - 1, - "virtual-storage", - "relative-path", - "replica-path", - "storage", - nil, - nil, - true, - true, - )) - - // We don't need real connections, just the target in order to assert that we got the right connection back - // from the router. We mock out the actual connection with a net.Pipe. - enabledConn, err := grpc.Dial("enabledConn", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - _, pw := net.Pipe() - return pw, nil - }), grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - // Close the connection immediately to clean up as we just care about the value of .Target() in the test. - testhelper.MustClose(t, enabledConn) - - disabledConn, err := grpc.Dial("disabledConn", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - _, pw := net.Pipe() - return pw, nil - }), grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - testhelper.MustClose(t, disabledConn) - - elector := nodes.NewPerRepositoryElector(tx) - healthChecker := StaticHealthChecker{"virtual-storage": {"storage"}} - assignmentStore := datastore.NewAssignmentStore(tx, configuredStorages) - randomGenerator := rand.New(rand.NewSource(1)) - - enabledRouter := NewPerRepositoryRouter( - Connections{"virtual-storage": {"storage": enabledConn}}, - elector, - healthChecker, - randomGenerator, - repositoryStore, - assignmentStore, - repositoryStore, - nil, - ) - - disabledRouter := NewPerRepositoryRouter( - Connections{"virtual-storage": {"storage": disabledConn}}, - elector, - healthChecker, - randomGenerator, - repositoryStore, - assignmentStore, - repositoryStore, - nil, - ) - - return enabledConn, disabledConn, enabledRouter, disabledRouter -} diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index f0f15628f..d958c99dd 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -85,7 +85,7 @@ func TestServerFactory(t *testing.T) { rs := datastore.MockRepositoryStore{} txMgr := transactions.NewManager(conf) sidechannelRegistry := sidechannel.NewRegistry() - clientHandshaker := backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(logger, transaction.NewServer(txMgr), sidechannelRegistry)) + clientHandshaker := backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(logger, transaction.NewServer(txMgr), sidechannelRegistry), backchannel.DefaultConfiguration()) nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, sidechannelRegistry) require.NoError(t, err) nodeMgr.Start(0, time.Second) @@ -135,7 +135,7 @@ func TestServerFactory(t *testing.T) { return grpc.NewServer(grpc.Creds(lm)) } - clientHandshaker := backchannel.NewClientHandshaker(logger, factory) + clientHandshaker := backchannel.NewClientHandshaker(logger, factory, backchannel.DefaultConfiguration()) dialOpt := grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(creds)) cc, err := grpc.Dial(addr, dialOpt) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 2b2b3ee5e..98cf99e8d 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -97,9 +97,16 @@ func TestNewBackchannelServerFactory(t *testing.T) { nodeSet, err := DialNodes(ctx, []*config.VirtualStorage{{ Name: "default", Nodes: []*config.Node{{Storage: "gitaly-1", Address: "tcp://" + ln.Addr().String()}}, - }}, nil, nil, backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory( - testhelper.NewDiscardingLogEntry(t), transaction.NewServer(mgr), nil, - )), nil) + }}, nil, nil, + backchannel.NewClientHandshaker( + logger, + NewBackchannelServerFactory( + testhelper.NewDiscardingLogEntry(t), + transaction.NewServer(mgr), + nil, + ), + backchannel.DefaultConfiguration(), + ), nil) require.NoError(t, err) defer nodeSet.Close() @@ -530,6 +537,7 @@ func TestRemoveRepository(t *testing.T) { nil, backchannel.NewClientHandshaker( testhelper.NewDiscardingLogEntry(t), NewBackchannelServerFactory(testhelper.NewDiscardingLogEntry(t), transaction.NewServer(txMgr), nil), + backchannel.DefaultConfiguration(), ), nil, ) require.NoError(t, err) @@ -603,6 +611,7 @@ func TestRenameRepository(t *testing.T) { transaction.NewServer(txManager), nil, ), + backchannel.DefaultConfiguration(), ) ctx := testhelper.Context(t) diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index 95dfc69b2..986c2c440 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -517,6 +517,7 @@ func TestVerifier(t *testing.T) { transaction.NewServer(txManager), sidechannelRegistry, ), + backchannel.DefaultConfiguration(), ), sidechannelRegistry, ) diff --git a/internal/sidechannel/proxy_test.go b/internal/sidechannel/proxy_test.go index 398d9c780..3cffe2e78 100644 --- a/internal/sidechannel/proxy_test.go +++ b/internal/sidechannel/proxy_test.go @@ -121,7 +121,7 @@ func dialProxy(upstreamAddr string) (*grpc.ClientConn, error) { return grpc.NewServer(grpc.Creds(lm)) } - clientHandshaker := backchannel.NewClientHandshaker(newLogger(), factory) + clientHandshaker := backchannel.NewClientHandshaker(newLogger(), factory, backchannel.DefaultConfiguration()) dialOpts := []grpc.DialOption{ grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())), grpc.WithUnaryInterceptor(NewUnaryProxy(registry)), diff --git a/internal/sidechannel/sidechannel.go b/internal/sidechannel/sidechannel.go index 2605fceb0..fcd8d5258 100644 --- a/internal/sidechannel/sidechannel.go +++ b/internal/sidechannel/sidechannel.go @@ -9,7 +9,6 @@ import ( "strconv" "time" - "github.com/hashicorp/yamux" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" @@ -134,29 +133,20 @@ func NewServerHandshaker(registry *Registry) *ServerHandshaker { // NewClientHandshaker is used to enable sidechannel support on outbound // gRPC connections. func NewClientHandshaker(logger *logrus.Entry, registry *Registry) client.Handshaker { - return backchannel.NewClientHandshakerWithYamuxConfig( + cfg := backchannel.DefaultConfiguration() + // If a client hangs up while the server is writing data to it then the + // server will block for 5 minutes by default before erroring out. This + // makes testing difficult and there is no reason to have such a long + // timeout in the case of sidechannels. A 1 second timeout is also OK. + cfg.StreamCloseTimeout = time.Second + + return backchannel.NewClientHandshaker( logger, func() backchannel.Server { lm := listenmux.New(insecure.NewCredentials()) lm.Register(NewServerHandshaker(registry)) return grpc.NewServer(grpc.Creds(lm)) }, - func(cfg *yamux.Config) { - // Backchannel sets a very large custom window size (16MB). This is not - // necessary for sidechannels because we use one stream per connection. - // Worse, this is wasteful, because a client that is serving many - // concurrent sidechannel calls may end up lazily creating a 16MB buffer - // for each ongoing call. See - // https://gitlab.com/gitlab-org/gitaly/-/issues/4132. To prevent this - // waste we change this value back to 256KB which is the default and - // minimum value. - cfg.MaxStreamWindowSize = 256 * 1024 - - // If a client hangs up while the server is writing data to it then the - // server will block for 5 minutes by default before erroring out. This - // makes testing difficult and there is no reason to have such a long - // timeout in the case of sidechannels. A 1 second timeout is also OK. - cfg.StreamCloseTimeout = time.Second - }, + cfg, ) } diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 5c7d414dd..839765759 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -197,9 +197,6 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { // context. The values of these flags should be randomized to increase the test coverage. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.RunCommandsInCGroup, true) ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConvertErrToStatus, true) - // PraefectUseYamuxConfigurationForGitaly gets tested in Praefect when routing RPCs and thus it affects many tests. - // Let's randomly select which connection we use so both sets of connections get tested somewhat. - ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectUseYamuxConfigurationForGitaly, rnd.Int()%2 == 0) // Randomly enable the use of the catfile cache in localrepo.ReadObject. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.LocalrepoReadObjectCached, rnd.Int()%2 == 0) |