Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-02-01 17:29:47 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-02-01 17:29:47 +0300
commitef22886927fea74615329d9664fc2bb4aa609da6 (patch)
tree379a23b2f6babcd48e1ec4716da72201e6257ee6
parent9ed0124f6cdfc359521feae325420549781d883e (diff)
parent7c517f28115fa4a6de59494b2db63be8185f7c91 (diff)
Merge branch 'smh-intercept-create-repository' into 'master'
Intercept RepositoryExists calls in Praefect See merge request gitlab-org/gitaly!3075
-rw-r--r--changelogs/unreleased/smh-intercept-create-repository.yml5
-rw-r--r--internal/praefect/config/config.go6
-rw-r--r--internal/praefect/repository_exists.go58
-rw-r--r--internal/praefect/repository_exists_test.go121
-rw-r--r--internal/praefect/server.go38
5 files changed, 209 insertions, 19 deletions
diff --git a/changelogs/unreleased/smh-intercept-create-repository.yml b/changelogs/unreleased/smh-intercept-create-repository.yml
new file mode 100644
index 000000000..81db72ac7
--- /dev/null
+++ b/changelogs/unreleased/smh-intercept-create-repository.yml
@@ -0,0 +1,5 @@
+---
+title: Intercept RepositoryExists calls in Praefect
+merge_request: 3075
+author:
+type: changed
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index a4564ea58..6ae48474b 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -31,11 +31,11 @@ func (es ElectionStrategy) validate() error {
const (
// ElectionStrategyLocal configures a single node, in-memory election strategy.
- ElectionStrategyLocal = "local"
+ ElectionStrategyLocal ElectionStrategy = "local"
// ElectionStrategySQL configures an SQL based strategy that elects a primary for a virtual storage.
- ElectionStrategySQL = "sql"
+ ElectionStrategySQL ElectionStrategy = "sql"
// ElectionStrategyPerRepository configures an SQL based strategy that elects different primaries per repository.
- ElectionStrategyPerRepository = "per_repository"
+ ElectionStrategyPerRepository ElectionStrategy = "per_repository"
)
type Failover struct {
diff --git a/internal/praefect/repository_exists.go b/internal/praefect/repository_exists.go
new file mode 100644
index 000000000..0ecbf5ed6
--- /dev/null
+++ b/internal/praefect/repository_exists.go
@@ -0,0 +1,58 @@
+package praefect
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+var (
+ errMissingRepository = status.Error(codes.InvalidArgument, "missing repository")
+ errMissingStorageName = status.Error(codes.InvalidArgument, "repository missing storage name")
+ errMissingRelativePath = status.Error(codes.InvalidArgument, "repository missing relative path")
+)
+
+// RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists
+// calls by checking whether there is a record of the repository in the database.
+func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor {
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if info.FullMethod != "/gitaly.RepositoryService/RepositoryExists" {
+ return handler(srv, stream)
+ }
+
+ var req gitalypb.RepositoryExistsRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ repo := req.GetRepository()
+ if repo == nil {
+ return errMissingRepository
+ }
+
+ storageName := repo.StorageName
+ if storageName == "" {
+ return errMissingStorageName
+ }
+
+ relativePath := repo.RelativePath
+ if relativePath == "" {
+ return errMissingRelativePath
+ }
+
+ exists, err := rs.RepositoryExists(stream.Context(), storageName, relativePath)
+ if err != nil {
+ return fmt.Errorf("repository exists: %w", err)
+ }
+
+ if err := stream.SendMsg(&gitalypb.RepositoryExistsResponse{Exists: exists}); err != nil {
+ return fmt.Errorf("send response: %w", err)
+ }
+
+ return nil
+ }
+}
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
new file mode 100644
index 000000000..f0bb53a76
--- /dev/null
+++ b/internal/praefect/repository_exists_test.go
@@ -0,0 +1,121 @@
+// +build postgres
+
+package praefect
+
+import (
+ "context"
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func TestRepositoryExistsStreamInterceptor(t *testing.T) {
+ errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly")
+
+ for _, tc := range []struct {
+ desc string
+ routeToGitaly bool
+ repository *gitalypb.Repository
+ response *gitalypb.RepositoryExistsResponse
+ error error
+ }{
+ {
+ desc: "missing repository",
+ error: errMissingRepository,
+ },
+ {
+ desc: "missing storage name",
+ repository: &gitalypb.Repository{RelativePath: "relative-path"},
+ error: errMissingStorageName,
+ },
+ {
+ desc: "missing relative path",
+ repository: &gitalypb.Repository{StorageName: "virtual-storage"},
+ error: errMissingRelativePath,
+ },
+ {
+ desc: "invalid virtual storage",
+ repository: &gitalypb.Repository{StorageName: "invalid virtual storage", RelativePath: "relative-path"},
+ response: &gitalypb.RepositoryExistsResponse{Exists: false},
+ },
+ {
+ desc: "invalid relative path",
+ repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "invalid relative path"},
+ response: &gitalypb.RepositoryExistsResponse{Exists: false},
+ },
+ {
+ desc: "repository found",
+ repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "relative-path"},
+ response: &gitalypb.RepositoryExistsResponse{Exists: true},
+ },
+ {
+ desc: "routed to gitaly",
+ routeToGitaly: true,
+ error: errServedByGitaly,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ db := getDB(t)
+ rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ require.NoError(t, rs.CreateRepository(ctx, "virtual-storage", "relative-path", "storage"))
+
+ electionStrategy := config.ElectionStrategyPerRepository
+ if tc.routeToGitaly {
+ electionStrategy = config.ElectionStrategySQL
+ }
+
+ tmp, cleanDir := testhelper.TempDir(t)
+ defer cleanDir()
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
+ require.NoError(t, err)
+
+ srv := NewGRPCServer(
+ config.Config{
+ Failover: config.Failover{
+ ElectionStrategy: electionStrategy,
+ },
+ },
+ testhelper.DiscardTestEntry(t),
+ protoregistry.GitalyProtoPreregistered,
+ func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ return nil, errServedByGitaly
+ },
+ nil,
+ nil,
+ nil,
+ rs,
+ nil,
+ )
+ defer srv.Stop()
+
+ go func() { srv.Serve(ln) }()
+
+ clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithInsecure())
+ require.NoError(t, err)
+
+ client := gitalypb.NewRepositoryServiceClient(clientConn)
+ _, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: tc.repository})
+ require.Equal(t, errServedByGitaly, err, "other RPCs should be passed through")
+
+ resp, err := client.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: tc.repository})
+ require.Equal(t, tc.error, err)
+ require.Equal(t, tc.response, resp)
+ })
+ }
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 927450a53..dabac9dd3 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -54,24 +54,30 @@ func NewGRPCServer(
grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
}
+ streamInterceptors := []grpc.StreamServerInterceptor{
+ grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
+ grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
+ middleware.MethodTypeStreamInterceptor(registry),
+ metadatahandler.StreamInterceptor,
+ grpc_prometheus.StreamServerInterceptor,
+ grpc_logrus.StreamServerInterceptor(logger),
+ featureflag.StreamInterceptor,
+ sentryhandler.StreamLogHandler,
+ cancelhandler.Stream, // Should be below LogHandler
+ grpctracing.StreamServerTracingInterceptor(),
+ auth.StreamServerInterceptor(conf.Auth),
+ // Panic handler should remain last so that application panics will be
+ // converted to errors and logged
+ panichandler.StreamPanicHandler,
+ }
+
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ streamInterceptors = append(streamInterceptors, RepositoryExistsStreamInterceptor(rs))
+ }
+
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
- grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
- grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
- grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
- middleware.MethodTypeStreamInterceptor(registry),
- metadatahandler.StreamInterceptor,
- grpc_prometheus.StreamServerInterceptor,
- grpc_logrus.StreamServerInterceptor(logger),
- featureflag.StreamInterceptor,
- sentryhandler.StreamLogHandler,
- cancelhandler.Stream, // Should be below LogHandler
- grpctracing.StreamServerTracingInterceptor(),
- auth.StreamServerInterceptor(conf.Auth),
- // Panic handler should remain last so that application panics will be
- // converted to errors and logged
- panichandler.StreamPanicHandler,
- )),
+ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler