gitlab.com/gitlab-org/gitaly.git
author     Zeger-Jan van de Weg <git@zjvandeweg.nl>  2020-09-17 16:56:27 +0300
committer  Zeger-Jan van de Weg <git@zjvandeweg.nl>  2020-09-17 16:56:27 +0300
commit     0caad2deff2f830b09176de72fe029f62e6f4da1 (patch)
tree       21024e23f7fc856dcb00ac7f7b01a1100b78ec89
parent     974ed277520086b2a9c830fab11a3076234f13a9 (diff)
parent     9251df2df09fec0cd3600faa6178156fe9a84fe2 (diff)
Merge branch 'smh-request-router' into 'master'

Extract routing logic into a separate component

See merge request gitlab-org/gitaly!2552
-rw-r--r--  cmd/praefect/main.go                        2
-rw-r--r--  internal/praefect/auth_test.go              2
-rw-r--r--  internal/praefect/coordinator.go          122
-rw-r--r--  internal/praefect/coordinator_test.go      66
-rw-r--r--  internal/praefect/helper_test.go            2
-rw-r--r--  internal/praefect/replicator_test.go        9
-rw-r--r--  internal/praefect/router.go               145
-rw-r--r--  internal/praefect/server_factory_test.go    9
-rw-r--r--  internal/praefect/server_test.go           11
9 files changed, 267 insertions(+), 101 deletions(-)
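For callers, the visible change is NewCoordinator's signature: it now takes a Router instead of a nodes.Manager. A minimal sketch of the new wiring, using the names from the cmd/praefect/main.go hunk below (surrounding setup elided):

```go
// Wrap the existing node manager in the router abstraction and pass the
// router to the coordinator; all other arguments stay the same.
router := praefect.NewNodeManagerRouter(nodeManager, rs)
coordinator := praefect.NewCoordinator(
	queue,
	rs,
	router,
	transactionManager,
	conf,
	protoregistry.GitalyProtoPreregistered,
)
```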
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 2ae6a3329..f5a62f389 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -272,7 +272,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
coordinator = praefect.NewCoordinator(
queue,
rs,
- nodeManager,
+ praefect.NewNodeManagerRouter(nodeManager, rs),
transactionManager,
conf,
protoregistry.GitalyProtoPreregistered,
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 055a1025d..27d19de5d 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -177,7 +177,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
registry, err := protoregistry.New(fd)
require.NoError(t, err)
- coordinator := NewCoordinator(queue, rs, nodeMgr, txMgr, conf, registry)
+ coordinator := NewCoordinator(queue, rs, NewNodeManagerRouter(nodeMgr, rs), txMgr, conf, registry)
srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 6a193f514..4d13dd54e 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -75,7 +75,7 @@ type grpcCall struct {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- nodeMgr nodes.Manager
+ router Router
txMgr *transactions.Manager
queue datastore.ReplicationEventQueue
rs datastore.RepositoryStore
@@ -88,7 +88,7 @@ type Coordinator struct {
func NewCoordinator(
queue datastore.ReplicationEventQueue,
rs datastore.RepositoryStore,
- nodeMgr nodes.Manager,
+ router Router,
txMgr *transactions.Manager,
conf config.Config,
r *protoregistry.Registry,
@@ -104,7 +104,7 @@ func NewCoordinator(
queue: queue,
rs: rs,
registry: r,
- nodeMgr: nodeMgr,
+ router: router,
txMgr: txMgr,
conf: conf,
votersMetric: prometheus.NewHistogramVec(
@@ -173,22 +173,21 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal
repoPath := targetRepo.GetRelativePath()
virtualStorage := targetRepo.StorageName
- node, err := c.nodeMgr.GetSyncedNode(ctx, virtualStorage, repoPath)
+ node, err := c.router.RouteRepositoryAccessor(ctx, virtualStorage, repoPath)
if err != nil {
- return nil, fmt.Errorf("accessor call: get synced: %w", err)
+ return nil, fmt.Errorf("accessor call: route repository accessor: %w", err)
}
- storage := node.GetStorage()
- b, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, storage)
+ b, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, node.Storage)
if err != nil {
return nil, fmt.Errorf("accessor call: rewrite storage: %w", err)
}
- metrics.ReadDistribution.WithLabelValues(virtualStorage, storage).Inc()
+ metrics.ReadDistribution.WithLabelValues(virtualStorage, node.Storage).Inc()
return proxy.NewStreamParameters(proxy.Destination{
Ctx: helper.IncomingToOutgoing(ctx),
- Conn: node.GetConnection(),
+ Conn: node.Connection,
Msg: b,
}, nil, nil, nil), nil
}
@@ -220,7 +219,7 @@ func shouldUseTransaction(ctx context.Context, method string) bool {
return true
}
-func (c *Coordinator) registerTransaction(ctx context.Context, primary nodes.Node, secondaries []nodes.Node) (*transactions.Transaction, transactions.CancelFunc, error) {
+func (c *Coordinator) registerTransaction(ctx context.Context, primary Node, secondaries []Node) (*transactions.Transaction, transactions.CancelFunc, error) {
var voters []transactions.Voter
var threshold uint
@@ -230,14 +229,14 @@ func (c *Coordinator) registerTransaction(ctx context.Context, primary nodes.Nod
// not matter.
voters = append(voters, transactions.Voter{
- Name: primary.GetStorage(),
+ Name: primary.Storage,
Votes: 1,
})
threshold = 1
for _, node := range secondaries {
voters = append(voters, transactions.Voter{
- Name: node.GetStorage(),
+ Name: node.Storage,
Votes: 0,
})
}
@@ -250,14 +249,14 @@ func (c *Coordinator) registerTransaction(ctx context.Context, primary nodes.Nod
// In order to ensure that no quorum can be reached without the primary, its number
// of votes needs to exceed the number of secondaries.
voters = append(voters, transactions.Voter{
- Name: primary.GetStorage(),
+ Name: primary.Storage,
Votes: secondaryLen + 1,
})
threshold = secondaryLen + 1
for _, secondary := range secondaries {
voters = append(voters, transactions.Voter{
- Name: secondary.GetStorage(),
+ Name: secondary.Storage,
Votes: 1,
})
}
@@ -277,18 +276,16 @@ func (c *Coordinator) registerTransaction(ctx context.Context, primary nodes.Nod
func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) {
virtualStorage := targetRepo.StorageName
- shard, err := c.nodeMgr.GetShard(virtualStorage)
+ route, err := c.router.RouteRepositoryMutator(ctx, virtualStorage, call.targetRepo.RelativePath)
if err != nil {
- return nil, fmt.Errorf("mutator call: get shard: %w", err)
- }
+ if errors.Is(err, ErrRepositoryReadOnly) {
+ return nil, err
+ }
- if latest, err := c.rs.IsLatestGeneration(ctx, virtualStorage, call.targetRepo.RelativePath, shard.Primary.GetStorage()); err != nil {
- return nil, fmt.Errorf("check generation: %w", err)
- } else if !latest {
- return nil, ErrRepositoryReadOnly
+ return nil, fmt.Errorf("mutator call: route repository mutator: %w", err)
}
- primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, shard.Primary.GetStorage())
+ primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage)
if err != nil {
return nil, fmt.Errorf("mutator call: rewrite storage: %w", err)
}
@@ -302,62 +299,46 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
primaryDest := proxy.Destination{
Ctx: helper.IncomingToOutgoing(ctx),
- Conn: shard.Primary.GetConnection(),
+ Conn: route.Primary.Connection,
Msg: primaryMessage,
}
var secondaryDests []proxy.Destination
if shouldUseTransaction(ctx, call.fullMethodName) {
- // Only healthy secondaries which are consistent with the primary are allowed to take
- // part in the transaction. Unhealthy nodes would block the transaction until they come back.
- // Inconsistent nodes will anyway need repair so including them doesn't make sense. They
- // also might vote to abort which might unnecessarily fail the transaction.
- consistentSecondaries, err := c.rs.GetConsistentSecondaries(ctx, virtualStorage, targetRepo.RelativePath, shard.Primary.GetStorage())
- if err != nil {
- return nil, err
- }
+ c.votersMetric.WithLabelValues(virtualStorage).Observe(float64(1 + len(route.Secondaries)))
- participatingSecondaries := make([]nodes.Node, 0, len(consistentSecondaries))
- for _, secondary := range shard.GetHealthySecondaries() {
- if _, ok := consistentSecondaries[secondary.GetStorage()]; ok {
- participatingSecondaries = append(participatingSecondaries, secondary)
- }
- }
-
- c.votersMetric.WithLabelValues(virtualStorage).Observe(float64(1 + len(participatingSecondaries)))
-
- transaction, transactionCleanup, err := c.registerTransaction(ctx, shard.Primary, participatingSecondaries)
+ transaction, transactionCleanup, err := c.registerTransaction(ctx, route.Primary, route.Secondaries)
if err != nil {
return nil, err
}
- injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), shard.Primary.GetStorage(), true)
+ injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true)
if err != nil {
return nil, err
}
primaryDest.Ctx = helper.IncomingToOutgoing(injectedCtx)
- for _, secondary := range participatingSecondaries {
- secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.GetStorage())
+ for _, secondary := range route.Secondaries {
+ secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.Storage)
if err != nil {
return nil, err
}
- injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.GetStorage(), false)
+ injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false)
if err != nil {
return nil, err
}
secondaryDests = append(secondaryDests, proxy.Destination{
Ctx: helper.IncomingToOutgoing(injectedCtx),
- Conn: secondary.GetConnection(),
+ Conn: secondary.Connection,
Msg: secondaryMsg,
})
}
finalizers = append(finalizers,
- transactionCleanup, c.createTransactionFinalizer(ctx, transaction, shard, virtualStorage, call.targetRepo, change, params),
+ transactionCleanup, c.createTransactionFinalizer(ctx, transaction, route, virtualStorage, call.targetRepo, change, params),
)
} else {
finalizers = append(finalizers,
@@ -365,9 +346,9 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
ctx,
virtualStorage,
call.targetRepo,
- shard.Primary.GetStorage(),
+ route.Primary.Storage,
nil,
- nodesToStorages(shard.Secondaries),
+ append(nodesToStorages(route.Secondaries), route.ReplicationTargets...),
change,
params,
))
@@ -473,17 +454,15 @@ func (c *Coordinator) directStorageScopedMessage(ctx context.Context, mi protore
}
func (c *Coordinator) accessorStorageStreamParameters(ctx context.Context, mi protoregistry.MethodInfo, msg proto.Message, virtualStorage string) (*proxy.StreamParameters, error) {
- shard, err := c.nodeMgr.GetShard(virtualStorage)
+ node, err := c.router.RouteStorageAccessor(ctx, virtualStorage)
if err != nil {
if errors.Is(err, nodes.ErrVirtualStorageNotExist) {
return nil, helper.ErrInvalidArgument(err)
}
- return nil, helper.ErrInternalf("accessor storage scoped: get shard %q: %w", virtualStorage, err)
+ return nil, helper.ErrInternalf("accessor storage scoped: route storage accessor %q: %w", virtualStorage, err)
}
- primaryStorage := shard.Primary.GetStorage()
-
- b, err := rewrittenStorageMessage(mi, msg, primaryStorage)
+ b, err := rewrittenStorageMessage(mi, msg, node.Storage)
if err != nil {
return nil, helper.ErrInvalidArgument(fmt.Errorf("accessor storage scoped: %w", err))
}
@@ -493,7 +472,7 @@ func (c *Coordinator) accessorStorageStreamParameters(ctx context.Context, mi pr
// https://gitlab.com/gitlab-org/gitaly/-/issues/2972
primaryDest := proxy.Destination{
Ctx: ctx,
- Conn: shard.Primary.GetConnection(),
+ Conn: node.Connection,
Msg: b,
}
@@ -501,7 +480,7 @@ func (c *Coordinator) accessorStorageStreamParameters(ctx context.Context, mi pr
}
func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi protoregistry.MethodInfo, msg proto.Message, virtualStorage string) (*proxy.StreamParameters, error) {
- shard, err := c.nodeMgr.GetShard(virtualStorage)
+ route, err := c.router.RouteStorageMutator(ctx, virtualStorage)
if err != nil {
if errors.Is(err, nodes.ErrVirtualStorageNotExist) {
return nil, helper.ErrInvalidArgument(err)
@@ -509,27 +488,24 @@ func (c *Coordinator) mutatorStorageStreamParameters(ctx context.Context, mi pro
return nil, helper.ErrInternalf("mutator storage scoped: get shard %q: %w", virtualStorage, err)
}
- primaryStorage := shard.Primary.GetStorage()
-
- b, err := rewrittenStorageMessage(mi, msg, primaryStorage)
+ b, err := rewrittenStorageMessage(mi, msg, route.Primary.Storage)
if err != nil {
return nil, helper.ErrInvalidArgument(fmt.Errorf("mutator storage scoped: %w", err))
}
primaryDest := proxy.Destination{
Ctx: ctx,
- Conn: shard.Primary.GetConnection(),
+ Conn: route.Primary.Connection,
Msg: b,
}
- secondaries := shard.GetHealthySecondaries()
- secondaryDests := make([]proxy.Destination, len(secondaries))
- for i, secondary := range secondaries {
- b, err := rewrittenStorageMessage(mi, msg, secondary.GetStorage())
+ secondaryDests := make([]proxy.Destination, len(route.Secondaries))
+ for i, secondary := range route.Secondaries {
+ b, err := rewrittenStorageMessage(mi, msg, secondary.Storage)
if err != nil {
return nil, helper.ErrInvalidArgument(fmt.Errorf("mutator storage scoped: %w", err))
}
- secondaryDests[i] = proxy.Destination{Ctx: ctx, Conn: secondary.GetConnection(), Msg: b}
+ secondaryDests[i] = proxy.Destination{Ctx: ctx, Conn: secondary.Connection, Msg: b}
}
return proxy.NewStreamParameters(primaryDest, secondaryDests, func() error { return nil }, nil), nil
@@ -581,7 +557,7 @@ func protoMessage(mi protoregistry.MethodInfo, frame []byte) (proto.Message, err
func (c *Coordinator) createTransactionFinalizer(
ctx context.Context,
transaction *transactions.Transaction,
- shard nodes.Shard,
+ route RepositoryMutatorRoute,
virtualStorage string,
targetRepo *gitalypb.Repository,
change datastore.ChangeType,
@@ -596,24 +572,24 @@ func (c *Coordinator) createTransactionFinalizer(
if transaction.CountSubtransactions() == 0 {
secondaries := make([]string, 0, len(successByNode))
for secondary := range successByNode {
- if secondary == shard.Primary.GetStorage() {
+ if secondary == route.Primary.Storage {
continue
}
secondaries = append(secondaries, secondary)
}
return c.newRequestFinalizer(
- ctx, virtualStorage, targetRepo, shard.Primary.GetStorage(),
+ ctx, virtualStorage, targetRepo, route.Primary.Storage,
nil, secondaries, change, params)()
}
// If the primary node failed the transaction, then
// there's no sense in trying to replicate from primary
// to secondaries.
- if !successByNode[shard.Primary.GetStorage()] {
+ if !successByNode[route.Primary.Storage] {
return fmt.Errorf("transaction: primary failed vote")
}
- delete(successByNode, shard.Primary.GetStorage())
+ delete(successByNode, route.Primary.Storage)
updatedSecondaries := make([]string, 0, len(successByNode))
var outdatedSecondaries []string
@@ -628,15 +604,15 @@ func (c *Coordinator) createTransactionFinalizer(
}
return c.newRequestFinalizer(
- ctx, virtualStorage, targetRepo, shard.Primary.GetStorage(),
+ ctx, virtualStorage, targetRepo, route.Primary.Storage,
updatedSecondaries, outdatedSecondaries, change, params)()
}
}
-func nodesToStorages(nodes []nodes.Node) []string {
+func nodesToStorages(nodes []Node) []string {
storages := make([]string, len(nodes))
for i, n := range nodes {
- storages[i] = n.GetStorage()
+ storages[i] = n.Storage
}
return storages
}
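To make the voting weights in the strong-quorum branch of registerTransaction concrete, here is a small self-contained sketch; the node counts are illustrative, but the arithmetic follows the code above:

```go
package main

import "fmt"

func main() {
	// With one primary and two secondaries, registerTransaction assigns the
	// primary secondaryLen+1 = 3 votes and sets the threshold to 3: the
	// primary alone reaches quorum, while both secondaries together hold
	// only 2 votes and can never reach it without the primary.
	secondaryLen := uint(2)
	primaryVotes := secondaryLen + 1 // 3
	threshold := secondaryLen + 1    // 3

	fmt.Println(primaryVotes >= threshold) // true: the primary alone meets the threshold
	fmt.Println(secondaryLen >= threshold) // false: secondaries alone cannot reach quorum
}
```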
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index a229a69a8..61354f6df 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -75,7 +75,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- rs := datastore.NewMemoryRepositoryStore(nil)
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, 1))
if tc.readOnly {
require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, 0))
@@ -84,14 +84,14 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
coordinator := NewCoordinator(
datastore.NewMemoryReplicationEventQueue(conf),
rs,
- &nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) {
+ NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) {
require.Equal(t, virtualStorage, vs)
return nodes.Shard{
Primary: &nodes.MockNode{GetStorageMethod: func() string {
return storage
}},
}, nil
- }},
+ }}, rs),
transactions.NewManager(conf),
conf,
protoregistry.GitalyProtoPreregistered,
@@ -156,11 +156,12 @@ func TestStreamDirectorMutator(t *testing.T) {
nodeMgr.Start(0, time.Hour)
txMgr := transactions.NewManager(conf)
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
coordinator := NewCoordinator(
queueInterceptor,
- datastore.NewMemoryRepositoryStore(conf.StorageNames()),
- nodeMgr,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
@@ -383,7 +384,14 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
require.NoError(t, rs.SetGeneration(ctx, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, n.generation))
}
- coordinator := NewCoordinator(queueInterceptor, rs, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(
+ queueInterceptor,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
fullMethod := "/gitaly.SmartHTTPService/PostReceivePack"
@@ -509,11 +517,12 @@ func TestStreamDirectorAccessor(t *testing.T) {
nodeMgr.Start(0, time.Minute)
txMgr := transactions.NewManager(conf)
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
coordinator := NewCoordinator(
queue,
- datastore.NewMemoryRepositoryStore(conf.StorageNames()),
- nodeMgr,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
@@ -602,7 +611,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
coordinator := NewCoordinator(
queue,
repoStore,
- nodeMgr,
+ NewNodeManagerRouter(nodeMgr, repoStore),
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
@@ -797,11 +806,12 @@ func TestAbsentCorrelationID(t *testing.T) {
nodeMgr.Start(0, time.Hour)
txMgr := transactions.NewManager(conf)
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
coordinator := NewCoordinator(
queueInterceptor,
- datastore.NewMemoryRepositoryStore(conf.StorageNames()),
- nodeMgr,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
@@ -921,7 +931,14 @@ func TestStreamDirectorStorageScope(t *testing.T) {
nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Second)
- coordinator := NewCoordinator(nil, rs, nodeMgr, nil, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(
+ nil,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ nil,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
ctx, cancel := testhelper.Context()
defer cancel()
@@ -978,10 +995,12 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
return nodes.Shard{}, assert.AnError
},
}
+
+ rs := datastore.NewMemoryRepositoryStore(nil)
coordinator := NewCoordinator(
nil,
- datastore.NewMemoryRepositoryStore(nil),
- mgr,
+ rs,
+ NewNodeManagerRouter(mgr, rs),
nil,
config.Config{},
protoregistry.GitalyProtoPreregistered,
@@ -1005,10 +1024,12 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
return nodes.Shard{}, nodes.ErrVirtualStorageNotExist
},
}
+
+ rs := datastore.NewMemoryRepositoryStore(nil)
coordinator := NewCoordinator(
nil,
- datastore.NewMemoryRepositoryStore(nil),
- mgr,
+ rs,
+ NewNodeManagerRouter(mgr, rs),
nil,
config.Config{},
protoregistry.GitalyProtoPreregistered,
@@ -1033,10 +1054,12 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
return nodes.Shard{}, nodes.ErrPrimaryNotHealthy
},
}
+
+ rs := datastore.NewMemoryRepositoryStore(nil)
coordinator := NewCoordinator(
nil,
- datastore.NewMemoryRepositoryStore(nil),
- mgr,
+ rs,
+ NewNodeManagerRouter(mgr, rs),
nil,
config.Config{},
protoregistry.GitalyProtoPreregistered,
@@ -1053,7 +1076,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
result, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, codes.Internal, result.Code())
- require.Equal(t, `accessor storage scoped: get shard "fake": primary is not healthy`, result.Message())
+ require.Equal(t, `accessor storage scoped: route storage accessor "fake": primary is not healthy`, result.Message())
})
t.Run("mutator", func(t *testing.T) {
@@ -1063,10 +1086,11 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
return nodes.Shard{}, nodes.ErrPrimaryNotHealthy
},
}
+ rs := datastore.NewMemoryRepositoryStore(nil)
coordinator := NewCoordinator(
nil,
- datastore.NewMemoryRepositoryStore(nil),
- mgr,
+ rs,
+ NewNodeManagerRouter(mgr, rs),
nil,
config.Config{},
protoregistry.GitalyProtoPreregistered,
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 3ca5c0221..10f3aec4a 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -220,7 +220,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
coordinator := NewCoordinator(
opt.withQueue,
rs,
- opt.withNodeMgr,
+ NewNodeManagerRouter(opt.withNodeMgr, rs),
opt.withTxMgr,
conf,
opt.withAnnotations,
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 3a943d5b8..1602489a1 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -321,7 +321,14 @@ func TestPropagateReplicationJob(t *testing.T) {
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
- coordinator := NewCoordinator(queue, rs, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(
+ queue,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, rs, nodeMgr)
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
new file mode 100644
index 000000000..29513d67a
--- /dev/null
+++ b/internal/praefect/router.go
@@ -0,0 +1,145 @@
+package praefect
+
+import (
+ "context"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "google.golang.org/grpc"
+)
+
+// Node is a storage node in a virtual storage.
+type Node struct {
+ // Storage is the name of the storage node.
+ Storage string
+ // Connection is a gRPC connection to the storage node.
+ Connection *grpc.ClientConn
+}
+
+// StorageMutatorRoute describes how to route a storage scoped mutator call.
+type StorageMutatorRoute struct {
+ // Primary is the primary node of the routing decision.
+ Primary Node
+ // Secondaries are the secondary nodes of the routing decision.
+ Secondaries []Node
+}
+
+// RepositoryMutatorRoute describes how to route a repository scoped mutator call.
+type RepositoryMutatorRoute struct {
+ // Primary is the primary node of the transaction.
+ Primary Node
+ // Secondaries are the secondary nodes participating in the transaction.
+ Secondaries []Node
+ // ReplicationTargets are additional nodes that do not participate in a transaction
+ // but need the changes replicated.
+ ReplicationTargets []string
+}
+
+// Router decides which nodes to direct accessor and mutator RPCs to.
+type Router interface {
+ // RouteStorageAccessor returns the node which should serve the storage accessor request.
+ RouteStorageAccessor(ctx context.Context, virtualStorage string) (Node, error)
+ // RouteStorageMutator returns the primary and secondaries that should handle the storage
+ // mutator request.
+ RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)
+ // RouteRepositoryAccessor returns the node that should serve the repository accessor request.
+ RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (Node, error)
+ // RouteRepositoryMutator returns the primary and secondaries that should handle the repository mutator request.
+ // Additionally, it returns the nodes the change should be replicated to.
+ RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error)
+}
+
+type nodeManagerRouter struct {
+ mgr nodes.Manager
+ rs datastore.RepositoryStore
+}
+
+func toNode(node nodes.Node) Node {
+ return Node{
+ Storage: node.GetStorage(),
+ Connection: node.GetConnection(),
+ }
+}
+
+func toNodes(nodes []nodes.Node) []Node {
+ out := make([]Node, len(nodes))
+ for i := range nodes {
+ out[i] = toNode(nodes[i])
+ }
+ return out
+}
+
+// NewNodeManagerRouter returns a router that uses the NodeManager to make routing decisions.
+func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router {
+ return &nodeManagerRouter{mgr: mgr, rs: rs}
+}
+
+func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (Node, error) {
+ node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath)
+ if err != nil {
+ return Node{}, fmt.Errorf("get synced node: %w", err)
+ }
+
+ return toNode(node), nil
+}
+
+func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (Node, error) {
+ shard, err := r.mgr.GetShard(virtualStorage)
+ if err != nil {
+ return Node{}, err
+ }
+
+ return toNode(shard.Primary), nil
+}
+
+func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) {
+ shard, err := r.mgr.GetShard(virtualStorage)
+ if err != nil {
+ return StorageMutatorRoute{}, err
+ }
+
+ return StorageMutatorRoute{
+ Primary: toNode(shard.Primary),
+ Secondaries: toNodes(shard.GetHealthySecondaries()),
+ }, nil
+}
+
+func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
+ shard, err := r.mgr.GetShard(virtualStorage)
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err)
+ }
+
+ if latest, err := r.rs.IsLatestGeneration(ctx, virtualStorage, relativePath, shard.Primary.GetStorage()); err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("check generation: %w", err)
+ } else if !latest {
+ return RepositoryMutatorRoute{}, ErrRepositoryReadOnly
+ }
+
+ // Only healthy secondaries which are consistent with the primary are allowed to take
+ // part in the transaction. Unhealthy nodes would block the transaction until they come back.
+ // Inconsistent nodes will anyway need repair so including them doesn't make sense. They
+ // also might vote to abort which might unnecessarily fail the transaction.
+ consistentSecondaries, err := r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, shard.Primary.GetStorage())
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("consistent secondaries: %w", err)
+ }
+
+ var replicationTargets []string
+ participatingSecondaries := make([]nodes.Node, 0, len(consistentSecondaries))
+ for _, secondary := range shard.Secondaries {
+ if _, ok := consistentSecondaries[secondary.GetStorage()]; ok && secondary.IsHealthy() {
+ participatingSecondaries = append(participatingSecondaries, secondary)
+ continue
+ }
+
+ replicationTargets = append(replicationTargets, secondary.GetStorage())
+ }
+
+ return RepositoryMutatorRoute{
+ Primary: toNode(shard.Primary),
+ Secondaries: toNodes(participatingSecondaries),
+ ReplicationTargets: replicationTargets,
+ }, nil
+}
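Because Router is an interface, routing strategies other than the node-manager-backed one can be swapped in. The following is a hypothetical sketch, not part of this MR: a fixed single-node router such as a test might use. Stand-in types are declared locally so the snippet compiles on its own; a real implementation would use the praefect types (whose Node also carries a *grpc.ClientConn) and satisfy praefect.Router directly.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the types router.go introduces above.
type Node struct{ Storage string }

type StorageMutatorRoute struct {
	Primary     Node
	Secondaries []Node
}

type RepositoryMutatorRoute struct {
	Primary            Node
	Secondaries        []Node
	ReplicationTargets []string
}

// staticRouter always routes to one fixed node: no secondaries take part in
// transactions and nothing needs replication.
type staticRouter struct{ node Node }

func (r staticRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (Node, error) {
	return r.node, nil
}

func (r staticRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) {
	return StorageMutatorRoute{Primary: r.node}, nil
}

func (r staticRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (Node, error) {
	return r.node, nil
}

func (r staticRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
	return RepositoryMutatorRoute{Primary: r.node}, nil
}

func main() {
	r := staticRouter{node: Node{Storage: "gitaly-1"}}
	node, err := r.RouteRepositoryAccessor(context.Background(), "default", "repo.git")
	if err != nil {
		panic(err)
	}
	fmt.Println(node.Storage) // gitaly-1
}
```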
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 6ce6e364d..158d2c3d4 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -86,7 +86,14 @@ func TestServerFactory(t *testing.T) {
txMgr := transactions.NewManager(conf)
registry := protoregistry.GitalyProtoPreregistered
rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
- coordinator := NewCoordinator(queue, rs, nodeMgr, txMgr, conf, registry)
+ coordinator := NewCoordinator(
+ queue,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ txMgr,
+ conf,
+ registry,
+ )
checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient {
t.Helper()
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 6aee6fd99..bf6de086e 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -760,7 +760,7 @@ func TestProxyWrites(t *testing.T) {
coordinator := NewCoordinator(
queue,
rs,
- nodeMgr,
+ NewNodeManagerRouter(nodeMgr, rs),
txMgr,
conf,
protoregistry.GitalyProtoPreregistered,
@@ -925,7 +925,14 @@ func TestErrorThreshold(t *testing.T) {
nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), registry, errorTracker)
require.NoError(t, err)
- coordinator := NewCoordinator(queue, rs, nodeMgr, nil, conf, registry)
+ coordinator := NewCoordinator(
+ queue,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ nil,
+ conf,
+ registry,
+ )
server := grpc.NewServer(
grpc.CustomCodec(proxy.NewCodec()),