diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-12-14 11:49:36 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-12-14 11:49:36 +0300 |
commit | fb388ea4f85f04dc88e657fbbf33dbe571e19bd8 (patch) | |
tree | fd532e7404f745a3d0911350e54b7fa7fac254bc | |
parent | 2bba842a174b342c65d270721e3558f28a358dc3 (diff) | |
parent | 684e462c8953dcf435fe43e937e7d0f068a0ef17 (diff) |
Merge branch 'ps-clean-up-params-usage' into 'master'
Clean up of the internals of the Coordinator type
See merge request gitlab-org/gitaly!2920
-rw-r--r-- | internal/praefect/coordinator.go | 33 |
1 files changed, 12 insertions, 21 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index ad549f75e..4d5d7b6c9 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -254,20 +254,11 @@ func (c *Coordinator) Collect(metrics chan<- prometheus.Metric) { } func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { - targetRepo, err := call.methodInfo.TargetRepo(call.msg) - if err != nil { - return nil, helper.ErrInvalidArgument(fmt.Errorf("repo scoped: %w", err)) - } - ctxlogrus.AddFields(ctx, logrus.Fields{ "virtual_storage": call.targetRepo.StorageName, "relative_path": call.targetRepo.RelativePath, }) - if targetRepo.StorageName == "" || targetRepo.RelativePath == "" { - return nil, helper.ErrInvalidArgumentf("repo scoped: target repo is invalid") - } - praefectServer, err := metadata.PraefectFromConfig(c.conf) if err != nil { return nil, fmt.Errorf("repo scoped: could not create Praefect configuration: %w", err) @@ -280,9 +271,9 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr var ps *proxy.StreamParameters switch call.methodInfo.Operation { case protoregistry.OpAccessor: - ps, err = c.accessorStreamParameters(ctx, call, targetRepo) + ps, err = c.accessorStreamParameters(ctx, call) case protoregistry.OpMutator: - ps, err = c.mutatorStreamParameters(ctx, call, targetRepo) + ps, err = c.mutatorStreamParameters(ctx, call) default: err = fmt.Errorf("unknown operation type: %v", call.methodInfo.Operation) } @@ -294,9 +285,9 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr return ps, nil } -func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) { - repoPath := targetRepo.GetRelativePath() - virtualStorage := targetRepo.StorageName +func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { + repoPath := call.targetRepo.GetRelativePath() + virtualStorage := call.targetRepo.StorageName node, err := c.router.RouteRepositoryAccessor(ctx, virtualStorage, repoPath) if err != nil { @@ -352,10 +343,11 @@ func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNod return c.txMgr.RegisterTransaction(ctx, voters, threshold) } -func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) { - virtualStorage := targetRepo.StorageName +func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { + targetRepo := call.targetRepo + virtualStorage := call.targetRepo.StorageName - route, err := c.router.RouteRepositoryMutator(ctx, virtualStorage, call.targetRepo.RelativePath) + route, err := c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath) if err != nil { if errors.Is(err, ErrRepositoryReadOnly) { return nil, err @@ -417,14 +409,14 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall } finalizers = append(finalizers, - transactionCleanup, c.createTransactionFinalizer(ctx, transaction, route, virtualStorage, call.targetRepo, change, params), + transactionCleanup, c.createTransactionFinalizer(ctx, transaction, route, virtualStorage, targetRepo, change, params), ) } else { finalizers = append(finalizers, c.newRequestFinalizer( ctx, virtualStorage, - call.targetRepo, + targetRepo, route.Primary.Storage, nil, append(routerNodesToStorages(route.Secondaries), route.ReplicationTargets...), @@ -492,8 +484,7 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, methodInfo: mi, msg: m, targetRepo: targetRepo, - }, - ) + }) if err != nil { if errors.Is(err, nodes.ErrVirtualStorageNotExist) { return nil, helper.ErrInvalidArgument(err) |