diff options
author | Will Chandler <wchandler@gitlab.com> | 2022-12-01 21:09:09 +0300 |
---|---|---|
committer | Will Chandler <wchandler@gitlab.com> | 2022-12-01 21:09:09 +0300 |
commit | cd7d897a9e96740665a2ded5efb6e71c00deffb0 (patch) | |
tree | 01fae004d8e6fe352b392ee671c98012c18641db | |
parent | beed7f98615584976d09368afc1443ea91349ac6 (diff) | |
parent | 492a5b04807cfa179441564875a789f9dd06460d (diff) |
Merge branch 'smh-fix-buffer-size' into 'master'
Add feature flag for testing connections with smaller Yamux window size
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5086
Merged-by: Will Chandler <wchandler@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r-- | cmd/praefect/main.go | 86 | ||||
-rw-r--r-- | config.praefect.toml.example | 8 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go | 10 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 40 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 21 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 4 | ||||
-rw-r--r-- | internal/praefect/nodes/health_manager.go | 25 | ||||
-rw-r--r-- | internal/praefect/nodes/health_manager_test.go | 25 | ||||
-rw-r--r-- | internal/praefect/router.go | 49 | ||||
-rw-r--r-- | internal/praefect/router_test.go | 150 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 4 |
11 files changed, 392 insertions, 30 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index f0fe2b3e6..d8443cf67 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -68,6 +68,7 @@ import ( "strings" "time" + "github.com/hashicorp/yamux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -78,6 +79,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/log" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" @@ -308,7 +310,16 @@ func run( transactionManager := transactions.NewManager(conf) sidechannelRegistry := sidechannel.NewRegistry() - clientHandshaker := backchannel.NewClientHandshaker(logger, praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry)) + + newClientHandshaker := func(config func(*yamux.Config)) backchannel.ClientHandshaker { + return backchannel.NewClientHandshakerWithYamuxConfig( + logger, + praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry), + config, + ) + } + + clientHandshaker := newClientHandshaker(nil) assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames()) var ( nodeManager nodes.Manager @@ -318,38 +329,73 @@ func run( primaryGetter praefect.PrimaryGetter ) if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { - nodeSet, err = praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry) + dialNodes := func(clientHandshaker backchannel.ClientHandshaker) (praefect.NodeSet, error) { + return praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry) + } + + nodeSet, err = dialNodes(clientHandshaker) if err != nil { return fmt.Errorf("dial nodes: %w", err) } defer nodeSet.Close() - hm := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients()) - go func() { - if err := hm.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil { - logger.WithError(err).Error("health manager exited") - } - }() - healthChecker = hm + // Dial a second set of connections to each storage with a different Yamux config + // so we can use a feature flag to gradually test the new configuration. + nodeSetWithCustomConfig, err := dialNodes(newClientHandshaker(func(cfg *yamux.Config) { + cfg.AcceptBacklog = conf.Yamux.AcceptBacklog + cfg.MaxStreamWindowSize = conf.Yamux.MaximumStreamWindowSizeBytes + })) + if err != nil { + return fmt.Errorf("dial nodes reduced window: %w", err) + } + defer nodeSetWithCustomConfig.Close() + + runHealthManager := func(db glsql.Querier, nodeSet praefect.NodeSet) *nodes.HealthManager { + hm := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients()) + go func() { + if err := hm.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil { + logger.WithError(err).Error("health manager exited") + } + }() + + return hm + } + + nodeSetHealthManager := runHealthManager(db, nodeSet) + healthChecker = nodeSetHealthManager + + nodeSetWithCustomConfigHealthManager := runHealthManager(nil, nodeSetWithCustomConfig) // Wait for the first health check to complete so the Praefect doesn't start serving RPC // before the router is ready with the health status of the nodes. - <-hm.Updated() + <-nodeSetHealthManager.Updated() + <-nodeSetWithCustomConfigHealthManager.Updated() elector := nodes.NewPerRepositoryElector(db) primaryGetter = elector assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames()) - router = praefect.NewPerRepositoryRouter( - nodeSet.Connections(), - elector, - hm, - praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), - csg, - assignmentStore, - rs, - conf.DefaultReplicationFactors(), + newPerRepositoryRouter := func(healthManager *nodes.HealthManager, nodeSet praefect.NodeSet) *praefect.PerRepositoryRouter { + return praefect.NewPerRepositoryRouter( + nodeSet.Connections(), + elector, + healthManager, + praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), + csg, + assignmentStore, + rs, + conf.DefaultReplicationFactors(), + ) + } + + // Configure a router that uses a feature flag to switch between two different sets of connections. + // This way we can toggle the feature flag to use a different set of connections for proxied + // RPC calls. + router = praefect.NewFeatureFlaggedRouter( + featureflag.PraefectUseYamuxConfigurationForGitaly, + newPerRepositoryRouter(nodeSetWithCustomConfigHealthManager, nodeSetWithCustomConfig), + newPerRepositoryRouter(nodeSetHealthManager, nodeSet), ) if conf.BackgroundVerification.VerificationInterval > 0 { @@ -358,7 +404,7 @@ func run( logger, db, nodeSet.Connections(), - hm, + nodeSetHealthManager, conf.BackgroundVerification.VerificationInterval.Duration(), conf.BackgroundVerification.DeleteInvalidRecords, ) diff --git a/config.praefect.toml.example b/config.praefect.toml.example index afe0f9153..f7daaf128 100644 --- a/config.praefect.toml.example +++ b/config.praefect.toml.example @@ -66,3 +66,11 @@ name = 'praefect' storage = "praefect-git-2" address = "tcp://praefect-git-2.internal" token = 'token3' + +[yamux] +# MaximumStreamWindowSizeBytes sets the maximum window size in bytes used for yamux streams. +# Higher value can increase throughput at the cost of more memory usage. +maximum_stream_window_size_bytes = 262144 +# AcceptBacklog sets the maximum number of stream openings in-flight +# before further openings block. +accept_backlog = 256 diff --git a/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go b/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go new file mode 100644 index 000000000..59eb95b0e --- /dev/null +++ b/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go @@ -0,0 +1,10 @@ +package featureflag + +// PraefectUseYamuxConfigurationForGitaly switches the RPCs to be proxied over +// connections that are set up with custom Yamux configuration. +var PraefectUseYamuxConfigurationForGitaly = NewFeatureFlag( + "praefect_use_yamux_configuration_for_gitaly", + "v15.7.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/4644", + false, +) diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 9092b2c6f..dd270edb5 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -6,6 +6,7 @@ import ( "os" "time" + "github.com/hashicorp/yamux" "github.com/pelletier/go-toml/v2" promclient "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" @@ -155,6 +156,40 @@ type Config struct { MemoryQueueEnabled bool `toml:"memory_queue_enabled,omitempty"` GracefulStopTimeout duration.Duration `toml:"graceful_stop_timeout,omitempty"` RepositoriesCleanup RepositoriesCleanup `toml:"repositories_cleanup,omitempty"` + Yamux Yamux `toml:"yamux,omitempty"` +} + +// Yamux contains Yamux related configuration values. +type Yamux struct { + // MaximumStreamWindowSizeBytes sets the maximum window size in bytes used for yamux streams. + // Higher value can increase throughput at the cost of more memory usage. + MaximumStreamWindowSizeBytes uint32 `toml:"maximum_stream_window_size_bytes,omitempty"` + // AcceptBacklog sets the maximum number of stream openings in-flight before further openings + // block. + AcceptBacklog int `toml:"accept_backlog,omitempty"` +} + +func (cfg Yamux) validate() error { + if cfg.MaximumStreamWindowSizeBytes < 262144 { + // Yamux requires the stream window size to be at minimum 256KiB. + return fmt.Errorf("yamux.maximum_stream_window_size_bytes must be at least 262144 but it was %d", cfg.MaximumStreamWindowSizeBytes) + } + + if cfg.AcceptBacklog < 1 { + // Yamux requires accept backlog to be at least 1 + return fmt.Errorf("yamux.accept_backlog must be at least 1 but it was %d", cfg.AcceptBacklog) + } + + return nil +} + +// DefaultYamuxConfig returns the default Yamux configuration values. +func DefaultYamuxConfig() Yamux { + defaultCfg := yamux.DefaultConfig() + return Yamux{ + MaximumStreamWindowSizeBytes: defaultCfg.MaxStreamWindowSize, + AcceptBacklog: defaultCfg.AcceptBacklog, + } } // VirtualStorage represents a set of nodes for a storage @@ -185,6 +220,7 @@ func FromFile(filePath string) (Config, error) { // Sets the default Failover, to be overwritten when deserializing the TOML Failover: Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository}, RepositoriesCleanup: DefaultRepositoriesCleanup(), + Yamux: DefaultYamuxConfig(), } if err := toml.Unmarshal(b, conf); err != nil { return Config{}, err @@ -279,6 +315,10 @@ func (c *Config) Validate() error { } } + if err := c.Yamux.validate(); err != nil { + return err + } + return nil } diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index cc89c919b..68e24b6a8 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -207,6 +207,20 @@ func TestConfigValidation(t *testing.T) { }, errMsg: `repositories_cleanup.run_interval is less then 1m0s, which could lead to a database performance problem`, }, + { + desc: "yamux.maximum_stream_window_size_bytes is too low", + changeConfig: func(cfg *Config) { + cfg.Yamux.MaximumStreamWindowSizeBytes = 16 + }, + errMsg: `yamux.maximum_stream_window_size_bytes must be at least 262144 but it was 16`, + }, + { + desc: "yamux.maximum_stream_window_size_bytes is too low", + changeConfig: func(cfg *Config) { + cfg.Yamux.AcceptBacklog = 0 + }, + errMsg: `yamux.accept_backlog must be at least 1 but it was 0`, + }, } for _, tc := range testCases { @@ -220,6 +234,7 @@ func TestConfigValidation(t *testing.T) { }, Failover: Failover{ElectionStrategy: ElectionStrategySQL}, RepositoriesCleanup: DefaultRepositoriesCleanup(), + Yamux: DefaultYamuxConfig(), } tc.changeConfig(&config) @@ -332,6 +347,10 @@ func TestConfigParsing(t *testing.T) { VerificationInterval: duration.Duration(24 * time.Hour), DeleteInvalidRecords: true, }, + Yamux: Yamux{ + MaximumStreamWindowSizeBytes: 1000, + AcceptBacklog: 2000, + }, }, }, { @@ -358,6 +377,7 @@ func TestConfigParsing(t *testing.T) { RepositoriesInBatch: 11, }, BackgroundVerification: DefaultBackgroundVerificationConfig(), + Yamux: DefaultYamuxConfig(), }, }, { @@ -381,6 +401,7 @@ func TestConfigParsing(t *testing.T) { RepositoriesInBatch: 16, }, BackgroundVerification: DefaultBackgroundVerificationConfig(), + Yamux: DefaultYamuxConfig(), }, }, { diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index f7d591eee..a3112a9f2 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -81,3 +81,7 @@ read_error_threshold_count = 100 check_interval = "1s" run_interval = "3s" repositories_in_batch = 10 + +[yamux] +maximum_stream_window_size_bytes = 1000 +accept_backlog = 2000
\ No newline at end of file diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go index 21cdfc9fe..0743bf520 100644 --- a/internal/praefect/nodes/health_manager.go +++ b/internal/praefect/nodes/health_manager.go @@ -58,6 +58,9 @@ type HealthManager struct { // NewHealthManager returns a new health manager that monitors which nodes in the cluster // are healthy. +// +// If db is nil, the HealthManager checks the connection health normally but doesn't persist +// any information about the nodes in the database. func NewHealthManager( log logrus.FieldLogger, db glsql.Querier, @@ -138,10 +141,13 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages hm.locallyHealthy.Store(locallyHealthy) - ctx, cancel := hm.databaseTimeout(ctx) - defer cancel() + if hm.db != nil { + // Database is nil only when an alternative set of connections is being tested behind a feature flag + // and we do not want to affect the consensus in the database, just the routing decisions. + ctx, cancel := hm.databaseTimeout(ctx) + defer cancel() - if _, err := hm.db.ExecContext(ctx, ` + if _, err := hm.db.ExecContext(ctx, ` INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at) SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END FROM ( @@ -155,12 +161,13 @@ ON CONFLICT (praefect_name, shard_name, node_name) last_contact_attempt_at = NOW(), last_seen_active_at = COALESCE(EXCLUDED.last_seen_active_at, node_status.last_seen_active_at) `, - hm.praefectName, - virtualStorages, - physicalStorages, - healthy, - ); err != nil { - return fmt.Errorf("update checks: %w", err) + hm.praefectName, + virtualStorages, + physicalStorages, + healthy, + ); err != nil { + return fmt.Errorf("update checks: %w", err) + } } if hm.firstUpdate { diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go index f53297854..15b191cda 100644 --- a/internal/praefect/nodes/health_manager_test.go +++ b/internal/praefect/nodes/health_manager_test.go @@ -49,6 +49,31 @@ func getHealthConsensus(t *testing.T, ctx context.Context, db glsql.Querier) map return consensus } +func TestHealthManagerWithoutDatabase(t *testing.T) { + t.Parallel() + + hm := NewHealthManager(testhelper.NewDiscardingLogger(t), nil, "ignored", HealthClients{ + "virtual-storage": { + "healthy-storage": mockHealthClient{ + CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil + }, + }, + "unhealthy-storage": mockHealthClient{ + CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil + }, + }, + }, + }) + hm.handleError = func(err error) error { return err } + + runCtx, cancelRun := context.WithCancel(testhelper.Context(t)) + require.Equal(t, context.Canceled, hm.Run(runCtx, helper.NewCountTicker(1, cancelRun))) + require.Equal(t, map[string][]string{"virtual-storage": {"healthy-storage"}}, hm.HealthyNodes()) + <-hm.Updated() +} + func TestHealthManager(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) diff --git a/internal/praefect/router.go b/internal/praefect/router.go index 139bd62f9..025a76638 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -5,6 +5,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" "google.golang.org/grpc" ) @@ -144,3 +145,51 @@ type Router interface { // never be replicated. RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) } + +type featureFlaggedRouter struct { + flag featureflag.FeatureFlag + enabled Router + disabled Router +} + +// NewFeatureFlaggedRouter returns a Router that decides switches router implementation depending +// on a feature flag's status. +func NewFeatureFlaggedRouter(flag featureflag.FeatureFlag, enabledRouter, disabledRouter Router) Router { + return featureFlaggedRouter{ + flag: flag, + enabled: enabledRouter, + disabled: disabledRouter, + } +} + +func (r featureFlaggedRouter) router(ctx context.Context) Router { + if r.flag.IsEnabled(ctx) { + return r.enabled + } + + return r.disabled +} + +func (r featureFlaggedRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) { + return r.router(ctx).RouteStorageAccessor(ctx, virtualStorage) +} + +func (r featureFlaggedRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) { + return r.router(ctx).RouteStorageMutator(ctx, virtualStorage) +} + +func (r featureFlaggedRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) { + return r.router(ctx).RouteRepositoryAccessor(ctx, virtualStorage, relativePath, forcePrimary) +} + +func (r featureFlaggedRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return r.router(ctx).RouteRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath) +} + +func (r featureFlaggedRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) { + return r.router(ctx).RouteRepositoryCreation(ctx, virtualStorage, relativePath, additionalRepoRelativePath) +} + +func (r featureFlaggedRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) { + return r.router(ctx).RouteRepositoryMaintenance(ctx, virtualStorage, relativePath) +} diff --git a/internal/praefect/router_test.go b/internal/praefect/router_test.go new file mode 100644 index 000000000..c3c507aa1 --- /dev/null +++ b/internal/praefect/router_test.go @@ -0,0 +1,150 @@ +package praefect + +import ( + "context" + "errors" + "math/rand" + "net" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestNewFeatureFlaggedRouter(t *testing.T) { + ctx := testhelper.Context(t) + flag := featureflag.FeatureFlag{Name: "test_flag"} + + enabledConn, disabledConn, enabledRouter, disabledRouter := setupFeatureFlaggedRouters(t, ctx) + router := NewFeatureFlaggedRouter(flag, enabledRouter, disabledRouter) + + for _, tc := range []struct { + desc string + flagEnabled bool + expectedConn *grpc.ClientConn + }{ + { + desc: "flag disabled", + flagEnabled: false, + expectedConn: disabledConn, + }, + { + desc: "flag enabled", + flagEnabled: true, + expectedConn: enabledConn, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx = featureflag.ContextWithFeatureFlag(ctx, flag, tc.flagEnabled) + + t.Run("RouteStorageAccessor", func(t *testing.T) { + route, err := router.RouteStorageAccessor(ctx, "virtual-storage") + require.NoError(t, err) + require.Equal(t, tc.expectedConn.Target(), route.Connection.Target()) + }) + + t.Run("RouteStorageMutator", func(t *testing.T) { + _, err := router.RouteStorageMutator(ctx, "virtual-storage") + require.Equal(t, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter"), err) + }) + + t.Run("RouteRepositoryAccessor", func(t *testing.T) { + route, err := router.RouteRepositoryAccessor(ctx, "virtual-storage", "relative-path", false) + require.NoError(t, err) + require.Equal(t, tc.expectedConn.Target(), route.Node.Connection.Target()) + }) + + t.Run("RouteRepositoryMutator", func(t *testing.T) { + route, err := router.RouteRepositoryMutator(ctx, "virtual-storage", "relative-path", "") + require.NoError(t, err) + require.Equal(t, tc.expectedConn.Target(), route.Primary.Connection.Target()) + }) + + t.Run("RouteRepositoryCreation", func(t *testing.T) { + route, err := router.RouteRepositoryCreation(ctx, "virtual-storage", "relative-path-new", "") + require.NoError(t, err) + require.Equal(t, tc.expectedConn.Target(), route.Primary.Connection.Target()) + }) + + t.Run("RouteRepositoryMaintenance", func(t *testing.T) { + route, err := router.RouteRepositoryMaintenance(ctx, "virtual-storage", "relative-path") + require.NoError(t, err) + require.Equal(t, tc.expectedConn.Target(), route.Nodes[0].Connection.Target()) + }) + }) + } +} + +func setupFeatureFlaggedRouters(t *testing.T, ctx context.Context) (*grpc.ClientConn, *grpc.ClientConn, *PerRepositoryRouter, *PerRepositoryRouter) { + db := testdb.New(t) + tx := db.Begin(t) + t.Cleanup(func() { tx.Rollback(t) }) + configuredStorages := map[string][]string{"virtual-storage": {"storage"}} + testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredStorages}) + + repositoryStore := datastore.NewPostgresRepositoryStore(tx, configuredStorages) + require.NoError(t, repositoryStore.CreateRepository( + ctx, + 1, + "virtual-storage", + "relative-path", + "replica-path", + "storage", + nil, + nil, + true, + true, + )) + + // We don't need real connections, just the target in order to assert that we got the right connection back + // from the router. We mock out the actual connection with a net.Pipe. + enabledConn, err := grpc.Dial("enabledConn", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + _, pw := net.Pipe() + return pw, nil + }), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + // Close the connection immediately to clean up as we just care about the value of .Target() in the test. + testhelper.MustClose(t, enabledConn) + + disabledConn, err := grpc.Dial("disabledConn", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + _, pw := net.Pipe() + return pw, nil + }), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + testhelper.MustClose(t, disabledConn) + + elector := nodes.NewPerRepositoryElector(tx) + healthChecker := StaticHealthChecker{"virtual-storage": {"storage"}} + assignmentStore := datastore.NewAssignmentStore(tx, configuredStorages) + randomGenerator := rand.New(rand.NewSource(1)) + + enabledRouter := NewPerRepositoryRouter( + Connections{"virtual-storage": {"storage": enabledConn}}, + elector, + healthChecker, + randomGenerator, + repositoryStore, + assignmentStore, + repositoryStore, + nil, + ) + + disabledRouter := NewPerRepositoryRouter( + Connections{"virtual-storage": {"storage": disabledConn}}, + elector, + healthChecker, + randomGenerator, + repositoryStore, + assignmentStore, + repositoryStore, + nil, + ) + + return enabledConn, disabledConn, enabledRouter, disabledRouter +} diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 20b6cbcdb..c9015ee93 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -201,8 +201,10 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { // NodeErrorCancelsVoter affect many tests as it changes Praefect coordinator transaction logic. // Randomly enable the flag to exercise both paths to some extent. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.NodeErrorCancelsVoter, rnd.Int()%2 == 0) - ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV238, rnd.Int()%2 == 0) + // PraefectUseYamuxConfigurationForGitaly gets tested in Praefect when routing RPCs and thus it affects many tests. + // Let's randomly select which connection we use so both sets of connections get tested somewhat. + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectUseYamuxConfigurationForGitaly, rnd.Int()%2 == 0) for _, opt := range opts { ctx = opt(ctx) |