Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-22 09:08:59 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-09-22 09:08:59 +0300
commitf308b90f8eb1ccfdafdbff49639d3487b8a14387 (patch)
treeb79f929fc67909c4300b3efdc188e6e9d4eb14fc /internal
parentf99dae3b6e630e3a251a3af2f3029320c594d829 (diff)
parent95383c3e309f5700adf5afb2450a77fe38d0e556 (diff)
Merge branch 'smh-fix-repository-exists' into 'master'
Unrevert RepositoryExists interceptor changes See merge request gitlab-org/gitaly!3887
Diffstat (limited to 'internal')
-rw-r--r--internal/praefect/protoregistry/protoregistry.go2
-rw-r--r--internal/praefect/repository_exists.go12
-rw-r--r--internal/praefect/repository_exists_test.go18
-rw-r--r--internal/praefect/server.go11
-rw-r--r--internal/praefect/server_test.go7
-rw-r--r--internal/protoutil/extension.go16
6 files changed, 37 insertions, 29 deletions
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 6c9bbff5d..7434de176 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -172,7 +172,7 @@ func New(protos ...*descriptorpb.FileDescriptorProto) (*Registry, error) {
p.GetPackage(), svc.GetName(), method.GetName(),
)
- if intercepted, err := protoutil.IsInterceptedService(svc); err != nil {
+ if intercepted, err := protoutil.IsInterceptedMethod(svc, method); err != nil {
return nil, fmt.Errorf("is intercepted: %w", err)
} else if intercepted {
interceptedMethods[fullMethodName] = struct{}{}
diff --git a/internal/praefect/repository_exists.go b/internal/praefect/repository_exists.go
index f8ae301cc..bbafff823 100644
--- a/internal/praefect/repository_exists.go
+++ b/internal/praefect/repository_exists.go
@@ -16,14 +16,10 @@ var (
errMissingRelativePath = status.Error(codes.InvalidArgument, "repository missing relative path")
)
-// RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists
-// calls by checking whether there is a record of the repository in the database.
-func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor {
- return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- if info.FullMethod != "/gitaly.RepositoryService/RepositoryExists" {
- return handler(srv, stream)
- }
-
+// RepositoryExistsHandler handles /gitaly.RepositoryService/RepositoryExists calls by checking
+// whether there is a record of the repository in the database.
+func RepositoryExistsHandler(rs datastore.RepositoryStore) grpc.StreamHandler {
+ return func(srv interface{}, stream grpc.ServerStream) error {
var req gitalypb.RepositoryExistsRequest
if err := stream.RecvMsg(&req); err != nil {
return fmt.Errorf("receive request: %w", err)
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index a5e894eaf..45e315299 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -20,7 +20,7 @@ import (
"google.golang.org/grpc/status"
)
-func TestRepositoryExistsStreamInterceptor(t *testing.T) {
+func TestRepositoryExistsHandler(t *testing.T) {
t.Parallel()
errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly")
@@ -76,22 +76,18 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) {
require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "storage", nil, nil, false, false))
- electionStrategy := config.ElectionStrategyPerRepository
- if tc.routeToGitaly {
- electionStrategy = config.ElectionStrategySQL
- }
-
tmp := testhelper.TempDir(t)
ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
require.NoError(t, err)
+ electionStrategy := config.ElectionStrategyPerRepository
+ if tc.routeToGitaly {
+ electionStrategy = config.ElectionStrategySQL
+ }
+
srv := NewGRPCServer(
- config.Config{
- Failover: config.Failover{
- ElectionStrategy: electionStrategy,
- },
- },
+ config.Config{Failover: config.Failover{ElectionStrategy: electionStrategy}},
testhelper.DiscardTestEntry(t),
protoregistry.GitalyProtoPreregistered,
func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 4eb4b1c4c..7e9570a2e 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -106,10 +106,6 @@ func NewGRPCServer(
panichandler.StreamPanicHandler,
}
- if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
- streamInterceptors = append(streamInterceptors, RepositoryExistsStreamInterceptor(rs))
- }
-
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)),
@@ -130,6 +126,13 @@ func NewGRPCServer(
srv := grpc.NewServer(grpcOpts...)
registerServices(srv, nodeMgr, txMgr, conf, queue, rs, assignmentStore, service.Connections(conns), primaryGetter)
+
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{
+ "RepositoryExists": RepositoryExistsHandler(rs),
+ })
+ }
+
return srv
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 4f95c1dee..275f21903 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -608,7 +608,7 @@ func TestRenameRepository(t *testing.T) {
repoPaths := make([]string, len(gitalyStorages))
praefectCfg := config.Config{
VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}},
- Failover: config.Failover{Enabled: true},
+ Failover: config.Failover{Enabled: true, ElectionStrategy: config.ElectionStrategyPerRepository},
}
var repo *gitalypb.Repository
@@ -644,7 +644,10 @@ func TestRenameRepository(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{withQueue: evq})
+ cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
+ withQueue: evq,
+ withRepoStore: datastore.NewPostgresRepositoryStore(glsql.NewDB(t), nil),
+ })
defer cleanup()
// virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it
diff --git a/internal/protoutil/extension.go b/internal/protoutil/extension.go
index 33a294716..9e2e9f9a0 100644
--- a/internal/protoutil/extension.go
+++ b/internal/protoutil/extension.go
@@ -21,9 +21,19 @@ func GetOpExtension(m *descriptorpb.MethodDescriptorProto) (*gitalypb.OperationM
return ext.(*gitalypb.OperationMsg), nil
}
-// IsInterceptedService returns whether the serivce is intercepted by Praefect.
-func IsInterceptedService(s *descriptorpb.ServiceDescriptorProto) (bool, error) {
- return getBoolExtension(s.GetOptions(), gitalypb.E_Intercepted)
+// IsInterceptedMethod returns whether the RPC method is intercepted by Praefect.
+func IsInterceptedMethod(s *descriptorpb.ServiceDescriptorProto, m *descriptorpb.MethodDescriptorProto) (bool, error) {
+ isServiceIntercepted, err := getBoolExtension(s.GetOptions(), gitalypb.E_Intercepted)
+ if err != nil {
+ return false, fmt.Errorf("is service intercepted: %w", err)
+ }
+
+ isMethodIntercepted, err := getBoolExtension(m.GetOptions(), gitalypb.E_InterceptedMethod)
+ if err != nil {
+ return false, fmt.Errorf("is method intercepted: %w", err)
+ }
+
+ return isServiceIntercepted || isMethodIntercepted, nil
}
// GetRepositoryExtension gets the repository extension from a field descriptor