Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-12-14 11:49:36 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-12-14 11:49:36 +0300
commitfb388ea4f85f04dc88e657fbbf33dbe571e19bd8 (patch)
treefd532e7404f745a3d0911350e54b7fa7fac254bc
parent2bba842a174b342c65d270721e3558f28a358dc3 (diff)
parent684e462c8953dcf435fe43e937e7d0f068a0ef17 (diff)
Merge branch 'ps-clean-up-params-usage' into 'master'
Clean up of the internals of the Coordinator type See merge request gitlab-org/gitaly!2920
-rw-r--r--internal/praefect/coordinator.go33
1 files changed, 12 insertions, 21 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index ad549f75e..4d5d7b6c9 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -254,20 +254,11 @@ func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) {
}
func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
- targetRepo, err := call.methodInfo.TargetRepo(call.msg)
- if err != nil {
- return nil, helper.ErrInvalidArgument(fmt.Errorf("repo scoped: %w", err))
- }
-
ctxlogrus.AddFields(ctx, logrus.Fields{
"virtual_storage": call.targetRepo.StorageName,
"relative_path": call.targetRepo.RelativePath,
})
- if targetRepo.StorageName == "" || targetRepo.RelativePath == "" {
- return nil, helper.ErrInvalidArgumentf("repo scoped: target repo is invalid")
- }
-
praefectServer, err := metadata.PraefectFromConfig(c.conf)
if err != nil {
return nil, fmt.Errorf("repo scoped: could not create Praefect configuration: %w", err)
@@ -280,9 +271,9 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr
var ps *proxy.StreamParameters
switch call.methodInfo.Operation {
case protoregistry.OpAccessor:
- ps, err = c.accessorStreamParameters(ctx, call, targetRepo)
+ ps, err = c.accessorStreamParameters(ctx, call)
case protoregistry.OpMutator:
- ps, err = c.mutatorStreamParameters(ctx, call, targetRepo)
+ ps, err = c.mutatorStreamParameters(ctx, call)
default:
err = fmt.Errorf("unknown operation type: %v", call.methodInfo.Operation)
}
@@ -294,9 +285,9 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr
return ps, nil
}
-func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) {
- repoPath := targetRepo.GetRelativePath()
- virtualStorage := targetRepo.StorageName
+func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
+ repoPath := call.targetRepo.GetRelativePath()
+ virtualStorage := call.targetRepo.StorageName
node, err := c.router.RouteRepositoryAccessor(ctx, virtualStorage, repoPath)
if err != nil {
@@ -352,10 +343,11 @@ func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNod
return c.txMgr.RegisterTransaction(ctx, voters, threshold)
}
-func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) {
- virtualStorage := targetRepo.StorageName
+func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
+ targetRepo := call.targetRepo
+ virtualStorage := call.targetRepo.StorageName
- route, err := c.router.RouteRepositoryMutator(ctx, virtualStorage, call.targetRepo.RelativePath)
+ route, err := c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath)
if err != nil {
if errors.Is(err, ErrRepositoryReadOnly) {
return nil, err
@@ -417,14 +409,14 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
}
finalizers = append(finalizers,
- transactionCleanup, c.createTransactionFinalizer(ctx, transaction, route, virtualStorage, call.targetRepo, change, params),
+ transactionCleanup, c.createTransactionFinalizer(ctx, transaction, route, virtualStorage, targetRepo, change, params),
)
} else {
finalizers = append(finalizers,
c.newRequestFinalizer(
ctx,
virtualStorage,
- call.targetRepo,
+ targetRepo,
route.Primary.Storage,
nil,
append(routerNodesToStorages(route.Secondaries), route.ReplicationTargets...),
@@ -492,8 +484,7 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string,
methodInfo: mi,
msg: m,
targetRepo: targetRepo,
- },
- )
+ })
if err != nil {
if errors.Is(err, nodes.ErrVirtualStorageNotExist) {
return nil, helper.ErrInvalidArgument(err)