Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-01-29 19:09:02 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-02-01 14:48:07 +0300
commit7c517f28115fa4a6de59494b2db63be8185f7c91 (patch)
treef1635a69c792f55f56017b8e4569385125748b54
parenteec4599a9eca5f196dc1d1fa1ab2b875166a88a7 (diff)
intercept RepositoryExists calls in Praefect
GitLab calls RepositoryExists prior to creating a repository. Since Praefect has no records of the repository before it is created, PerRepositoryRouter's accessor routing fails as it doesn't know where to send the requests without the database records. This commit fixes the problem by intercepting RepositoryExists in Praefect and checking from the database whether the repository exists or not. Praefect then responds with the information without involving the coordinator or any Gitalys. This only affects the repository specific primary stack as the SQL elector based stack just redirects the call to the virtual storage's primary if no records of a repository exist.
-rw-r--r--changelogs/unreleased/smh-intercept-create-repository.yml5
-rw-r--r--internal/praefect/repository_exists.go58
-rw-r--r--internal/praefect/repository_exists_test.go121
-rw-r--r--internal/praefect/server.go38
4 files changed, 206 insertions, 16 deletions
diff --git a/changelogs/unreleased/smh-intercept-create-repository.yml b/changelogs/unreleased/smh-intercept-create-repository.yml
new file mode 100644
index 000000000..81db72ac7
--- /dev/null
+++ b/changelogs/unreleased/smh-intercept-create-repository.yml
@@ -0,0 +1,5 @@
+---
+title: Intercept RepositoryExists calls in Praefect
+merge_request: 3075
+author:
+type: changed
diff --git a/internal/praefect/repository_exists.go b/internal/praefect/repository_exists.go
new file mode 100644
index 000000000..0ecbf5ed6
--- /dev/null
+++ b/internal/praefect/repository_exists.go
@@ -0,0 +1,58 @@
+package praefect
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+var (
+ errMissingRepository = status.Error(codes.InvalidArgument, "missing repository")
+ errMissingStorageName = status.Error(codes.InvalidArgument, "repository missing storage name")
+ errMissingRelativePath = status.Error(codes.InvalidArgument, "repository missing relative path")
+)
+
+// RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists
+// calls by checking whether there is a record of the repository in the database.
+func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor {
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if info.FullMethod != "/gitaly.RepositoryService/RepositoryExists" {
+ return handler(srv, stream)
+ }
+
+ var req gitalypb.RepositoryExistsRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ repo := req.GetRepository()
+ if repo == nil {
+ return errMissingRepository
+ }
+
+ storageName := repo.StorageName
+ if storageName == "" {
+ return errMissingStorageName
+ }
+
+ relativePath := repo.RelativePath
+ if relativePath == "" {
+ return errMissingRelativePath
+ }
+
+ exists, err := rs.RepositoryExists(stream.Context(), storageName, relativePath)
+ if err != nil {
+ return fmt.Errorf("repository exists: %w", err)
+ }
+
+ if err := stream.SendMsg(&gitalypb.RepositoryExistsResponse{Exists: exists}); err != nil {
+ return fmt.Errorf("send response: %w", err)
+ }
+
+ return nil
+ }
+}
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
new file mode 100644
index 000000000..f0bb53a76
--- /dev/null
+++ b/internal/praefect/repository_exists_test.go
@@ -0,0 +1,121 @@
+// +build postgres
+
+package praefect
+
+import (
+ "context"
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func TestRepositoryExistsStreamInterceptor(t *testing.T) {
+ errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly")
+
+ for _, tc := range []struct {
+ desc string
+ routeToGitaly bool
+ repository *gitalypb.Repository
+ response *gitalypb.RepositoryExistsResponse
+ error error
+ }{
+ {
+ desc: "missing repository",
+ error: errMissingRepository,
+ },
+ {
+ desc: "missing storage name",
+ repository: &gitalypb.Repository{RelativePath: "relative-path"},
+ error: errMissingStorageName,
+ },
+ {
+ desc: "missing relative path",
+ repository: &gitalypb.Repository{StorageName: "virtual-storage"},
+ error: errMissingRelativePath,
+ },
+ {
+ desc: "invalid virtual storage",
+ repository: &gitalypb.Repository{StorageName: "invalid virtual storage", RelativePath: "relative-path"},
+ response: &gitalypb.RepositoryExistsResponse{Exists: false},
+ },
+ {
+ desc: "invalid relative path",
+ repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "invalid relative path"},
+ response: &gitalypb.RepositoryExistsResponse{Exists: false},
+ },
+ {
+ desc: "repository found",
+ repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "relative-path"},
+ response: &gitalypb.RepositoryExistsResponse{Exists: true},
+ },
+ {
+ desc: "routed to gitaly",
+ routeToGitaly: true,
+ error: errServedByGitaly,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ db := getDB(t)
+ rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ require.NoError(t, rs.CreateRepository(ctx, "virtual-storage", "relative-path", "storage"))
+
+ electionStrategy := config.ElectionStrategyPerRepository
+ if tc.routeToGitaly {
+ electionStrategy = config.ElectionStrategySQL
+ }
+
+ tmp, cleanDir := testhelper.TempDir(t)
+ defer cleanDir()
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
+ require.NoError(t, err)
+
+ srv := NewGRPCServer(
+ config.Config{
+ Failover: config.Failover{
+ ElectionStrategy: electionStrategy,
+ },
+ },
+ testhelper.DiscardTestEntry(t),
+ protoregistry.GitalyProtoPreregistered,
+ func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ return nil, errServedByGitaly
+ },
+ nil,
+ nil,
+ nil,
+ rs,
+ nil,
+ )
+ defer srv.Stop()
+
+ go func() { srv.Serve(ln) }()
+
+ clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithInsecure())
+ require.NoError(t, err)
+
+ client := gitalypb.NewRepositoryServiceClient(clientConn)
+ _, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: tc.repository})
+ require.Equal(t, errServedByGitaly, err, "other RPCs should be passed through")
+
+ resp, err := client.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: tc.repository})
+ require.Equal(t, tc.error, err)
+ require.Equal(t, tc.response, resp)
+ })
+ }
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 927450a53..dabac9dd3 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -54,24 +54,30 @@ func NewGRPCServer(
grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
}
+ streamInterceptors := []grpc.StreamServerInterceptor{
+ grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
+ grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
+ middleware.MethodTypeStreamInterceptor(registry),
+ metadatahandler.StreamInterceptor,
+ grpc_prometheus.StreamServerInterceptor,
+ grpc_logrus.StreamServerInterceptor(logger),
+ featureflag.StreamInterceptor,
+ sentryhandler.StreamLogHandler,
+ cancelhandler.Stream, // Should be below LogHandler
+ grpctracing.StreamServerTracingInterceptor(),
+ auth.StreamServerInterceptor(conf.Auth),
+ // Panic handler should remain last so that application panics will be
+ // converted to errors and logged
+ panichandler.StreamPanicHandler,
+ }
+
+ if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
+ streamInterceptors = append(streamInterceptors, RepositoryExistsStreamInterceptor(rs))
+ }
+
grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
- grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
- grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...),
- grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
- middleware.MethodTypeStreamInterceptor(registry),
- metadatahandler.StreamInterceptor,
- grpc_prometheus.StreamServerInterceptor,
- grpc_logrus.StreamServerInterceptor(logger),
- featureflag.StreamInterceptor,
- sentryhandler.StreamLogHandler,
- cancelhandler.Stream, // Should be below LogHandler
- grpctracing.StreamServerTracingInterceptor(),
- auth.StreamServerInterceptor(conf.Auth),
- // Panic handler should remain last so that application panics will be
- // converted to errors and logged
- panichandler.StreamPanicHandler,
- )),
+ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...),
grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler