Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Chandler <wchandler@gitlab.com>2022-12-01 21:09:09 +0300
committerWill Chandler <wchandler@gitlab.com>2022-12-01 21:09:09 +0300
commitcd7d897a9e96740665a2ded5efb6e71c00deffb0 (patch)
tree01fae004d8e6fe352b392ee671c98012c18641db
parentbeed7f98615584976d09368afc1443ea91349ac6 (diff)
parent492a5b04807cfa179441564875a789f9dd06460d (diff)
Merge branch 'smh-fix-buffer-size' into 'master'
Add feature flag for testing connections with smaller Yamux window size See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5086 Merged-by: Will Chandler <wchandler@gitlab.com> Approved-by: Will Chandler <wchandler@gitlab.com> Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r--cmd/praefect/main.go86
-rw-r--r--config.praefect.toml.example8
-rw-r--r--internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go10
-rw-r--r--internal/praefect/config/config.go40
-rw-r--r--internal/praefect/config/config_test.go21
-rw-r--r--internal/praefect/config/testdata/config.toml4
-rw-r--r--internal/praefect/nodes/health_manager.go25
-rw-r--r--internal/praefect/nodes/health_manager_test.go25
-rw-r--r--internal/praefect/router.go49
-rw-r--r--internal/praefect/router_test.go150
-rw-r--r--internal/testhelper/testhelper.go4
11 files changed, 392 insertions, 30 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index f0fe2b3e6..d8443cf67 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -68,6 +68,7 @@ import (
"strings"
"time"
+ "github.com/hashicorp/yamux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -78,6 +79,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry"
"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
"gitlab.com/gitlab-org/gitaly/v15/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
@@ -308,7 +310,16 @@ func run(
transactionManager := transactions.NewManager(conf)
sidechannelRegistry := sidechannel.NewRegistry()
- clientHandshaker := backchannel.NewClientHandshaker(logger, praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry))
+
+ newClientHandshaker := func(config func(*yamux.Config)) backchannel.ClientHandshaker {
+ return backchannel.NewClientHandshakerWithYamuxConfig(
+ logger,
+ praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry),
+ config,
+ )
+ }
+
+ clientHandshaker := newClientHandshaker(nil)
assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames())
var (
nodeManager nodes.Manager
@@ -318,38 +329,73 @@ func run(
primaryGetter praefect.PrimaryGetter
)
if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
- nodeSet, err = praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry)
+ dialNodes := func(clientHandshaker backchannel.ClientHandshaker) (praefect.NodeSet, error) {
+ return praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry)
+ }
+
+ nodeSet, err = dialNodes(clientHandshaker)
if err != nil {
return fmt.Errorf("dial nodes: %w", err)
}
defer nodeSet.Close()
- hm := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients())
- go func() {
- if err := hm.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil {
- logger.WithError(err).Error("health manager exited")
- }
- }()
- healthChecker = hm
+ // Dial a second set of connections to each storage with a different Yamux config
+ // so we can use a feature flag to gradually test the new configuration.
+ nodeSetWithCustomConfig, err := dialNodes(newClientHandshaker(func(cfg *yamux.Config) {
+ cfg.AcceptBacklog = conf.Yamux.AcceptBacklog
+ cfg.MaxStreamWindowSize = conf.Yamux.MaximumStreamWindowSizeBytes
+ }))
+ if err != nil {
+ return fmt.Errorf("dial nodes reduced window: %w", err)
+ }
+ defer nodeSetWithCustomConfig.Close()
+
+ runHealthManager := func(db glsql.Querier, nodeSet praefect.NodeSet) *nodes.HealthManager {
+ hm := nodes.NewHealthManager(logger, db, nodes.GeneratePraefectName(conf, logger), nodeSet.HealthClients())
+ go func() {
+ if err := hm.Run(ctx, helper.NewTimerTicker(time.Second)); err != nil {
+ logger.WithError(err).Error("health manager exited")
+ }
+ }()
+
+ return hm
+ }
+
+ nodeSetHealthManager := runHealthManager(db, nodeSet)
+ healthChecker = nodeSetHealthManager
+
+ nodeSetWithCustomConfigHealthManager := runHealthManager(nil, nodeSetWithCustomConfig)
// Wait for the first health check to complete so the Praefect doesn't start serving RPC
// before the router is ready with the health status of the nodes.
- <-hm.Updated()
+ <-nodeSetHealthManager.Updated()
+ <-nodeSetWithCustomConfigHealthManager.Updated()
elector := nodes.NewPerRepositoryElector(db)
primaryGetter = elector
assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames())
- router = praefect.NewPerRepositoryRouter(
- nodeSet.Connections(),
- elector,
- hm,
- praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
- csg,
- assignmentStore,
- rs,
- conf.DefaultReplicationFactors(),
+ newPerRepositoryRouter := func(healthManager *nodes.HealthManager, nodeSet praefect.NodeSet) *praefect.PerRepositoryRouter {
+ return praefect.NewPerRepositoryRouter(
+ nodeSet.Connections(),
+ elector,
+ healthManager,
+ praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
+ csg,
+ assignmentStore,
+ rs,
+ conf.DefaultReplicationFactors(),
+ )
+ }
+
+ // Configure a router that uses a feature flag to switch between two different sets of connections.
+ // This way we can toggle the feature flag to use a different set of connections for proxied
+ // RPC calls.
+ router = praefect.NewFeatureFlaggedRouter(
+ featureflag.PraefectUseYamuxConfigurationForGitaly,
+ newPerRepositoryRouter(nodeSetWithCustomConfigHealthManager, nodeSetWithCustomConfig),
+ newPerRepositoryRouter(nodeSetHealthManager, nodeSet),
)
if conf.BackgroundVerification.VerificationInterval > 0 {
@@ -358,7 +404,7 @@ func run(
logger,
db,
nodeSet.Connections(),
- hm,
+ nodeSetHealthManager,
conf.BackgroundVerification.VerificationInterval.Duration(),
conf.BackgroundVerification.DeleteInvalidRecords,
)
diff --git a/config.praefect.toml.example b/config.praefect.toml.example
index afe0f9153..f7daaf128 100644
--- a/config.praefect.toml.example
+++ b/config.praefect.toml.example
@@ -66,3 +66,11 @@ name = 'praefect'
storage = "praefect-git-2"
address = "tcp://praefect-git-2.internal"
token = 'token3'
+
+[yamux]
+# MaximumStreamWindowSizeBytes sets the maximum window size in bytes used for yamux streams.
+# Higher value can increase throughput at the cost of more memory usage.
+maximum_stream_window_size_bytes = 262144
+# AcceptBacklog sets the maximum number of stream openings in-flight
+# before further openings block.
+accept_backlog = 256
diff --git a/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go b/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go
new file mode 100644
index 000000000..59eb95b0e
--- /dev/null
+++ b/internal/metadata/featureflag/ff_praefect_use_yamux_configuration_for_gitaly.go
@@ -0,0 +1,10 @@
+package featureflag
+
+// PraefectUseYamuxConfigurationForGitaly switches the RPCs to be proxied over
+// connections that are set up with custom Yamux configuration.
+var PraefectUseYamuxConfigurationForGitaly = NewFeatureFlag(
+ "praefect_use_yamux_configuration_for_gitaly",
+ "v15.7.0",
+ "https://gitlab.com/gitlab-org/gitaly/-/issues/4644",
+ false,
+)
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 9092b2c6f..dd270edb5 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -6,6 +6,7 @@ import (
"os"
"time"
+ "github.com/hashicorp/yamux"
"github.com/pelletier/go-toml/v2"
promclient "github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
@@ -155,6 +156,40 @@ type Config struct {
MemoryQueueEnabled bool `toml:"memory_queue_enabled,omitempty"`
GracefulStopTimeout duration.Duration `toml:"graceful_stop_timeout,omitempty"`
RepositoriesCleanup RepositoriesCleanup `toml:"repositories_cleanup,omitempty"`
+ Yamux Yamux `toml:"yamux,omitempty"`
+}
+
+// Yamux contains Yamux related configuration values.
+type Yamux struct {
+ // MaximumStreamWindowSizeBytes sets the maximum window size in bytes used for yamux streams.
+ // Higher value can increase throughput at the cost of more memory usage.
+ MaximumStreamWindowSizeBytes uint32 `toml:"maximum_stream_window_size_bytes,omitempty"`
+ // AcceptBacklog sets the maximum number of stream openings in-flight before further openings
+ // block.
+ AcceptBacklog int `toml:"accept_backlog,omitempty"`
+}
+
+func (cfg Yamux) validate() error {
+ if cfg.MaximumStreamWindowSizeBytes < 262144 {
+ // Yamux requires the stream window size to be at minimum 256KiB.
+ return fmt.Errorf("yamux.maximum_stream_window_size_bytes must be at least 262144 but it was %d", cfg.MaximumStreamWindowSizeBytes)
+ }
+
+ if cfg.AcceptBacklog < 1 {
+ // Yamux requires accept backlog to be at least 1
+ return fmt.Errorf("yamux.accept_backlog must be at least 1 but it was %d", cfg.AcceptBacklog)
+ }
+
+ return nil
+}
+
+// DefaultYamuxConfig returns the default Yamux configuration values.
+func DefaultYamuxConfig() Yamux {
+ defaultCfg := yamux.DefaultConfig()
+ return Yamux{
+ MaximumStreamWindowSizeBytes: defaultCfg.MaxStreamWindowSize,
+ AcceptBacklog: defaultCfg.AcceptBacklog,
+ }
}
// VirtualStorage represents a set of nodes for a storage
@@ -185,6 +220,7 @@ func FromFile(filePath string) (Config, error) {
// Sets the default Failover, to be overwritten when deserializing the TOML
Failover: Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository},
RepositoriesCleanup: DefaultRepositoriesCleanup(),
+ Yamux: DefaultYamuxConfig(),
}
if err := toml.Unmarshal(b, conf); err != nil {
return Config{}, err
@@ -279,6 +315,10 @@ func (c *Config) Validate() error {
}
}
+ if err := c.Yamux.validate(); err != nil {
+ return err
+ }
+
return nil
}
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index cc89c919b..68e24b6a8 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -207,6 +207,20 @@ func TestConfigValidation(t *testing.T) {
},
errMsg: `repositories_cleanup.run_interval is less then 1m0s, which could lead to a database performance problem`,
},
+ {
+ desc: "yamux.maximum_stream_window_size_bytes is too low",
+ changeConfig: func(cfg *Config) {
+ cfg.Yamux.MaximumStreamWindowSizeBytes = 16
+ },
+ errMsg: `yamux.maximum_stream_window_size_bytes must be at least 262144 but it was 16`,
+ },
+ {
+ desc: "yamux.maximum_stream_window_size_bytes is too low",
+ changeConfig: func(cfg *Config) {
+ cfg.Yamux.AcceptBacklog = 0
+ },
+ errMsg: `yamux.accept_backlog must be at least 1 but it was 0`,
+ },
}
for _, tc := range testCases {
@@ -220,6 +234,7 @@ func TestConfigValidation(t *testing.T) {
},
Failover: Failover{ElectionStrategy: ElectionStrategySQL},
RepositoriesCleanup: DefaultRepositoriesCleanup(),
+ Yamux: DefaultYamuxConfig(),
}
tc.changeConfig(&config)
@@ -332,6 +347,10 @@ func TestConfigParsing(t *testing.T) {
VerificationInterval: duration.Duration(24 * time.Hour),
DeleteInvalidRecords: true,
},
+ Yamux: Yamux{
+ MaximumStreamWindowSizeBytes: 1000,
+ AcceptBacklog: 2000,
+ },
},
},
{
@@ -358,6 +377,7 @@ func TestConfigParsing(t *testing.T) {
RepositoriesInBatch: 11,
},
BackgroundVerification: DefaultBackgroundVerificationConfig(),
+ Yamux: DefaultYamuxConfig(),
},
},
{
@@ -381,6 +401,7 @@ func TestConfigParsing(t *testing.T) {
RepositoriesInBatch: 16,
},
BackgroundVerification: DefaultBackgroundVerificationConfig(),
+ Yamux: DefaultYamuxConfig(),
},
},
{
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index f7d591eee..a3112a9f2 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -81,3 +81,7 @@ read_error_threshold_count = 100
check_interval = "1s"
run_interval = "3s"
repositories_in_batch = 10
+
+[yamux]
+maximum_stream_window_size_bytes = 1000
+accept_backlog = 2000 \ No newline at end of file
diff --git a/internal/praefect/nodes/health_manager.go b/internal/praefect/nodes/health_manager.go
index 21cdfc9fe..0743bf520 100644
--- a/internal/praefect/nodes/health_manager.go
+++ b/internal/praefect/nodes/health_manager.go
@@ -58,6 +58,9 @@ type HealthManager struct {
// NewHealthManager returns a new health manager that monitors which nodes in the cluster
// are healthy.
+//
+// If db is nil, the HealthManager checks the connection health normally but doesn't persist
+// any information about the nodes in the database.
func NewHealthManager(
log logrus.FieldLogger,
db glsql.Querier,
@@ -138,10 +141,13 @@ func (hm *HealthManager) updateHealthChecks(ctx context.Context, virtualStorages
hm.locallyHealthy.Store(locallyHealthy)
- ctx, cancel := hm.databaseTimeout(ctx)
- defer cancel()
+ if hm.db != nil {
+ // Database is nil only when an alternative set of connections is being tested behind a feature flag
+ // and we do not want to affect the consensus in the database, just the routing decisions.
+ ctx, cancel := hm.databaseTimeout(ctx)
+ defer cancel()
- if _, err := hm.db.ExecContext(ctx, `
+ if _, err := hm.db.ExecContext(ctx, `
INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
SELECT $1, shard_name, node_name, NOW(), CASE WHEN is_healthy THEN NOW() ELSE NULL END
FROM (
@@ -155,12 +161,13 @@ ON CONFLICT (praefect_name, shard_name, node_name)
last_contact_attempt_at = NOW(),
last_seen_active_at = COALESCE(EXCLUDED.last_seen_active_at, node_status.last_seen_active_at)
`,
- hm.praefectName,
- virtualStorages,
- physicalStorages,
- healthy,
- ); err != nil {
- return fmt.Errorf("update checks: %w", err)
+ hm.praefectName,
+ virtualStorages,
+ physicalStorages,
+ healthy,
+ ); err != nil {
+ return fmt.Errorf("update checks: %w", err)
+ }
}
if hm.firstUpdate {
diff --git a/internal/praefect/nodes/health_manager_test.go b/internal/praefect/nodes/health_manager_test.go
index f53297854..15b191cda 100644
--- a/internal/praefect/nodes/health_manager_test.go
+++ b/internal/praefect/nodes/health_manager_test.go
@@ -49,6 +49,31 @@ func getHealthConsensus(t *testing.T, ctx context.Context, db glsql.Querier) map
return consensus
}
+func TestHealthManagerWithoutDatabase(t *testing.T) {
+ t.Parallel()
+
+ hm := NewHealthManager(testhelper.NewDiscardingLogger(t), nil, "ignored", HealthClients{
+ "virtual-storage": {
+ "healthy-storage": mockHealthClient{
+ CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+ return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
+ },
+ },
+ "unhealthy-storage": mockHealthClient{
+ CheckFunc: func(context.Context, *grpc_health_v1.HealthCheckRequest, ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+ return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
+ },
+ },
+ },
+ })
+ hm.handleError = func(err error) error { return err }
+
+ runCtx, cancelRun := context.WithCancel(testhelper.Context(t))
+ require.Equal(t, context.Canceled, hm.Run(runCtx, helper.NewCountTicker(1, cancelRun)))
+ require.Equal(t, map[string][]string{"virtual-storage": {"healthy-storage"}}, hm.HealthyNodes())
+ <-hm.Updated()
+}
+
func TestHealthManager(t *testing.T) {
t.Parallel()
ctx := testhelper.Context(t)
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index 139bd62f9..025a76638 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -5,6 +5,7 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
"google.golang.org/grpc"
)
@@ -144,3 +145,51 @@ type Router interface {
// never be replicated.
RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error)
}
+
+type featureFlaggedRouter struct {
+ flag featureflag.FeatureFlag
+ enabled Router
+ disabled Router
+}
+
+// NewFeatureFlaggedRouter returns a Router that decides switches router implementation depending
+// on a feature flag's status.
+func NewFeatureFlaggedRouter(flag featureflag.FeatureFlag, enabledRouter, disabledRouter Router) Router {
+ return featureFlaggedRouter{
+ flag: flag,
+ enabled: enabledRouter,
+ disabled: disabledRouter,
+ }
+}
+
+func (r featureFlaggedRouter) router(ctx context.Context) Router {
+ if r.flag.IsEnabled(ctx) {
+ return r.enabled
+ }
+
+ return r.disabled
+}
+
+func (r featureFlaggedRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) {
+ return r.router(ctx).RouteStorageAccessor(ctx, virtualStorage)
+}
+
+func (r featureFlaggedRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) {
+ return r.router(ctx).RouteStorageMutator(ctx, virtualStorage)
+}
+
+func (r featureFlaggedRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error) {
+ return r.router(ctx).RouteRepositoryAccessor(ctx, virtualStorage, relativePath, forcePrimary)
+}
+
+func (r featureFlaggedRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
+ return r.router(ctx).RouteRepositoryMutator(ctx, virtualStorage, relativePath, additionalRepoRelativePath)
+}
+
+func (r featureFlaggedRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error) {
+ return r.router(ctx).RouteRepositoryCreation(ctx, virtualStorage, relativePath, additionalRepoRelativePath)
+}
+
+func (r featureFlaggedRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error) {
+ return r.router(ctx).RouteRepositoryMaintenance(ctx, virtualStorage, relativePath)
+}
diff --git a/internal/praefect/router_test.go b/internal/praefect/router_test.go
new file mode 100644
index 000000000..c3c507aa1
--- /dev/null
+++ b/internal/praefect/router_test.go
@@ -0,0 +1,150 @@
+package praefect
+
+import (
+ "context"
+ "errors"
+ "math/rand"
+ "net"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/metadata/featureflag"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestNewFeatureFlaggedRouter(t *testing.T) {
+ ctx := testhelper.Context(t)
+ flag := featureflag.FeatureFlag{Name: "test_flag"}
+
+ enabledConn, disabledConn, enabledRouter, disabledRouter := setupFeatureFlaggedRouters(t, ctx)
+ router := NewFeatureFlaggedRouter(flag, enabledRouter, disabledRouter)
+
+ for _, tc := range []struct {
+ desc string
+ flagEnabled bool
+ expectedConn *grpc.ClientConn
+ }{
+ {
+ desc: "flag disabled",
+ flagEnabled: false,
+ expectedConn: disabledConn,
+ },
+ {
+ desc: "flag enabled",
+ flagEnabled: true,
+ expectedConn: enabledConn,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx = featureflag.ContextWithFeatureFlag(ctx, flag, tc.flagEnabled)
+
+ t.Run("RouteStorageAccessor", func(t *testing.T) {
+ route, err := router.RouteStorageAccessor(ctx, "virtual-storage")
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedConn.Target(), route.Connection.Target())
+ })
+
+ t.Run("RouteStorageMutator", func(t *testing.T) {
+ _, err := router.RouteStorageMutator(ctx, "virtual-storage")
+ require.Equal(t, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter"), err)
+ })
+
+ t.Run("RouteRepositoryAccessor", func(t *testing.T) {
+ route, err := router.RouteRepositoryAccessor(ctx, "virtual-storage", "relative-path", false)
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedConn.Target(), route.Node.Connection.Target())
+ })
+
+ t.Run("RouteRepositoryMutator", func(t *testing.T) {
+ route, err := router.RouteRepositoryMutator(ctx, "virtual-storage", "relative-path", "")
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedConn.Target(), route.Primary.Connection.Target())
+ })
+
+ t.Run("RouteRepositoryCreation", func(t *testing.T) {
+ route, err := router.RouteRepositoryCreation(ctx, "virtual-storage", "relative-path-new", "")
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedConn.Target(), route.Primary.Connection.Target())
+ })
+
+ t.Run("RouteRepositoryMaintenance", func(t *testing.T) {
+ route, err := router.RouteRepositoryMaintenance(ctx, "virtual-storage", "relative-path")
+ require.NoError(t, err)
+ require.Equal(t, tc.expectedConn.Target(), route.Nodes[0].Connection.Target())
+ })
+ })
+ }
+}
+
+func setupFeatureFlaggedRouters(t *testing.T, ctx context.Context) (*grpc.ClientConn, *grpc.ClientConn, *PerRepositoryRouter, *PerRepositoryRouter) {
+ db := testdb.New(t)
+ tx := db.Begin(t)
+ t.Cleanup(func() { tx.Rollback(t) })
+ configuredStorages := map[string][]string{"virtual-storage": {"storage"}}
+ testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": configuredStorages})
+
+ repositoryStore := datastore.NewPostgresRepositoryStore(tx, configuredStorages)
+ require.NoError(t, repositoryStore.CreateRepository(
+ ctx,
+ 1,
+ "virtual-storage",
+ "relative-path",
+ "replica-path",
+ "storage",
+ nil,
+ nil,
+ true,
+ true,
+ ))
+
+ // We don't need real connections, just the target in order to assert that we got the right connection back
+ // from the router. We mock out the actual connection with a net.Pipe.
+ enabledConn, err := grpc.Dial("enabledConn", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
+ _, pw := net.Pipe()
+ return pw, nil
+ }), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ // Close the connection immediately to clean up as we just care about the value of .Target() in the test.
+ testhelper.MustClose(t, enabledConn)
+
+ disabledConn, err := grpc.Dial("disabledConn", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
+ _, pw := net.Pipe()
+ return pw, nil
+ }), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ testhelper.MustClose(t, disabledConn)
+
+ elector := nodes.NewPerRepositoryElector(tx)
+ healthChecker := StaticHealthChecker{"virtual-storage": {"storage"}}
+ assignmentStore := datastore.NewAssignmentStore(tx, configuredStorages)
+ randomGenerator := rand.New(rand.NewSource(1))
+
+ enabledRouter := NewPerRepositoryRouter(
+ Connections{"virtual-storage": {"storage": enabledConn}},
+ elector,
+ healthChecker,
+ randomGenerator,
+ repositoryStore,
+ assignmentStore,
+ repositoryStore,
+ nil,
+ )
+
+ disabledRouter := NewPerRepositoryRouter(
+ Connections{"virtual-storage": {"storage": disabledConn}},
+ elector,
+ healthChecker,
+ randomGenerator,
+ repositoryStore,
+ assignmentStore,
+ repositoryStore,
+ nil,
+ )
+
+ return enabledConn, disabledConn, enabledRouter, disabledRouter
+}
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index 20b6cbcdb..c9015ee93 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -201,8 +201,10 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context {
// NodeErrorCancelsVoter affect many tests as it changes Praefect coordinator transaction logic.
// Randomly enable the flag to exercise both paths to some extent.
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.NodeErrorCancelsVoter, rnd.Int()%2 == 0)
-
ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV238, rnd.Int()%2 == 0)
+ // PraefectUseYamuxConfigurationForGitaly gets tested in Praefect when routing RPCs and thus it affects many tests.
+ // Let's randomly select which connection we use so both sets of connections get tested somewhat.
+ ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectUseYamuxConfigurationForGitaly, rnd.Int()%2 == 0)
for _, opt := range opts {
ctx = opt(ctx)