diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-22 09:08:59 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-09-22 09:08:59 +0300 |
commit | f308b90f8eb1ccfdafdbff49639d3487b8a14387 (patch) | |
tree | b79f929fc67909c4300b3efdc188e6e9d4eb14fc /internal | |
parent | f99dae3b6e630e3a251a3af2f3029320c594d829 (diff) | |
parent | 95383c3e309f5700adf5afb2450a77fe38d0e556 (diff) |
Merge branch 'smh-fix-repository-exists' into 'master'
Unrevert RepositoryExists interceptor changes
See merge request gitlab-org/gitaly!3887
Diffstat (limited to 'internal')
-rw-r--r-- | internal/praefect/protoregistry/protoregistry.go | 2 | ||||
-rw-r--r-- | internal/praefect/repository_exists.go | 12 | ||||
-rw-r--r-- | internal/praefect/repository_exists_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/server.go | 11 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 7 | ||||
-rw-r--r-- | internal/protoutil/extension.go | 16 |
6 files changed, 37 insertions, 29 deletions
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 6c9bbff5d..7434de176 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -172,7 +172,7 @@ func New(protos ...*descriptorpb.FileDescriptorProto) (*Registry, error) { p.GetPackage(), svc.GetName(), method.GetName(), ) - if intercepted, err := protoutil.IsInterceptedService(svc); err != nil { + if intercepted, err := protoutil.IsInterceptedMethod(svc, method); err != nil { return nil, fmt.Errorf("is intercepted: %w", err) } else if intercepted { interceptedMethods[fullMethodName] = struct{}{} diff --git a/internal/praefect/repository_exists.go b/internal/praefect/repository_exists.go index f8ae301cc..bbafff823 100644 --- a/internal/praefect/repository_exists.go +++ b/internal/praefect/repository_exists.go @@ -16,14 +16,10 @@ var ( errMissingRelativePath = status.Error(codes.InvalidArgument, "repository missing relative path") ) -// RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists -// calls by checking whether there is a record of the repository in the database. -func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor { - return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - if info.FullMethod != "/gitaly.RepositoryService/RepositoryExists" { - return handler(srv, stream) - } - +// RepositoryExistsHandler handles /gitaly.RepositoryService/RepositoryExists calls by checking +// whether there is a record of the repository in the database. +func RepositoryExistsHandler(rs datastore.RepositoryStore) grpc.StreamHandler { + return func(srv interface{}, stream grpc.ServerStream) error { var req gitalypb.RepositoryExistsRequest if err := stream.RecvMsg(&req); err != nil { return fmt.Errorf("receive request: %w", err) diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index a5e894eaf..45e315299 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -20,7 +20,7 @@ import ( "google.golang.org/grpc/status" ) -func TestRepositoryExistsStreamInterceptor(t *testing.T) { +func TestRepositoryExistsHandler(t *testing.T) { t.Parallel() errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly") @@ -76,22 +76,18 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) { require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "storage", nil, nil, false, false)) - electionStrategy := config.ElectionStrategyPerRepository - if tc.routeToGitaly { - electionStrategy = config.ElectionStrategySQL - } - tmp := testhelper.TempDir(t) ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) require.NoError(t, err) + electionStrategy := config.ElectionStrategyPerRepository + if tc.routeToGitaly { + electionStrategy = config.ElectionStrategySQL + } + srv := NewGRPCServer( - config.Config{ - Failover: config.Failover{ - ElectionStrategy: electionStrategy, - }, - }, + config.Config{Failover: config.Failover{ElectionStrategy: electionStrategy}}, testhelper.DiscardTestEntry(t), protoregistry.GitalyProtoPreregistered, func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 4eb4b1c4c..7e9570a2e 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -106,10 +106,6 @@ func NewGRPCServer( panichandler.StreamPanicHandler, } - if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { - streamInterceptors = append(streamInterceptors, RepositoryExistsStreamInterceptor(rs)) - } - grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)), @@ -130,6 +126,13 @@ func NewGRPCServer( srv := grpc.NewServer(grpcOpts...) registerServices(srv, nodeMgr, txMgr, conf, queue, rs, assignmentStore, service.Connections(conns), primaryGetter) + + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ + "RepositoryExists": RepositoryExistsHandler(rs), + }) + } + return srv } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 4f95c1dee..275f21903 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -608,7 +608,7 @@ func TestRenameRepository(t *testing.T) { repoPaths := make([]string, len(gitalyStorages)) praefectCfg := config.Config{ VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}}, - Failover: config.Failover{Enabled: true}, + Failover: config.Failover{Enabled: true, ElectionStrategy: config.ElectionStrategyPerRepository}, } var repo *gitalypb.Repository @@ -644,7 +644,10 @@ func TestRenameRepository(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{withQueue: evq}) + cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ + withQueue: evq, + withRepoStore: datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil), + }) defer cleanup() // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it diff --git a/internal/protoutil/extension.go b/internal/protoutil/extension.go index 33a294716..9e2e9f9a0 100644 --- a/internal/protoutil/extension.go +++ b/internal/protoutil/extension.go @@ -21,9 +21,19 @@ func GetOpExtension(m *descriptorpb.MethodDescriptorProto) (*gitalypb.OperationM return ext.(*gitalypb.OperationMsg), nil } -// IsInterceptedService returns whether the serivce is intercepted by Praefect. -func IsInterceptedService(s *descriptorpb.ServiceDescriptorProto) (bool, error) { - return getBoolExtension(s.GetOptions(), gitalypb.E_Intercepted) +// IsInterceptedMethod returns whether the RPC method is intercepted by Praefect. +func IsInterceptedMethod(s *descriptorpb.ServiceDescriptorProto, m *descriptorpb.MethodDescriptorProto) (bool, error) { + isServiceIntercepted, err := getBoolExtension(s.GetOptions(), gitalypb.E_Intercepted) + if err != nil { + return false, fmt.Errorf("is service intercepted: %w", err) + } + + isMethodIntercepted, err := getBoolExtension(m.GetOptions(), gitalypb.E_InterceptedMethod) + if err != nil { + return false, fmt.Errorf("is method intercepted: %w", err) + } + + return isServiceIntercepted || isMethodIntercepted, nil } // GetRepositoryExtension gets the repository extension from a field descriptor |