Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2021-09-20 12:22:39 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2021-09-20 12:22:39 +0300
commitd6c4f13dbff2729dffe834641d7e35cd6bbac5dc (patch)
treee73ca55453fae47a8d033d73b4e63b736eebadde /internal
parent159948cc8cd98d17c0d8599ac2b29aadd1d2c7d1 (diff)
parentfdda523bafba3e028d7981c52719e7c1c34ae750 (diff)
Merge branch 'pks-revert-3858' into 'master'
Revert "Merge branch 'smh-intercepted-methods' into 'master'" See merge request gitlab-org/gitaly!3882
Diffstat (limited to 'internal')
-rw-r--r--internal/praefect/protoregistry/protoregistry.go2
-rw-r--r--internal/praefect/protoregistry/protoregistry_test.go7
-rw-r--r--internal/praefect/repository_exists.go12
-rw-r--r--internal/praefect/repository_exists_test.go16
-rw-r--r--internal/praefect/server.go9
-rw-r--r--internal/praefect/server_test.go5
-rw-r--r--internal/protoutil/extension.go16
7 files changed, 36 insertions, 31 deletions
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 7434de176..6c9bbff5d 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -172,7 +172,7 @@ func New(protos ...*descriptorpb.FileDescriptorProto) (*Registry, error) {
p.GetPackage(), svc.GetName(), method.GetName(),
)
- if intercepted, err := protoutil.IsInterceptedMethod(svc, method); err != nil {
+ if intercepted, err := protoutil.IsInterceptedService(svc); err != nil {
return nil, fmt.Errorf("is intercepted: %w", err)
} else if intercepted {
interceptedMethods[fullMethodName] = struct{}{}
diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go
index 6262afa92..c70bef342 100644
--- a/internal/praefect/protoregistry/protoregistry_test.go
+++ b/internal/praefect/protoregistry/protoregistry_test.go
@@ -106,6 +106,7 @@ func TestNewProtoRegistry(t *testing.T) {
"FindRemoteRootRef": protoregistry.OpAccessor,
},
"RepositoryService": {
+ "RepositoryExists": protoregistry.OpAccessor,
"RepackIncremental": protoregistry.OpMutator,
"RepackFull": protoregistry.OpMutator,
"GarbageCollect": protoregistry.OpMutator,
@@ -199,13 +200,13 @@ func TestNewProtoRegistry_IsInterceptedMethod(t *testing.T) {
}
func TestRequestFactory(t *testing.T) {
- mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.RepositoryService/GarbageCollect")
+ mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.RepositoryService/RepositoryExists")
require.NoError(t, err)
pb, err := mInfo.UnmarshalRequestProto([]byte{})
require.NoError(t, err)
- testassert.ProtoEqual(t, &gitalypb.GarbageCollectRequest{}, pb)
+ testassert.ProtoEqual(t, &gitalypb.RepositoryExistsRequest{}, pb)
}
func TestMethodInfoScope(t *testing.T) {
@@ -214,7 +215,7 @@ func TestMethodInfoScope(t *testing.T) {
scope protoregistry.Scope
}{
{
- method: "/gitaly.RepositoryService/GarbageCollect",
+ method: "/gitaly.RepositoryService/RepositoryExists",
scope: protoregistry.ScopeRepository,
},
} {
diff --git a/internal/praefect/repository_exists.go b/internal/praefect/repository_exists.go
index bbafff823..f8ae301cc 100644
--- a/internal/praefect/repository_exists.go
+++ b/internal/praefect/repository_exists.go
@@ -16,10 +16,14 @@ var (
errMissingRelativePath = status.Error(codes.InvalidArgument, "repository missing relative path")
)
-// RepositoryExistsHandler handles /gitaly.RepositoryService/RepositoryExists calls by checking
-// whether there is a record of the repository in the database.
-func RepositoryExistsHandler(rs datastore.RepositoryStore) grpc.StreamHandler {
- return func(srv interface{}, stream grpc.ServerStream) error {
+// RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists
+// calls by checking whether there is a record of the repository in the database.
+func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor {
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if info.FullMethod != "/gitaly.RepositoryService/RepositoryExists" {
+ return handler(srv, stream)
+ }
+
var req gitalypb.RepositoryExistsRequest
if err := stream.RecvMsg(&req); err != nil {
return fmt.Errorf("receive request: %w", err)
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index 25987c472..a5e894eaf 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -61,6 +61,11 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) {
repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "relative-path"},
response: &gitalypb.RepositoryExistsResponse{Exists: true},
},
+ {
+ desc: "routed to gitaly",
+ routeToGitaly: true,
+ error: errServedByGitaly,
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
db.TruncateAll(t)
@@ -71,13 +76,22 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) {
require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "storage", nil, nil, false, false))
+ electionStrategy := config.ElectionStrategyPerRepository
+ if tc.routeToGitaly {
+ electionStrategy = config.ElectionStrategySQL
+ }
+
tmp := testhelper.TempDir(t)
ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
require.NoError(t, err)
srv := NewGRPCServer(
- config.Config{},
+ config.Config{
+ Failover: config.Failover{
+ ElectionStrategy: electionStrategy,
+ },
+ },
testhelper.DiscardTestEntry(t),
protoregistry.GitalyProtoPreregistered,
func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index c043f0e4f..4eb4b1c4c 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -106,6 +106,10 @@ func NewGRPCServer(
panichandler.StreamPanicHandler,
}
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ streamInterceptors = append(streamInterceptors, RepositoryExistsStreamInterceptor(rs))
+ }
+
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)),
@@ -126,11 +130,6 @@ func NewGRPCServer(
srv := grpc.NewServer(grpcOpts...)
registerServices(srv, nodeMgr, txMgr, conf, queue, rs, assignmentStore, service.Connections(conns), primaryGetter)
-
- proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{
- "RepositoryExists": RepositoryExistsHandler(rs),
- })
-
return srv
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 731336515..4f95c1dee 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -644,10 +644,7 @@ func TestRenameRepository(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
- withQueue: evq,
- withRepoStore: datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil),
- })
+ cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{withQueue: evq})
defer cleanup()
// virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it
diff --git a/internal/protoutil/extension.go b/internal/protoutil/extension.go
index 9e2e9f9a0..33a294716 100644
--- a/internal/protoutil/extension.go
+++ b/internal/protoutil/extension.go
@@ -21,19 +21,9 @@ func GetOpExtension(m *descriptorpb.MethodDescriptorProto) (*gitalypb.OperationM
return ext.(*gitalypb.OperationMsg), nil
}
-// IsInterceptedMethod returns whether the RPC method is intercepted by Praefect.
-func IsInterceptedMethod(s *descriptorpb.ServiceDescriptorProto, m *descriptorpb.MethodDescriptorProto) (bool, error) {
- isServiceIntercepted, err := getBoolExtension(s.GetOptions(), gitalypb.E_Intercepted)
- if err != nil {
- return false, fmt.Errorf("is service intercepted: %w", err)
- }
-
- isMethodIntercepted, err := getBoolExtension(m.GetOptions(), gitalypb.E_InterceptedMethod)
- if err != nil {
- return false, fmt.Errorf("is method intercepted: %w", err)
- }
-
- return isServiceIntercepted || isMethodIntercepted, nil
+// IsInterceptedService returns whether the serivce is intercepted by Praefect.
+func IsInterceptedService(s *descriptorpb.ServiceDescriptorProto) (bool, error) {
+ return getBoolExtension(s.GetOptions(), gitalypb.E_Intercepted)
}
// GetRepositoryExtension gets the repository extension from a field descriptor