diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-01-29 19:09:02 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-02-01 14:48:07 +0300 |
commit | 7c517f28115fa4a6de59494b2db63be8185f7c91 (patch) | |
tree | f1635a69c792f55f56017b8e4569385125748b54 | |
parent | eec4599a9eca5f196dc1d1fa1ab2b875166a88a7 (diff) |
intercept RepositoryExists calls in Praefect
GitLab calls RepositoryExists prior to creating a repository. Since
Praefect has no records of the repository before it is created,
PerRepositoryRouter's accessor routing fails as it doesn't know
where to send the requests without the database records.
This commit fixes the problem by intercepting RepositoryExists in
Praefect and checking from the database whether the repository
exists or not. Praefect then responds with the information without
involving the coordinator or any Gitalys.
This only affects the repository specific primary stack as the SQL
elector based stack just redirects the call to the virtual storage's
primary if no records of a repository exist.
-rw-r--r-- | changelogs/unreleased/smh-intercept-create-repository.yml | 5 | ||||
-rw-r--r-- | internal/praefect/repository_exists.go | 58 | ||||
-rw-r--r-- | internal/praefect/repository_exists_test.go | 121 | ||||
-rw-r--r-- | internal/praefect/server.go | 38 |
4 files changed, 206 insertions, 16 deletions
diff --git a/changelogs/unreleased/smh-intercept-create-repository.yml b/changelogs/unreleased/smh-intercept-create-repository.yml new file mode 100644 index 000000000..81db72ac7 --- /dev/null +++ b/changelogs/unreleased/smh-intercept-create-repository.yml @@ -0,0 +1,5 @@ +--- +title: Intercept RepositoryExists calls in Praefect +merge_request: 3075 +author: +type: changed diff --git a/internal/praefect/repository_exists.go b/internal/praefect/repository_exists.go new file mode 100644 index 000000000..0ecbf5ed6 --- /dev/null +++ b/internal/praefect/repository_exists.go @@ -0,0 +1,58 @@ +package praefect + +import ( + "fmt" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + errMissingRepository = status.Error(codes.InvalidArgument, "missing repository") + errMissingStorageName = status.Error(codes.InvalidArgument, "repository missing storage name") + errMissingRelativePath = status.Error(codes.InvalidArgument, "repository missing relative path") +) + +// RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists +// calls by checking whether there is a record of the repository in the database. +func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if info.FullMethod != "/gitaly.RepositoryService/RepositoryExists" { + return handler(srv, stream) + } + + var req gitalypb.RepositoryExistsRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + repo := req.GetRepository() + if repo == nil { + return errMissingRepository + } + + storageName := repo.StorageName + if storageName == "" { + return errMissingStorageName + } + + relativePath := repo.RelativePath + if relativePath == "" { + return errMissingRelativePath + } + + exists, err := rs.RepositoryExists(stream.Context(), storageName, relativePath) + if err != nil { + return fmt.Errorf("repository exists: %w", err) + } + + if err := stream.SendMsg(&gitalypb.RepositoryExistsResponse{Exists: exists}); err != nil { + return fmt.Errorf("send response: %w", err) + } + + return nil + } +} diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go new file mode 100644 index 000000000..f0bb53a76 --- /dev/null +++ b/internal/praefect/repository_exists_test.go @@ -0,0 +1,121 @@ +// +build postgres + +package praefect + +import ( + "context" + "net" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestRepositoryExistsStreamInterceptor(t *testing.T) { + errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly") + + for _, tc := range []struct { + desc string + routeToGitaly bool + repository *gitalypb.Repository + response *gitalypb.RepositoryExistsResponse + error error + }{ + { + desc: "missing repository", + error: errMissingRepository, + }, + { + desc: "missing storage name", + repository: &gitalypb.Repository{RelativePath: "relative-path"}, + error: errMissingStorageName, + }, + { + desc: "missing relative path", + repository: &gitalypb.Repository{StorageName: "virtual-storage"}, + error: errMissingRelativePath, + }, + { + desc: "invalid virtual storage", + repository: &gitalypb.Repository{StorageName: "invalid virtual storage", RelativePath: "relative-path"}, + response: &gitalypb.RepositoryExistsResponse{Exists: false}, + }, + { + desc: "invalid relative path", + repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "invalid relative path"}, + response: &gitalypb.RepositoryExistsResponse{Exists: false}, + }, + { + desc: "repository found", + repository: &gitalypb.Repository{StorageName: "virtual-storage", RelativePath: "relative-path"}, + response: &gitalypb.RepositoryExistsResponse{Exists: true}, + }, + { + desc: "routed to gitaly", + routeToGitaly: true, + error: errServedByGitaly, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + db := getDB(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}}) + + ctx, cancel := testhelper.Context() + defer cancel() + + require.NoError(t, rs.CreateRepository(ctx, "virtual-storage", "relative-path", "storage")) + + electionStrategy := config.ElectionStrategyPerRepository + if tc.routeToGitaly { + electionStrategy = config.ElectionStrategySQL + } + + tmp, cleanDir := testhelper.TempDir(t) + defer cleanDir() + + ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) + require.NoError(t, err) + + srv := NewGRPCServer( + config.Config{ + Failover: config.Failover{ + ElectionStrategy: electionStrategy, + }, + }, + testhelper.DiscardTestEntry(t), + protoregistry.GitalyProtoPreregistered, + func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + return nil, errServedByGitaly + }, + nil, + nil, + nil, + rs, + nil, + ) + defer srv.Stop() + + go func() { srv.Serve(ln) }() + + clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithInsecure()) + require.NoError(t, err) + + client := gitalypb.NewRepositoryServiceClient(clientConn) + _, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: tc.repository}) + require.Equal(t, errServedByGitaly, err, "other RPCs should be passed through") + + resp, err := client.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{Repository: tc.repository}) + require.Equal(t, tc.error, err) + require.Equal(t, tc.response, resp) + }) + } +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 927450a53..dabac9dd3 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -54,24 +54,30 @@ func NewGRPCServer( grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), } + streamInterceptors := []grpc.StreamServerInterceptor{ + grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), + grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler + middleware.MethodTypeStreamInterceptor(registry), + metadatahandler.StreamInterceptor, + grpc_prometheus.StreamServerInterceptor, + grpc_logrus.StreamServerInterceptor(logger), + featureflag.StreamInterceptor, + sentryhandler.StreamLogHandler, + cancelhandler.Stream, // Should be below LogHandler + grpctracing.StreamServerTracingInterceptor(), + auth.StreamServerInterceptor(conf.Auth), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.StreamPanicHandler, + } + + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + streamInterceptors = append(streamInterceptors, RepositoryExistsStreamInterceptor(rs)) + } + grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), - grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - middleware.MethodTypeStreamInterceptor(registry), - metadatahandler.StreamInterceptor, - grpc_prometheus.StreamServerInterceptor, - grpc_logrus.StreamServerInterceptor(logger), - featureflag.StreamInterceptor, - sentryhandler.StreamLogHandler, - cancelhandler.Stream, // Should be below LogHandler - grpctracing.StreamServerTracingInterceptor(), - auth.StreamServerInterceptor(conf.Auth), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.StreamPanicHandler, - )), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler |