diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-01 17:29:47 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-01 17:29:47 +0300 |
commit | ef22886927fea74615329d9664fc2bb4aa609da6 (patch) | |
tree | 379a23b2f6babcd48e1ec4716da72201e6257ee6 | |
parent | 9ed0124f6cdfc359521feae325420549781d883e (diff) | |
parent | 7c517f28115fa4a6de59494b2db63be8185f7c91 (diff) |
Merge branch 'smh-intercept-create-repository' into 'master'
Intercept RepositoryExists calls in Praefect
See merge request gitlab-org/gitaly!3075
-rw-r--r-- | changelogs/unreleased/smh-intercept-create-repository.yml | 5 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 6 | ||||
-rw-r--r-- | internal/praefect/repository_exists.go | 58 | ||||
-rw-r--r-- | internal/praefect/repository_exists_test.go | 121 | ||||
-rw-r--r-- | internal/praefect/server.go | 38 |
5 files changed, 209 insertions, 19 deletions
diff --git a/changelogs/unreleased/smh-intercept-create-repository.yml b/changelogs/unreleased/smh-intercept-create-repository.yml new file mode 100644 index 000000000..81db72ac7 --- /dev/null +++ b/changelogs/unreleased/smh-intercept-create-repository.yml @@ -0,0 +1,5 @@ +--- +title: Intercept RepositoryExists calls in Praefect +merge_request: 3075 +author: +type: changed diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index a4564ea58..6ae48474b 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -31,11 +31,11 @@ func (es ElectionStrategy) validate() error { const ( // ElectionStrategyLocal configures a single node, in-memory election strategy. - ElectionStrategyLocal = "local" + ElectionStrategyLocal ElectionStrategy = "local" // ElectionStrategySQL configures an SQL based strategy that elects a primary for a virtual storage. - ElectionStrategySQL = "sql" + ElectionStrategySQL ElectionStrategy = "sql" // ElectionStrategyPerRepository configures an SQL based strategy that elects different primaries per repository. - ElectionStrategyPerRepository = "per_repository" + ElectionStrategyPerRepository ElectionStrategy = "per_repository" ) type Failover struct { diff --git a/internal/praefect/repository_exists.go b/internal/praefect/repository_exists.go new file mode 100644 index 000000000..0ecbf5ed6 --- /dev/null +++ b/internal/praefect/repository_exists.go @@ -0,0 +1,58 @@ +package praefect + +import ( + "fmt" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + errMissingRepository = status.Error(codes.InvalidArgument, "missing repository") + errMissingStorageName = status.Error(codes.InvalidArgument, "repository missing storage name") + errMissingRelativePath = status.Error(codes.InvalidArgument, "repository missing relative path") +) + +// RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists +// calls by checking whether there is a record of the repository in the database. +func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if info.FullMethod != "/gitaly.RepositoryService/RepositoryExists" { + return handler(srv, stream) + } + + var req gitalypb.RepositoryExistsRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + repo := req.GetRepository() + if repo == nil { + return errMissingRepository + } + + storageName := repo.StorageName + if storageName == "" { + return errMissingStorageName + } + + relativePath := repo.RelativePath + if relativePath == "" { + return errMissingRelativePath + } + + exists, err := rs.RepositoryExists(stream.Context(), storageName, relativePath) + if err != nil { + return fmt.Errorf("repository exists: %w", err) + } + + if err := stream.SendMsg(&gitalypb.RepositoryExistsResponse{Exists: exists}); err != nil { + return fmt.Errorf("send response: %w", err) + } + + return nil + } +} diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go new file mode 100644 index 000000000..f0bb53a76 --- /dev/null +++ b/internal/praefect/repository_exists_test.go @@ -0,0 +1,121 @@ +// +build postgres + +package praefect + +import ( + "context" + "net" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestRepositoryExistsStreamInterceptor(t *testing.T) { + errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly") + + for _, tc := range []struct { + desc string + routeToGitaly bool + repository *gitalypb.Repository + response *gitalypb.RepositoryExistsResponse + error error + }{ + { + desc: "missing repository", + error: errMissingRepository, + }, + { + desc: "missing storage name", + repository: &gitalypb.Repository{RelativePath: "relative-path"}, + error: errMissingStorageName, + }, + { + desc: "missing relative path", + repository: &gitalypb.Repository{StorageName: "virtual-storage"}, + error: errMissingRelativePath, + }, + { + desc: "invalid virtual storage", + repository: &gitalypb.Repository{StorageName: "invalid virtual storage", RelativePath: "relative-path"}, + response: &gitalypb.RepositoryExistsResponse{Exists: false}, + }, + { + desc: "invalid relative path", + repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "invalid relative path"}, + response: &gitalypb.RepositoryExistsResponse{Exists: false}, + }, + { + desc: "repository found", + repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "relative-path"}, + response: &gitalypb.RepositoryExistsResponse{Exists: true}, + }, + { + desc: "routed to gitaly", + routeToGitaly: true, + error: errServedByGitaly, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + db := getDB(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}}) + + ctx, cancel := testhelper.Context() + defer cancel() + + require.NoError(t, rs.CreateRepository(ctx, "virtual-storage", "relative-path", "storage")) + + electionStrategy := config.ElectionStrategyPerRepository + if tc.routeToGitaly { + electionStrategy = config.ElectionStrategySQL + } + + tmp, cleanDir := testhelper.TempDir(t) + defer cleanDir() + + ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) + require.NoError(t, err) + + srv := NewGRPCServer( + config.Config{ + Failover: config.Failover{ + ElectionStrategy: electionStrategy, + }, + }, + testhelper.DiscardTestEntry(t), + protoregistry.GitalyProtoPreregistered, + func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + return nil, errServedByGitaly + }, + nil, + nil, + nil, + rs, + nil, + ) + defer srv.Stop() + + go func() { srv.Serve(ln) }() + + clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithInsecure()) + require.NoError(t, err) + + client := gitalypb.NewRepositoryServiceClient(clientConn) + _, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: tc.repository}) + require.Equal(t, errServedByGitaly, err, "other RPCs should be passed through") + + resp, err := client.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: tc.repository}) + require.Equal(t, tc.error, err) + require.Equal(t, tc.response, resp) + }) + } +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 927450a53..dabac9dd3 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -54,24 +54,30 @@ func NewGRPCServer( grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), } + streamInterceptors := []grpc.StreamServerInterceptor{ + grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), + grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler + middleware.MethodTypeStreamInterceptor(registry), + metadatahandler.StreamInterceptor, + grpc_prometheus.StreamServerInterceptor, + grpc_logrus.StreamServerInterceptor(logger), + featureflag.StreamInterceptor, + sentryhandler.StreamLogHandler, + cancelhandler.Stream, // Should be below LogHandler + grpctracing.StreamServerTracingInterceptor(), + auth.StreamServerInterceptor(conf.Auth), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.StreamPanicHandler, + } + + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + streamInterceptors = append(streamInterceptors, RepositoryExistsStreamInterceptor(rs)) + } + grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), - grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - middleware.MethodTypeStreamInterceptor(registry), - metadatahandler.StreamInterceptor, - grpc_prometheus.StreamServerInterceptor, - grpc_logrus.StreamServerInterceptor(logger), - featureflag.StreamInterceptor, - sentryhandler.StreamLogHandler, - cancelhandler.Stream, // Should be below LogHandler - grpctracing.StreamServerTracingInterceptor(), - auth.StreamServerInterceptor(conf.Auth), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.StreamPanicHandler, - )), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler |