diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-20 12:22:39 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-20 12:22:39 +0300 |
commit | d6c4f13dbff2729dffe834641d7e35cd6bbac5dc (patch) | |
tree | e73ca55453fae47a8d033d73b4e63b736eebadde /internal | |
parent | 159948cc8cd98d17c0d8599ac2b29aadd1d2c7d1 (diff) | |
parent | fdda523bafba3e028d7981c52719e7c1c34ae750 (diff) |
Merge branch 'pks-revert-3858' into 'master'
Revert "Merge branch 'smh-intercepted-methods' into 'master'"
See merge request gitlab-org/gitaly!3882
Diffstat (limited to 'internal')
-rw-r--r-- | internal/praefect/protoregistry/protoregistry.go | 2 | ||||
-rw-r--r-- | internal/praefect/protoregistry/protoregistry_test.go | 7 | ||||
-rw-r--r-- | internal/praefect/repository_exists.go | 12 | ||||
-rw-r--r-- | internal/praefect/repository_exists_test.go | 16 | ||||
-rw-r--r-- | internal/praefect/server.go | 9 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 5 | ||||
-rw-r--r-- | internal/protoutil/extension.go | 16 |
7 files changed, 36 insertions, 31 deletions
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 7434de176..6c9bbff5d 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -172,7 +172,7 @@ func New(protos ...*descriptorpb.FileDescriptorProto) (*Registry, error) { p.GetPackage(), svc.GetName(), method.GetName(), ) - if intercepted, err := protoutil.IsInterceptedMethod(svc, method); err != nil { + if intercepted, err := protoutil.IsInterceptedService(svc); err != nil { return nil, fmt.Errorf("is intercepted: %w", err) } else if intercepted { interceptedMethods[fullMethodName] = struct{}{} diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go index 6262afa92..c70bef342 100644 --- a/internal/praefect/protoregistry/protoregistry_test.go +++ b/internal/praefect/protoregistry/protoregistry_test.go @@ -106,6 +106,7 @@ func TestNewProtoRegistry(t *testing.T) { "FindRemoteRootRef": protoregistry.OpAccessor, }, "RepositoryService": { + "RepositoryExists": protoregistry.OpAccessor, "RepackIncremental": protoregistry.OpMutator, "RepackFull": protoregistry.OpMutator, "GarbageCollect": protoregistry.OpMutator, @@ -199,13 +200,13 @@ func TestNewProtoRegistry_IsInterceptedMethod(t *testing.T) { } func TestRequestFactory(t *testing.T) { - mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.RepositoryService/GarbageCollect") + mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.RepositoryService/RepositoryExists") require.NoError(t, err) pb, err := mInfo.UnmarshalRequestProto([]byte{}) require.NoError(t, err) - testassert.ProtoEqual(t, &gitalypb.GarbageCollectRequest{}, pb) + testassert.ProtoEqual(t, &gitalypb.RepositoryExistsRequest{}, pb) } func TestMethodInfoScope(t *testing.T) { @@ -214,7 +215,7 @@ func TestMethodInfoScope(t *testing.T) { scope protoregistry.Scope }{ { - method: "/gitaly.RepositoryService/GarbageCollect", + method: "/gitaly.RepositoryService/RepositoryExists", scope: protoregistry.ScopeRepository, }, } { diff --git a/internal/praefect/repository_exists.go b/internal/praefect/repository_exists.go index bbafff823..f8ae301cc 100644 --- a/internal/praefect/repository_exists.go +++ b/internal/praefect/repository_exists.go @@ -16,10 +16,14 @@ var ( errMissingRelativePath = status.Error(codes.InvalidArgument, "repository missing relative path") ) -// RepositoryExistsHandler handles /gitaly.RepositoryService/RepositoryExists calls by checking -// whether there is a record of the repository in the database. -func RepositoryExistsHandler(rs datastore.RepositoryStore) grpc.StreamHandler { - return func(srv interface{}, stream grpc.ServerStream) error { +// RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists +// calls by checking whether there is a record of the repository in the database. +func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if info.FullMethod != "/gitaly.RepositoryService/RepositoryExists" { + return handler(srv, stream) + } + var req gitalypb.RepositoryExistsRequest if err := stream.RecvMsg(&req); err != nil { return fmt.Errorf("receive request: %w", err) diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 25987c472..a5e894eaf 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -61,6 +61,11 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) { repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "relative-path"}, response: &gitalypb.RepositoryExistsResponse{Exists: true}, }, + { + desc: "routed to gitaly", + routeToGitaly: true, + error: errServedByGitaly, + }, } { t.Run(tc.desc, func(t *testing.T) { db.TruncateAll(t) @@ -71,13 +76,22 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) { require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "storage", nil, nil, false, false)) + electionStrategy := config.ElectionStrategyPerRepository + if tc.routeToGitaly { + electionStrategy = config.ElectionStrategySQL + } + tmp := testhelper.TempDir(t) ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) require.NoError(t, err) srv := NewGRPCServer( - config.Config{}, + config.Config{ + Failover: config.Failover{ + ElectionStrategy: electionStrategy, + }, + }, testhelper.DiscardTestEntry(t), protoregistry.GitalyProtoPreregistered, func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { diff --git a/internal/praefect/server.go b/internal/praefect/server.go index c043f0e4f..4eb4b1c4c 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -106,6 +106,10 @@ func NewGRPCServer( panichandler.StreamPanicHandler, } + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + streamInterceptors = append(streamInterceptors, RepositoryExistsStreamInterceptor(rs)) + } + grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)), @@ -126,11 +130,6 @@ func NewGRPCServer( srv := grpc.NewServer(grpcOpts...) registerServices(srv, nodeMgr, txMgr, conf, queue, rs, assignmentStore, service.Connections(conns), primaryGetter) - - proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ - "RepositoryExists": RepositoryExistsHandler(rs), - }) - return srv } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 731336515..4f95c1dee 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -644,10 +644,7 @@ func TestRenameRepository(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ - withQueue: evq, - withRepoStore: datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil), - }) + cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{withQueue: evq}) defer cleanup() // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it diff --git a/internal/protoutil/extension.go b/internal/protoutil/extension.go index 9e2e9f9a0..33a294716 100644 --- a/internal/protoutil/extension.go +++ b/internal/protoutil/extension.go @@ -21,19 +21,9 @@ func GetOpExtension(m *descriptorpb.MethodDescriptorProto) (*gitalypb.OperationM return ext.(*gitalypb.OperationMsg), nil } -// IsInterceptedMethod returns whether the RPC method is intercepted by Praefect. -func IsInterceptedMethod(s *descriptorpb.ServiceDescriptorProto, m *descriptorpb.MethodDescriptorProto) (bool, error) { - isServiceIntercepted, err := getBoolExtension(s.GetOptions(), gitalypb.E_Intercepted) - if err != nil { - return false, fmt.Errorf("is service intercepted: %w", err) - } - - isMethodIntercepted, err := getBoolExtension(m.GetOptions(), gitalypb.E_InterceptedMethod) - if err != nil { - return false, fmt.Errorf("is method intercepted: %w", err) - } - - return isServiceIntercepted || isMethodIntercepted, nil +// IsInterceptedService returns whether the serivce is intercepted by Praefect. +func IsInterceptedService(s *descriptorpb.ServiceDescriptorProto) (bool, error) { + return getBoolExtension(s.GetOptions(), gitalypb.E_Intercepted) } // GetRepositoryExtension gets the repository extension from a field descriptor |