Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Chandler <wchandler@gitlab.com>2023-01-18 18:49:27 +0300
committerWill Chandler <wchandler@gitlab.com>2023-01-18 18:49:27 +0300
commit77470925231663b462bb498b5f59624dfcee29c0 (patch)
tree980ab2ac688cb874fab804e9925b513f6d45380c
parent39bb9d29c7fbc9248a351b011874b747ced03826 (diff)
parent931cebe924aa94a3bd68dff127b95cfb692e5574 (diff)
Merge branch 'smh-remove-conn-ff' into 'master'
Use Yamux configuration in Praefect by default See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5215 Merged-by: Will Chandler <wchandler@gitlab.com> Approved-by: Will Chandler <wchandler@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r--cmd/praefect/main.go97
-rw-r--r--internal/backchannel/backchannel.go25
-rw-r--r--internal/backchannel/backchannel_example_test.go2
-rw-r--r--internal/backchannel/backchannel_test.go6
-rw-r--r--internal/backchannel/client.go45
-rw-r--r--internal/backchannel/server.go9
-rw-r--r--internal/gitaly/client/dial_test.go2
-rw-r--r--internal/gitaly/service/operations/branches_test.go2
-rw-r--r--internal/gitaly/service/operations/tags_test.go1
-rw-r--r--internal/gitaly/service/repository/apply_gitattributes_test.go14
-rw-r--r--internal/gitaly/service/repository/replicate_test.go1
-rw-r--r--internal/gitaly/service/smarthttp/testhelper_test.go2
-rw-r--r--internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go10
-rw-r--r--internal/praefect/info_service_test.go1
-rw-r--r--internal/praefect/nodes/health_manager.go25
-rw-r--r--internal/praefect/nodes/health_manager_test.go25
-rw-r--r--internal/praefect/nodes/sql_elector_test.go2
-rw-r--r--internal/praefect/repocleaner/repository_test.go4
-rw-r--r--internal/praefect/router.go49
-rw-r--r--internal/praefect/router_test.go150
-rw-r--r--internal/praefect/server_factory_test.go4
-rw-r--r--internal/praefect/server_test.go15
-rw-r--r--internal/praefect/verifier_test.go1
-rw-r--r--internal/sidechannel/proxy_test.go2
-rw-r--r--internal/sidechannel/sidechannel.go28
-rw-r--r--internal/testhelper/testhelper.go3
26 files changed, 137 insertions, 388 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index e19e8ac45..540d7e982 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -69,7 +69,6 @@ import (
"strings"
"time"
- "github.com/hashicorp/yamux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -80,7 +79,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
"gitlab.com/gitlab-org/gitaly/v15/internal/log"
- "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
@@ -312,15 +310,15 @@ func run(
transactionManager := transactions.NewManager(conf)
sidechannelRegistry := sidechannel.NewRegistry()
- newClientHandshaker := func(config func(*yamux.Config)) backchannel.ClientHandshaker {
- return backchannel.NewClientHandshakerWithYamuxConfig(
- logger,
- praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry),
- config,
- )
- }
+ backchannelCfg := backchannel.DefaultConfiguration()
+ backchannelCfg.AcceptBacklog = conf.Yamux.AcceptBacklog
+ backchannelCfg.MaximumStreamWindowSizeBytes = conf.Yamux.MaximumStreamWindowSizeBytes
+ clientHandshaker := backchannel.NewClientHandshaker(
+ logger,
+ praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry),
+ backchannelCfg,
+ )
- clientHandshaker := newClientHandshaker(nil)
assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames())
var (
nodeManager nodes.Manager
@@ -330,73 +328,46 @@ func run(
primaryGetter praefect.PrimaryGetter
)
if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
- dialNodes := func(clientHandshaker backchannel.ClientHandshaker) (praefect.NodeSet, error) {
- return praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry)
- }
-
- nodeSet, err = dialNodes(clientHandshaker)
+ nodeSet, err = praefect.DialNodes(
+ ctx,
+ conf.VirtualStorages,
+ protoregistry.GitalyProtoPreregistered,
+ errTracker,
+ clientHandshaker,
+ sidechannelRegistry,
+ )
if err != nil {
return fmt.Errorf("dial nodes: %w", err)
}
defer nodeSet.Close()
- // Dial a second set of connections to each storage with a different Yamux config
- // so we can use a feature flag to gradually test the new configuration.
- nodeSetWithCustomConfig, err := dialNodes(newClientHandshaker(func(cfg *yamux.Config) {
- cfg.AcceptBacklog = conf.Yamux.AcceptBacklog
- cfg.MaxStreamWindowSize = conf.Yamux.MaximumStreamWindowSizeBytes
- }))
- if err != nil {
- return fmt.Errorf("dial nodes reduced window: %w", err)
- }
- defer nodeSetWithCustomConfig.Close()
-
- runHealthManager := func(db glsql.Querier, nodeSet praefect.NodeSet) *nodes.HealthManager {
- hm := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients())
- go func() {
- if err := hm.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil {
- logger.WithError(err).Error("health manager exited")
- }
- }()
-
- return hm
- }
-
- nodeSetHealthManager := runHealthManager(db, nodeSet)
- healthChecker = nodeSetHealthManager
+ healthManager := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients())
+ go func() {
+ if err := healthManager.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil {
+ logger.WithError(err).Error("health manager exited")
+ }
+ }()
- nodeSetWithCustomConfigHealthManager := runHealthManager(nil, nodeSetWithCustomConfig)
+ healthChecker = healthManager
// Wait for the first health check to complete so the Praefect doesn't start serving RPC
// before the router is ready with the health status of the nodes.
- <-nodeSetHealthManager.Updated()
- <-nodeSetWithCustomConfigHealthManager.Updated()
+ <-healthManager.Updated()
elector := nodes.NewPerRepositoryElector(db)
primaryGetter = elector
assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())
- newPerRepositoryRouter := func(healthManager *nodes.HealthManager, nodeSet praefect.NodeSet) *praefect.PerRepositoryRouter {
- return praefect.NewPerRepositoryRouter(
- nodeSet.Connections(),
- elector,
- healthManager,
- praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
- csg,
- assignmentStore,
- rs,
- conf.DefaultReplicationFactors(),
- )
- }
-
- // Configure a router that uses a feature flag to switch between two different sets of connections.
- // This way we can toggle the feature flag to use a different set of connections for proxied
- // RPC calls.
- router = praefect.NewFeatureFlaggedRouter(
- featureflag.PraefectUseYamuxConfigurationForGitaly,
- newPerRepositoryRouter(nodeSetWithCustomConfigHealthManager, nodeSetWithCustomConfig),
- newPerRepositoryRouter(nodeSetHealthManager, nodeSet),
+ router = praefect.NewPerRepositoryRouter(
+ nodeSet.Connections(),
+ elector,
+ healthManager,
+ praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
+ csg,
+ assignmentStore,
+ rs,
+ conf.DefaultReplicationFactors(),
)
if conf.BackgroundVerification.VerificationInterval > 0 {
@@ -405,7 +376,7 @@ func run(
logger,
db,
nodeSet.Connections(),
- nodeSetHealthManager,
+ healthManager,
conf.BackgroundVerification.VerificationInterval.Duration(),
conf.BackgroundVerification.DeleteInvalidRecords,
)
diff --git a/internal/backchannel/backchannel.go b/internal/backchannel/backchannel.go
index a7da22f15..990e3e5fb 100644
--- a/internal/backchannel/backchannel.go
+++ b/internal/backchannel/backchannel.go
@@ -40,26 +40,17 @@ import (
var magicBytes = []byte("backchannel")
// muxConfig returns a new config to use with the multiplexing session.
-func muxConfig(logger io.Writer, extra func(*yamux.Config)) *yamux.Config {
- cfg := yamux.DefaultConfig()
- cfg.LogOutput = logger
- // The server only accepts a single stream from the client, which is the client's gRPC stream.
- // The backchannel server should only receive a single stream from the server. As such, we can
- // limit maximum pending streams to 1 as there should never be more streams waiting.
- cfg.AcceptBacklog = 1
+func muxConfig(logger io.Writer, cfg Configuration) *yamux.Config {
+ yamuxCfg := yamux.DefaultConfig()
+ yamuxCfg.LogOutput = logger
// gRPC is already configured to send keep alives so we don't need yamux to do this for us.
// gRPC is a better choice as it sends the keep alives also to non-multiplexed connections.
- cfg.EnableKeepAlive = false
- // MaxStreamWindowSize configures the maximum receive buffer size for each stream. The sender
- // is allowed to send the configured amount of bytes without receiving an acknowledgement from the
- // receiver. This is can have a big impact on throughput as the latency increases, as the sender
- // can't proceed sending without receiving an acknowledgement back.
- cfg.MaxStreamWindowSize = 16 * 1024 * 1024
+ yamuxCfg.EnableKeepAlive = false
+ yamuxCfg.AcceptBacklog = cfg.AcceptBacklog
+ yamuxCfg.MaxStreamWindowSize = cfg.MaximumStreamWindowSizeBytes
+ yamuxCfg.StreamCloseTimeout = cfg.StreamCloseTimeout
- if extra != nil {
- extra(cfg)
- }
- return cfg
+ return yamuxCfg
}
// connCloser wraps a net.Conn and calls the provided close function instead when Close
diff --git a/internal/backchannel/backchannel_example_test.go b/internal/backchannel/backchannel_example_test.go
index 32fec8879..e4d6da59c 100644
--- a/internal/backchannel/backchannel_example_test.go
+++ b/internal/backchannel/backchannel_example_test.go
@@ -117,7 +117,7 @@ func invokeWithMuxedClient(logger *logrus.Entry, address string) error {
fmt.Println("Praefect responding via backchannel")
return stream.SendMsg(&gitalypb.VoteTransactionResponse{})
}))
- })
+ }, backchannel.DefaultConfiguration())
return invokeWithOpts(address, grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())))
}
diff --git a/internal/backchannel/backchannel_test.go b/internal/backchannel/backchannel_test.go
index 534b63962..ee89405a5 100644
--- a/internal/backchannel/backchannel_test.go
+++ b/internal/backchannel/backchannel_test.go
@@ -121,7 +121,7 @@ func TestBackchannel_concurrentRequestsFromMultipleClients(t *testing.T) {
})
return srv
- })
+ }, DefaultConfiguration())
<-start
client, err := grpc.Dial(ln.Addr().String(),
@@ -182,7 +182,7 @@ func TestHandshaker_idempotentClose(t *testing.T) {
stopCalled++
},
}
- })
+ }, DefaultConfiguration())
closeServer := make(chan struct{})
serverClosed := make(chan struct{})
@@ -276,7 +276,7 @@ func Benchmark(b *testing.B) {
opts := []grpc.DialOption{grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())}
if tc.multiplexed {
- clientHandshaker := NewClientHandshaker(newLogger(), func() Server { return grpc.NewServer() })
+ clientHandshaker := NewClientHandshaker(newLogger(), func() Server { return grpc.NewServer() }, DefaultConfiguration())
opts = []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())),
diff --git a/internal/backchannel/client.go b/internal/backchannel/client.go
index 031cca58b..c4526b7cc 100644
--- a/internal/backchannel/client.go
+++ b/internal/backchannel/client.go
@@ -24,40 +24,55 @@ type Server interface {
// a backchannel closes.
type ServerFactory func() Server
+// Configuration contains configuration for the backchannel's Yamux session.
+type Configuration struct {
+ // MaximumStreamWindowSizeBytes sets the maximum window size in bytes used for yamux streams.
+ // Higher value can increase throughput at the cost of more memory usage.
+ MaximumStreamWindowSizeBytes uint32
+ // AcceptBacklog sets the maximum number of stream openings in-flight before further openings
+ // block.
+ AcceptBacklog int
+ // StreamCloseTimeout is the maximum time that a stream will be allowed to
+ // be in a half-closed state when `Close` is called before forcibly
+ // closing the connection.
+ StreamCloseTimeout time.Duration
+}
+
+// DefaultConfiguration returns the default configuration.
+func DefaultConfiguration() Configuration {
+ defaults := yamux.DefaultConfig()
+ return Configuration{
+ MaximumStreamWindowSizeBytes: defaults.MaxStreamWindowSize,
+ AcceptBacklog: defaults.AcceptBacklog,
+ StreamCloseTimeout: defaults.StreamCloseTimeout,
+ }
+}
+
// ClientHandshaker implements the client side handshake of the multiplexed connection.
type ClientHandshaker struct {
logger *logrus.Entry
serverFactory ServerFactory
- yamuxConfig func(*yamux.Config)
+ cfg Configuration
}
// NewClientHandshaker returns a new client side implementation of the backchannel. The provided
// logger is used to log multiplexing errors.
-func NewClientHandshaker(logger *logrus.Entry, serverFactory ServerFactory) ClientHandshaker {
- return NewClientHandshakerWithYamuxConfig(logger, serverFactory, nil)
-}
-
-// NewClientHandshakerWithYamuxConfig returns a new client side implementation of the
-// backchannel. The provided logger is used to log multiplexing errors.
-// For each connection that we accept, the yamuxConfig callback is
-// invoked to allow yamux configuration overrides. If yamuxConfig is nil
-// it is ignored.
-func NewClientHandshakerWithYamuxConfig(logger *logrus.Entry, serverFactory ServerFactory, yamuxConfig func(*yamux.Config)) ClientHandshaker {
- return ClientHandshaker{logger: logger, serverFactory: serverFactory, yamuxConfig: yamuxConfig}
+func NewClientHandshaker(logger *logrus.Entry, serverFactory ServerFactory, cfg Configuration) ClientHandshaker {
+ return ClientHandshaker{logger: logger, serverFactory: serverFactory, cfg: cfg}
}
// ClientHandshake returns TransportCredentials that perform the client side multiplexing handshake and
// start the backchannel Server on the established connections. The transport credentials are used to initialize the
// connection prior to the multiplexing.
func (ch ClientHandshaker) ClientHandshake(tc credentials.TransportCredentials) credentials.TransportCredentials {
- return clientHandshake{TransportCredentials: tc, serverFactory: ch.serverFactory, logger: ch.logger, yamuxConfig: ch.yamuxConfig}
+ return clientHandshake{TransportCredentials: tc, serverFactory: ch.serverFactory, logger: ch.logger, cfg: ch.cfg}
}
type clientHandshake struct {
credentials.TransportCredentials
serverFactory ServerFactory
logger *logrus.Entry
- yamuxConfig func(*yamux.Config)
+ cfg Configuration
}
func (ch clientHandshake) ClientHandshake(ctx context.Context, serverName string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
@@ -103,7 +118,7 @@ func (ch clientHandshake) serve(ctx context.Context, conn net.Conn) (net.Conn, e
logger := ch.logger.WriterLevel(logrus.ErrorLevel)
// Initiate the multiplexing session.
- muxSession, err := yamux.Client(conn, muxConfig(logger, ch.yamuxConfig))
+ muxSession, err := yamux.Client(conn, muxConfig(logger, ch.cfg))
if err != nil {
logger.Close()
return nil, fmt.Errorf("open multiplexing session: %w", err)
diff --git a/internal/backchannel/server.go b/internal/backchannel/server.go
index 9b7e58bda..a6fe677ee 100644
--- a/internal/backchannel/server.go
+++ b/internal/backchannel/server.go
@@ -102,7 +102,14 @@ func (s *ServerHandshaker) Handshake(conn net.Conn, authInfo credentials.AuthInf
logger := s.logger.WriterLevel(logrus.ErrorLevel)
// Open the server side of the multiplexing session.
- muxSession, err := yamux.Server(conn, muxConfig(logger, nil))
+ //
+ // Gitaly is using custom settings with a lower accept backlog and higher receive
+ // buffer size than Praefect and the clients. We should eventually strive to match
+ // the settings here to prevent Gitaly from buffering too much.
+ cfg := DefaultConfiguration()
+ cfg.AcceptBacklog = 1
+ cfg.MaximumStreamWindowSizeBytes = 16 * 1024 * 1024
+ muxSession, err := yamux.Server(conn, muxConfig(logger, cfg))
if err != nil {
logger.Close()
return nil, nil, fmt.Errorf("create multiplexing session: %w", err)
diff --git a/internal/gitaly/client/dial_test.go b/internal/gitaly/client/dial_test.go
index 2ff52e9b8..c7c97a10f 100644
--- a/internal/gitaly/client/dial_test.go
+++ b/internal/gitaly/client/dial_test.go
@@ -57,7 +57,7 @@ func TestDial(t *testing.T) {
})
t.Run("muxed conn", func(t *testing.T) {
- handshaker := backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() })
+ handshaker := backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }, backchannel.DefaultConfiguration())
nonMuxedConn, err := Dial(ctx, "tcp://"+ln.Addr().String(), nil, handshaker)
require.NoError(t, err)
defer func() { require.NoError(t, nonMuxedConn.Close()) }()
diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go
index c94fa8281..72fcc9646 100644
--- a/internal/gitaly/service/operations/branches_test.go
+++ b/internal/gitaly/service/operations/branches_test.go
@@ -181,6 +181,7 @@ func TestUserCreateBranch_Transactions(t *testing.T) {
gitalypb.RegisterRefTransactionServer(srv, transactionServer)
return srv
},
+ backchannel.DefaultConfiguration(),
),
)
@@ -835,6 +836,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) {
gitalypb.RegisterRefTransactionServer(srv, transactionServer)
return srv
},
+ backchannel.DefaultConfiguration(),
),
)
diff --git a/internal/gitaly/service/operations/tags_test.go b/internal/gitaly/service/operations/tags_test.go
index 7dee225ed..b7eeae6cc 100644
--- a/internal/gitaly/service/operations/tags_test.go
+++ b/internal/gitaly/service/operations/tags_test.go
@@ -425,6 +425,7 @@ func TestUserCreateTag_transactional(t *testing.T) {
gitalypb.RegisterRefTransactionServer(srv, transactionServer)
return srv
},
+ backchannel.DefaultConfiguration(),
),
)
diff --git a/internal/gitaly/service/repository/apply_gitattributes_test.go b/internal/gitaly/service/repository/apply_gitattributes_test.go
index d1c048021..a168f08b1 100644
--- a/internal/gitaly/service/repository/apply_gitattributes_test.go
+++ b/internal/gitaly/service/repository/apply_gitattributes_test.go
@@ -111,11 +111,15 @@ func TestApplyGitattributes_transactional(t *testing.T) {
logger := testhelper.NewDiscardingLogEntry(t)
client := newMuxedRepositoryClient(t, ctx, cfg, "unix://"+cfg.InternalSocketPath(),
- backchannel.NewClientHandshaker(logger, func() backchannel.Server {
- srv := grpc.NewServer()
- gitalypb.RegisterRefTransactionServer(srv, transactionServer)
- return srv
- }),
+ backchannel.NewClientHandshaker(
+ logger,
+ func() backchannel.Server {
+ srv := grpc.NewServer()
+ gitalypb.RegisterRefTransactionServer(srv, transactionServer)
+ return srv
+ },
+ backchannel.DefaultConfiguration(),
+ ),
)
for _, tc := range []struct {
diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go
index 7d2bdf319..f0d3f5a33 100644
--- a/internal/gitaly/service/repository/replicate_test.go
+++ b/internal/gitaly/service/repository/replicate_test.go
@@ -237,6 +237,7 @@ func TestReplicateRepositoryTransactional(t *testing.T) {
gitalypb.RegisterRefTransactionServer(srv, &txServer)
return srv
},
+ backchannel.DefaultConfiguration(),
))
// The first invocation creates the repository via a snapshot given that it doesn't yet
diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go
index c841d6883..e5b709a6c 100644
--- a/internal/gitaly/service/smarthttp/testhelper_test.go
+++ b/internal/gitaly/service/smarthttp/testhelper_test.go
@@ -82,7 +82,7 @@ func newMuxedSmartHTTPClient(t *testing.T, ctx context.Context, serverSocketPath
ctx,
serverSocketPath,
[]grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))},
- backchannel.NewClientHandshaker(testhelper.NewDiscardingLogEntry(t), serverFactory),
+ backchannel.NewClientHandshaker(testhelper.NewDiscardingLogEntry(t), serverFactory, backchannel.DefaultConfiguration()),
)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, conn.Close()) })
diff --git a/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go b/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go
deleted file mode 100644
index 59eb95b0e..000000000
--- a/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package featureflag
-
-// PraefectUseYamuxConfigurationForGitaly switches the RPCs to be proxied over
-// connections that are set up with custom Yamux configuration.
-var PraefectUseYamuxConfigurationForGitaly = NewFeatureFlag(
- "praefect_use_yamux_configuration_for_gitaly",
- "v15.7.0",
- "https://gitlab.com/gitlab-org/gitaly/-/issues/4644",
- false,
-)
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
index c42ba8550..7575e7054 100644
--- a/internal/praefect/info_service_test.go
+++ b/internal/praefect/info_service_test.go
@@ -82,6 +82,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) {
transaction.NewServer(txManager),
sidechannelRegistry,
),
+ backchannel.DefaultConfiguration(),
),
sidechannelRegistry,
)
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go
index 0743bf520..21cdfc9fe 100644
--- a/internal/praefect/nodes/health_manager.go
+++ b/internal/praefect/nodes/health_manager.go
@@ -58,9 +58,6 @@ type HealthManager struct {
// NewHealthManager returns a new health manager that monitors which nodes in the cluster
// are healthy.
-//
-// If db is nil, the HealthManager checks the connection health normally but doesn't persist
-// any information about the nodes in the database.
func NewHealthManager(
log logrus.FieldLogger,
db glsql.Querier,
@@ -141,13 +138,10 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages
hm.locallyHealthy.Store(locallyHealthy)
- if hm.db != nil {
- // Database is nil only when an alternative set of connections is being tested behind a feature flag
- // and we do not want to affect the consensus in the database, just the routing decisions.
- ctx, cancel := hm.databaseTimeout(ctx)
- defer cancel()
+ ctx, cancel := hm.databaseTimeout(ctx)
+ defer cancel()
- if _, err := hm.db.ExecContext(ctx, `
+ if _, err := hm.db.ExecContext(ctx, `
INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END
FROM (
@@ -161,13 +155,12 @@ ON CONFLICT (praefect_name, shard_name, node_name)
last_contact_attempt_at = NOW(),
last_seen_active_at = COALESCE(EXCLUDED.last_seen_active_at, node_status.last_seen_active_at)
`,
- hm.praefectName,
- virtualStorages,
- physicalStorages,
- healthy,
- ); err != nil {
- return fmt.Errorf("update checks: %w", err)
- }
+ hm.praefectName,
+ virtualStorages,
+ physicalStorages,
+ healthy,
+ ); err != nil {
+ return fmt.Errorf("update checks: %w", err)
}
if hm.firstUpdate {
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
index 15b191cda..f53297854 100644
--- a/internal/praefect/nodes/health_manager_test.go
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -49,31 +49,6 @@ func getHealthConsensus(t *testing.T, ctx context.Context, db glsql.Querier) map
return consensus
}
-func TestHealthManagerWithoutDatabase(t *testing.T) {
- t.Parallel()
-
- hm := NewHealthManager(testhelper.NewDiscardingLogger(t), nil, "ignored", HealthClients{
- "virtual-storage": {
- "healthy-storage": mockHealthClient{
- CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
- return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
- },
- },
- "unhealthy-storage": mockHealthClient{
- CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
- return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
- },
- },
- },
- })
- hm.handleError = func(err error) error { return err }
-
- runCtx, cancelRun := context.WithCancel(testhelper.Context(t))
- require.Equal(t, context.Canceled, hm.Run(runCtx, helper.NewCountTicker(1, cancelRun)))
- require.Equal(t, map[string][]string{"virtual-storage": {"healthy-storage"}}, hm.HealthyNodes())
- <-hm.Updated()
-}
-
func TestHealthManager(t *testing.T) {
t.Parallel()
ctx := testhelper.Context(t)
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index adc79cb75..25fde7a2e 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -478,7 +478,7 @@ func TestConnectionMultiplexing(t *testing.T) {
promtest.NewMockHistogramVec(),
protoregistry.GitalyProtoPreregistered,
nil,
- backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }),
+ backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }, backchannel.DefaultConfiguration()),
nil,
)
require.NoError(t, err)
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
index 6e8cc892c..5a70bb4b6 100644
--- a/internal/praefect/repocleaner/repository_test.go
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -168,7 +168,7 @@ func TestRunner_Run(t *testing.T) {
logger.SetLevel(logrus.DebugLevel)
entry := logger.WithContext(ctx)
- clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil))
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil), backchannel.DefaultConfiguration())
nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
@@ -277,7 +277,7 @@ func TestRunner_Run_noAvailableStorages(t *testing.T) {
logger := testhelper.NewDiscardingLogger(t)
entry := logger.WithContext(ctx)
- clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil))
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil), backchannel.DefaultConfiguration())
nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index 025a76638..139bd62f9 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -5,7 +5,6 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"google.golang.org/grpc"
)
@@ -145,51 +144,3 @@ type Router interface {
// never be replicated.
RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error)
}
-
-type featureFlaggedRouter struct {
- flag featureflag.FeatureFlag
- enabled Router
- disabled Router
-}
-
-// NewFeatureFlaggedRouter returns a Router that decides switches router implementation depending
-// on a feature flag's status.
-func NewFeatureFlaggedRouter(flag featureflag.FeatureFlag, enabledRouter, disabledRouter Router) Router {
- return featureFlaggedRouter{
- flag: flag,
- enabled: enabledRouter,
- disabled: disabledRouter,
- }
-}
-
-func (r featureFlaggedRouter) router(ctx context.Context) Router {
- if r.flag.IsEnabled(ctx) {
- return r.enabled
- }
-
- return r.disabled
-}
-
-func (r featureFlaggedRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) {
- return r.router(ctx).RouteStorageAccessor(ctx, virtualStorage)
-}
-
-func (r featureFlaggedRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) {
- return r.router(ctx).RouteStorageMutator(ctx, virtualStorage)
-}
-
-func (r featureFlaggedRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) {
- return r.router(ctx).RouteRepositoryAccessor(ctx, virtualStorage, relativePath, forcePrimary)
-}
-
-func (r featureFlaggedRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
- return r.router(ctx).RouteRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath)
-}
-
-func (r featureFlaggedRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
- return r.router(ctx).RouteRepositoryCreation(ctx, virtualStorage, relativePath, additionalRepoRelativePath)
-}
-
-func (r featureFlaggedRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) {
- return r.router(ctx).RouteRepositoryMaintenance(ctx, virtualStorage, relativePath)
-}
diff --git a/internal/praefect/router_test.go b/internal/praefect/router_test.go
deleted file mode 100644
index c3c507aa1..000000000
--- a/internal/praefect/router_test.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package praefect
-
-import (
- "context"
- "errors"
- "math/rand"
- "net"
- "testing"
-
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
- "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
-)
-
-func TestNewFeatureFlaggedRouter(t *testing.T) {
- ctx := testhelper.Context(t)
- flag := featureflag.FeatureFlag{Name: "test_flag"}
-
- enabledConn, disabledConn, enabledRouter, disabledRouter := setupFeatureFlaggedRouters(t, ctx)
- router := NewFeatureFlaggedRouter(flag, enabledRouter, disabledRouter)
-
- for _, tc := range []struct {
- desc string
- flagEnabled bool
- expectedConn *grpc.ClientConn
- }{
- {
- desc: "flag disabled",
- flagEnabled: false,
- expectedConn: disabledConn,
- },
- {
- desc: "flag enabled",
- flagEnabled: true,
- expectedConn: enabledConn,
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx = featureflag.ContextWithFeatureFlag(ctx, flag, tc.flagEnabled)
-
- t.Run("RouteStorageAccessor", func(t *testing.T) {
- route, err := router.RouteStorageAccessor(ctx, "virtual-storage")
- require.NoError(t, err)
- require.Equal(t, tc.expectedConn.Target(), route.Connection.Target())
- })
-
- t.Run("RouteStorageMutator", func(t *testing.T) {
- _, err := router.RouteStorageMutator(ctx, "virtual-storage")
- require.Equal(t, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter"), err)
- })
-
- t.Run("RouteRepositoryAccessor", func(t *testing.T) {
- route, err := router.RouteRepositoryAccessor(ctx, "virtual-storage", "relative-path", false)
- require.NoError(t, err)
- require.Equal(t, tc.expectedConn.Target(), route.Node.Connection.Target())
- })
-
- t.Run("RouteRepositoryMutator", func(t *testing.T) {
- route, err := router.RouteRepositoryMutator(ctx, "virtual-storage", "relative-path", "")
- require.NoError(t, err)
- require.Equal(t, tc.expectedConn.Target(), route.Primary.Connection.Target())
- })
-
- t.Run("RouteRepositoryCreation", func(t *testing.T) {
- route, err := router.RouteRepositoryCreation(ctx, "virtual-storage", "relative-path-new", "")
- require.NoError(t, err)
- require.Equal(t, tc.expectedConn.Target(), route.Primary.Connection.Target())
- })
-
- t.Run("RouteRepositoryMaintenance", func(t *testing.T) {
- route, err := router.RouteRepositoryMaintenance(ctx, "virtual-storage", "relative-path")
- require.NoError(t, err)
- require.Equal(t, tc.expectedConn.Target(), route.Nodes[0].Connection.Target())
- })
- })
- }
-}
-
-func setupFeatureFlaggedRouters(t *testing.T, ctx context.Context) (*grpc.ClientConn, *grpc.ClientConn, *PerRepositoryRouter, *PerRepositoryRouter) {
- db := testdb.New(t)
- tx := db.Begin(t)
- t.Cleanup(func() { tx.Rollback(t) })
- configuredStorages := map[string][]string{"virtual-storage": {"storage"}}
- testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredStorages})
-
- repositoryStore := datastore.NewPostgresRepositoryStore(tx, configuredStorages)
- require.NoError(t, repositoryStore.CreateRepository(
- ctx,
- 1,
- "virtual-storage",
- "relative-path",
- "replica-path",
- "storage",
- nil,
- nil,
- true,
- true,
- ))
-
- // We don't need real connections, just the target in order to assert that we got the right connection back
- // from the router. We mock out the actual connection with a net.Pipe.
- enabledConn, err := grpc.Dial("enabledConn", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
- _, pw := net.Pipe()
- return pw, nil
- }), grpc.WithTransportCredentials(insecure.NewCredentials()))
- require.NoError(t, err)
- // Close the connection immediately to clean up as we just care about the value of .Target() in the test.
- testhelper.MustClose(t, enabledConn)
-
- disabledConn, err := grpc.Dial("disabledConn", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
- _, pw := net.Pipe()
- return pw, nil
- }), grpc.WithTransportCredentials(insecure.NewCredentials()))
- require.NoError(t, err)
- testhelper.MustClose(t, disabledConn)
-
- elector := nodes.NewPerRepositoryElector(tx)
- healthChecker := StaticHealthChecker{"virtual-storage": {"storage"}}
- assignmentStore := datastore.NewAssignmentStore(tx, configuredStorages)
- randomGenerator := rand.New(rand.NewSource(1))
-
- enabledRouter := NewPerRepositoryRouter(
- Connections{"virtual-storage": {"storage": enabledConn}},
- elector,
- healthChecker,
- randomGenerator,
- repositoryStore,
- assignmentStore,
- repositoryStore,
- nil,
- )
-
- disabledRouter := NewPerRepositoryRouter(
- Connections{"virtual-storage": {"storage": disabledConn}},
- elector,
- healthChecker,
- randomGenerator,
- repositoryStore,
- assignmentStore,
- repositoryStore,
- nil,
- )
-
- return enabledConn, disabledConn, enabledRouter, disabledRouter
-}
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index f0f15628f..d958c99dd 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -85,7 +85,7 @@ func TestServerFactory(t *testing.T) {
rs := datastore.MockRepositoryStore{}
txMgr := transactions.NewManager(conf)
sidechannelRegistry := sidechannel.NewRegistry()
- clientHandshaker := backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(logger, transaction.NewServer(txMgr), sidechannelRegistry))
+ clientHandshaker := backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(logger, transaction.NewServer(txMgr), sidechannelRegistry), backchannel.DefaultConfiguration())
nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, sidechannelRegistry)
require.NoError(t, err)
nodeMgr.Start(0, time.Second)
@@ -135,7 +135,7 @@ func TestServerFactory(t *testing.T) {
return grpc.NewServer(grpc.Creds(lm))
}
- clientHandshaker := backchannel.NewClientHandshaker(logger, factory)
+ clientHandshaker := backchannel.NewClientHandshaker(logger, factory, backchannel.DefaultConfiguration())
dialOpt := grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(creds))
cc, err := grpc.Dial(addr, dialOpt)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 2b2b3ee5e..98cf99e8d 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -97,9 +97,16 @@ func TestNewBackchannelServerFactory(t *testing.T) {
nodeSet, err := DialNodes(ctx, []*config.VirtualStorage{{
Name: "default",
Nodes: []*config.Node{{Storage: "gitaly-1", Address: "tcp://" + ln.Addr().String()}},
- }}, nil, nil, backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(
- testhelper.NewDiscardingLogEntry(t), transaction.NewServer(mgr), nil,
- )), nil)
+ }}, nil, nil,
+ backchannel.NewClientHandshaker(
+ logger,
+ NewBackchannelServerFactory(
+ testhelper.NewDiscardingLogEntry(t),
+ transaction.NewServer(mgr),
+ nil,
+ ),
+ backchannel.DefaultConfiguration(),
+ ), nil)
require.NoError(t, err)
defer nodeSet.Close()
@@ -530,6 +537,7 @@ func TestRemoveRepository(t *testing.T) {
nil, backchannel.NewClientHandshaker(
testhelper.NewDiscardingLogEntry(t),
NewBackchannelServerFactory(testhelper.NewDiscardingLogEntry(t), transaction.NewServer(txMgr), nil),
+ backchannel.DefaultConfiguration(),
), nil,
)
require.NoError(t, err)
@@ -603,6 +611,7 @@ func TestRenameRepository(t *testing.T) {
transaction.NewServer(txManager),
nil,
),
+ backchannel.DefaultConfiguration(),
)
ctx := testhelper.Context(t)
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
index 95dfc69b2..986c2c440 100644
--- a/internal/praefect/verifier_test.go
+++ b/internal/praefect/verifier_test.go
@@ -517,6 +517,7 @@ func TestVerifier(t *testing.T) {
transaction.NewServer(txManager),
sidechannelRegistry,
),
+ backchannel.DefaultConfiguration(),
),
sidechannelRegistry,
)
diff --git a/internal/sidechannel/proxy_test.go b/internal/sidechannel/proxy_test.go
index 398d9c780..3cffe2e78 100644
--- a/internal/sidechannel/proxy_test.go
+++ b/internal/sidechannel/proxy_test.go
@@ -121,7 +121,7 @@ func dialProxy(upstreamAddr string) (*grpc.ClientConn, error) {
return grpc.NewServer(grpc.Creds(lm))
}
- clientHandshaker := backchannel.NewClientHandshaker(newLogger(), factory)
+ clientHandshaker := backchannel.NewClientHandshaker(newLogger(), factory, backchannel.DefaultConfiguration())
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())),
grpc.WithUnaryInterceptor(NewUnaryProxy(registry)),
diff --git a/internal/sidechannel/sidechannel.go b/internal/sidechannel/sidechannel.go
index 2605fceb0..fcd8d5258 100644
--- a/internal/sidechannel/sidechannel.go
+++ b/internal/sidechannel/sidechannel.go
@@ -9,7 +9,6 @@ import (
"strconv"
"time"
- "github.com/hashicorp/yamux"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
@@ -134,29 +133,20 @@ func NewServerHandshaker(registry *Registry) *ServerHandshaker {
// NewClientHandshaker is used to enable sidechannel support on outbound
// gRPC connections.
func NewClientHandshaker(logger *logrus.Entry, registry *Registry) client.Handshaker {
- return backchannel.NewClientHandshakerWithYamuxConfig(
+ cfg := backchannel.DefaultConfiguration()
+ // If a client hangs up while the server is writing data to it then the
+ // server will block for 5 minutes by default before erroring out. This
+ // makes testing difficult and there is no reason to have such a long
+ // timeout in the case of sidechannels. A 1 second timeout is also OK.
+ cfg.StreamCloseTimeout = time.Second
+
+ return backchannel.NewClientHandshaker(
logger,
func() backchannel.Server {
lm := listenmux.New(insecure.NewCredentials())
lm.Register(NewServerHandshaker(registry))
return grpc.NewServer(grpc.Creds(lm))
},
- func(cfg *yamux.Config) {
- // Backchannel sets a very large custom window size (16MB). This is not
- // necessary for sidechannels because we use one stream per connection.
- // Worse, this is wasteful, because a client that is serving many
- // concurrent sidechannel calls may end up lazily creating a 16MB buffer
- // for each ongoing call. See
- // https://gitlab.com/gitlab-org/gitaly/-/issues/4132. To prevent this
- // waste we change this value back to 256KB which is the default and
- // minimum value.
- cfg.MaxStreamWindowSize = 256 * 1024
-
- // If a client hangs up while the server is writing data to it then the
- // server will block for 5 minutes by default before erroring out. This
- // makes testing difficult and there is no reason to have such a long
- // timeout in the case of sidechannels. A 1 second timeout is also OK.
- cfg.StreamCloseTimeout = time.Second
- },
+ cfg,
)
}
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 5c7d414dd..839765759 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -197,9 +197,6 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context {
// context. The values of these flags should be randomized to increase the test coverage.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.RunCommandsInCGroup, true)
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.ConvertErrToStatus, true)
- // PraefectUseYamuxConfigurationForGitaly gets tested in Praefect when routing RPCs and thus it affects many tests.
- // Let's randomly select which connection we use so both sets of connections get tested somewhat.
- ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectUseYamuxConfigurationForGitaly, rnd.Int()%2 == 0)
// Randomly enable the use of the catfile cache in localrepo.ReadObject.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.LocalrepoReadObjectCached, rnd.Int()%2 == 0)