diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2022-08-09 10:09:18 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2022-08-09 10:09:18 +0300 |
commit | 9765a9733ed7f433d0241a589b45bbf712b1a0ab (patch) | |
tree | 77a7d0112428fb5e42c84b6e4f79585b33a7145f | |
parent | 6bb5f6969910ce5010f1c894ee671a86e656e6da (diff) | |
parent | 96af8f1c9656f20a08e2b93aa5489cf0b18774ed (diff) |
Merge branch 'ps-praefect-readiness-check' into 'master'
praefect: Check of the service readiness with RPC call
See merge request gitlab-org/gitaly!4674
30 files changed, 1163 insertions, 414 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index f2389aeb7..b9edba3d1 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -88,6 +88,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/reconciler" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/repocleaner" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service/transaction" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/v15/internal/sidechannel" @@ -435,6 +436,7 @@ func run( protoregistry.GitalyProtoPreregistered, nodeSet.Connections(), primaryGetter, + service.AllChecks(), ) ) metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 292da9a30..523c41af5 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -12,10 +12,9 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" "gitlab.com/gitlab-org/gitaly/v15/client" - "gitlab.com/gitlab-org/gitaly/v15/internal/helper" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" "google.golang.org/grpc" ) @@ -43,17 +42,10 @@ var subcommands = map[string]subcmd{ removeRepositoryCmdName: newRemoveRepository(logger, os.Stdout), trackRepositoryCmdName: newTrackRepository(logger, os.Stdout), listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout), - checkCmdName: newCheckSubcommand( - os.Stdout, - praefect.NewPraefectMigrationCheck, - praefect.NewGitalyNodeConnectivityCheck, - praefect.NewPostgresReadWriteCheck, - praefect.NewUnavailableReposCheck, - praefect.NewClockSyncCheck(helper.CheckClockSync), - ), - metadataCmdName: newMetadataSubcommand(os.Stdout), - verifyCmdName: newVerifySubcommand(os.Stdout), - listStoragesCmdName: newListStorages(os.Stdout), + checkCmdName: newCheckSubcommand(os.Stdout, service.AllChecks()...), + metadataCmdName: newMetadataSubcommand(os.Stdout), + verifyCmdName: newVerifySubcommand(os.Stdout), + listStoragesCmdName: newListStorages(os.Stdout), } // subCommand returns an exit code, to be fed into os.Exit. diff --git a/cmd/praefect/subcmd_check.go b/cmd/praefect/subcmd_check.go index d50d475d3..7e69140d9 100644 --- a/cmd/praefect/subcmd_check.go +++ b/cmd/praefect/subcmd_check.go @@ -8,8 +8,8 @@ import ( "io" "time" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" ) const ( @@ -19,10 +19,10 @@ const ( type checkSubcommand struct { w io.Writer quiet bool - checkFuncs []praefect.CheckFunc + checkFuncs []service.CheckFunc } -func newCheckSubcommand(writer io.Writer, checkFuncs ...praefect.CheckFunc) *checkSubcommand { +func newCheckSubcommand(writer io.Writer, checkFuncs ...service.CheckFunc) *checkSubcommand { return &checkSubcommand{ w: writer, checkFuncs: checkFuncs, @@ -43,8 +43,8 @@ func (cmd *checkSubcommand) FlagSet() *flag.FlagSet { var errFatalChecksFailed = errors.New("checks failed") -func (cmd *checkSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) error { - var allChecks []*praefect.Check +func (cmd *checkSubcommand) Exec(_ *flag.FlagSet, cfg config.Config) error { + var allChecks []*service.Check for _, checkFunc := range cmd.checkFuncs { allChecks = append(allChecks, checkFunc(cfg, cmd.w, cmd.quiet)) } @@ -60,7 +60,7 @@ func (cmd *checkSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) error { if err := check.Run(ctx); err != nil { failedChecks++ - if check.Severity == praefect.Fatal { + if check.Severity == service.Fatal { passed = false } fmt.Fprintf(cmd.w, "Failed (%s) error: %s\n", check.Severity, err.Error()) @@ -85,7 +85,7 @@ func (cmd *checkSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) error { return nil } -func (cmd *checkSubcommand) printCheckDetails(check *praefect.Check) { +func (cmd *checkSubcommand) printCheckDetails(check *service.Check) { if cmd.quiet { fmt.Fprintf(cmd.w, "Checking %s...", check.Name) return diff --git a/cmd/praefect/subcmd_check_test.go b/cmd/praefect/subcmd_check_test.go index d076510f9..3df258a47 100644 --- a/cmd/praefect/subcmd_check_test.go +++ b/cmd/praefect/subcmd_check_test.go @@ -11,8 +11,8 @@ import ( "testing" "github.com/stretchr/testify/assert" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" ) func TestCheckSubcommand_Exec(t *testing.T) { @@ -20,36 +20,36 @@ func TestCheckSubcommand_Exec(t *testing.T) { testCases := []struct { desc string - checks []praefect.CheckFunc + checks []service.CheckFunc expectedQuietOutput string expectedOutput string expectedError error }{ { desc: "all checks pass", - checks: []praefect.CheckFunc{ - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + checks: []service.CheckFunc{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 1", Description: "checks a", Run: func(ctx context.Context) error { return nil }, - Severity: praefect.Fatal, + Severity: service.Fatal, } }, - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 2", Description: "checks b", Run: func(ctx context.Context) error { return nil }, - Severity: praefect.Fatal, + Severity: service.Fatal, } }, - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 3", Description: "checks c", Run: func(ctx context.Context) error { return nil }, - Severity: praefect.Fatal, + Severity: service.Fatal, } }, }, @@ -72,29 +72,29 @@ All checks passed. }, { desc: "a fatal check fails", - checks: []praefect.CheckFunc{ - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + checks: []service.CheckFunc{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 1", Description: "checks a", Run: func(ctx context.Context) error { return nil }, - Severity: praefect.Fatal, + Severity: service.Fatal, } }, - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 2", Description: "checks b", Run: func(ctx context.Context) error { return errors.New("i failed") }, - Severity: praefect.Fatal, + Severity: service.Fatal, } }, - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 3", Description: "checks c", Run: func(ctx context.Context) error { return nil }, - Severity: praefect.Fatal, + Severity: service.Fatal, } }, }, @@ -117,29 +117,29 @@ Checking check 3...Passed }, { desc: "only warning checks fail", - checks: []praefect.CheckFunc{ - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + checks: []service.CheckFunc{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 1", Description: "checks a", Run: func(ctx context.Context) error { return nil }, - Severity: praefect.Fatal, + Severity: service.Fatal, } }, - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 2", Description: "checks b", Run: func(ctx context.Context) error { return errors.New("i failed but not too badly") }, - Severity: praefect.Warning, + Severity: service.Warning, } }, - func(cfg config.Config, w io.Writer, quiet bool) *praefect.Check { - return &praefect.Check{ + func(cfg config.Config, w io.Writer, quiet bool) *service.Check { + return &service.Check{ Name: "check 3", Description: "checks c", Run: func(ctx context.Context) error { return errors.New("i failed but not too badly") }, - Severity: praefect.Warning, + Severity: service.Warning, } }, }, diff --git a/internal/gitaly/service/server/readiness.go b/internal/gitaly/service/server/readiness.go new file mode 100644 index 000000000..59708e2ed --- /dev/null +++ b/internal/gitaly/service/server/readiness.go @@ -0,0 +1,13 @@ +package server + +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" +) + +// ReadinessCheck is a stub that does nothing but exists to support single interface for gitaly +// and praefect. The praefect service requires this method. +func (s *server) ReadinessCheck(context.Context, *gitalypb.ReadinessCheckRequest) (*gitalypb.ReadinessCheckResponse, error) { + return &gitalypb.ReadinessCheckResponse{Result: &gitalypb.ReadinessCheckResponse_OkResponse{}}, nil +} diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index ba62e0326..9f0cd9012 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -163,7 +163,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry) - srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, txMgr, nil, nil, nil, nil, nil) + srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, txMgr, nil, nil, nil, nil, nil, nil) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 19486f40c..37e495ad5 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -568,7 +568,7 @@ func TestStreamDirector_maintenanceRPCs(t *testing.T) { t, testcfg.WithStorages(secondaryStorage)), ) - cc, _, cleanup := runPraefectServer(t, ctx, config.Config{ + cc, _, cleanup := RunPraefectServer(t, ctx, config.Config{ VirtualStorages: []*config.VirtualStorage{ { Name: "default", @@ -584,7 +584,7 @@ func TestStreamDirector_maintenanceRPCs(t *testing.T) { }, }, }, - }, buildOptions{}) + }, BuildOptions{}) defer cleanup() repository := &gitalypb.Repository{ @@ -1569,10 +1569,10 @@ func TestCoordinatorEnqueueFailure(t *testing.T) { require.NoError(t, err) ctx := testhelper.Context(t) - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withAnnotations: r, - withQueue: queueInterceptor, - withBackends: withMockBackends(t, map[string]mock.SimpleServiceServer{ + cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{ + WithAnnotations: r, + WithQueue: queueInterceptor, + WithBackends: WithMockBackends(t, map[string]mock.SimpleServiceServer{ conf.VirtualStorages[0].Nodes[0].Storage: ms, conf.VirtualStorages[0].Nodes[1].Storage: ms, }), @@ -1946,10 +1946,10 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }) } - praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{ + praefectConn, _, cleanup := RunPraefectServer(t, ctx, praefectConfig, BuildOptions{ // Set up a mock manager which sets up primary/secondaries and pretends that all nodes are // healthy. We need fixed roles and unhealthy nodes will not take part in transactions. - withNodeMgr: &nodes.MockManager{ + WithNodeMgr: &nodes.MockManager{ Storage: testhelper.DefaultStorageName, GetShardFunc: func(shardName string) (nodes.Shard, error) { require.Equal(t, testhelper.DefaultStorageName, shardName) @@ -1964,7 +1964,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { }, // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent // nodes will take part in transactions. - withRepoStore: datastore.MockRepositoryStore{ + WithRepoStore: datastore.MockRepositoryStore{ GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { return repoProto.GetRelativePath(), nil }, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 8b654b97a..a8d98da5c 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -5,30 +5,9 @@ package praefect import ( "context" "fmt" - "net" - "testing" - "time" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v15/client" - gitalycfgauth "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth" - "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/server/auth" - "gitlab.com/gitlab-org/gitaly/v15/internal/log" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" - "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions" - "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/promtest" - "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb" - correlation "gitlab.com/gitlab-org/labkit/correlation/grpc" - "google.golang.org/grpc" - "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health_v1" ) // generates a praefect configuration with the specified number of backend @@ -56,14 +35,6 @@ func testConfig(backends int) config.Config { return cfg } -type noopBackoffFactory struct{} - -func (noopBackoffFactory) Create() (Backoff, BackoffReset) { - return func() time.Duration { - return 0 - }, func() {} -} - type nullNodeMgr struct{} func (nullNodeMgr) GetShard(ctx context.Context, virtualStorageName string) (nodes.Shard, error) { @@ -81,221 +52,3 @@ func (nullNodeMgr) HealthyNodes() map[string][]string { func (nullNodeMgr) Nodes() map[string][]nodes.Node { return nil } - -type buildOptions struct { - withQueue datastore.ReplicationEventQueue - withTxMgr *transactions.Manager - withBackends func([]*config.VirtualStorage) []testhelper.Cleanup - withAnnotations *protoregistry.Registry - withLogger *logrus.Entry - withNodeMgr nodes.Manager - withRepoStore datastore.RepositoryStore - withAssignmentStore AssignmentStore - withConnections Connections - withPrimaryGetter PrimaryGetter - withRouter Router -} - -func withMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer) func([]*config.VirtualStorage) []testhelper.Cleanup { - return func(virtualStorages []*config.VirtualStorage) []testhelper.Cleanup { - var cleanups []testhelper.Cleanup - - for _, vs := range virtualStorages { - require.Equal(t, len(backends), len(vs.Nodes), - "mock server count doesn't match config nodes") - - for i, node := range vs.Nodes { - backend, ok := backends[node.Storage] - require.True(t, ok, "missing backend server for node %s", node.Storage) - - backendAddr, cleanup := newMockDownstream(t, node.Token, backend) - cleanups = append(cleanups, cleanup) - - node.Address = backendAddr - vs.Nodes[i] = node - } - } - - return cleanups - } -} - -func defaultQueue(t testing.TB) datastore.ReplicationEventQueue { - return datastore.NewPostgresReplicationEventQueue(testdb.New(t)) -} - -func defaultTxMgr(conf config.Config) *transactions.Manager { - return transactions.NewManager(conf) -} - -func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager { - nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) - require.NoError(t, err) - nodeMgr.Start(0, time.Hour) - t.Cleanup(nodeMgr.Stop) - return nodeMgr -} - -func defaultRepoStore(conf config.Config) datastore.RepositoryStore { - return datastore.MockRepositoryStore{} -} - -func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, opt buildOptions) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) { - var cleanups []testhelper.Cleanup - - if opt.withQueue == nil { - opt.withQueue = defaultQueue(t) - } - if opt.withRepoStore == nil { - opt.withRepoStore = defaultRepoStore(conf) - } - if opt.withTxMgr == nil { - opt.withTxMgr = defaultTxMgr(conf) - } - if opt.withBackends != nil { - cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...) - } - if opt.withAnnotations == nil { - opt.withAnnotations = protoregistry.GitalyProtoPreregistered - } - if opt.withLogger == nil { - opt.withLogger = log.Default() - } - if opt.withNodeMgr == nil { - opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withRepoStore) - } - if opt.withAssignmentStore == nil { - opt.withAssignmentStore = NewDisabledAssignmentStore(conf.StorageNames()) - } - if opt.withRouter == nil { - opt.withRouter = NewNodeManagerRouter(opt.withNodeMgr, opt.withRepoStore) - } - - coordinator := NewCoordinator( - opt.withQueue, - opt.withRepoStore, - opt.withRouter, - opt.withTxMgr, - conf, - opt.withAnnotations, - ) - - // TODO: run a replmgr for EVERY virtual storage - replmgr := NewReplMgr( - opt.withLogger, - conf.StorageNames(), - opt.withQueue, - opt.withRepoStore, - opt.withNodeMgr, - NodeSetFromNodeManager(opt.withNodeMgr), - ) - - prf := NewGRPCServer( - conf, - opt.withLogger, - protoregistry.GitalyProtoPreregistered, - coordinator.StreamDirector, - opt.withTxMgr, - opt.withRepoStore, - opt.withAssignmentStore, - opt.withConnections, - opt.withPrimaryGetter, - nil, - ) - - listener, port := listenAvailPort(t) - - errQ := make(chan error) - ctx, cancel := context.WithCancel(ctx) - - go func() { - errQ <- prf.Serve(listener) - close(errQ) - }() - replMgrDone := startProcessBacklog(ctx, replmgr) - - // dial client to praefect - cc := dialLocalPort(t, port, false) - - cleanup := func() { - cc.Close() - - for _, cu := range cleanups { - cu() - } - - prf.Stop() - - cancel() - <-replMgrDone - require.NoError(t, <-errQ) - } - - return cc, prf, cleanup -} - -func listenAvailPort(tb testing.TB) (net.Listener, int) { - listener, err := net.Listen("tcp", "localhost:0") - require.NoError(tb, err) - - return listener, listener.Addr().(*net.TCPAddr).Port -} - -func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { - opts := []grpc.DialOption{ - grpc.WithBlock(), - grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()), - grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()), - } - if backend { - opts = append( - opts, - grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())), - ) - } - - cc, err := client.Dial( - fmt.Sprintf("tcp://localhost:%d", port), - opts, - ) - require.NoError(tb, err) - - return cc -} - -func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) (string, func()) { - srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(gitalycfgauth.Config{Token: token}))) - mock.RegisterSimpleServiceServer(srv, m) - healthpb.RegisterHealthServer(srv, health.NewServer()) - - // client to backend service - lis, port := listenAvailPort(tb) - - errQ := make(chan error) - - go func() { - errQ <- srv.Serve(lis) - }() - - cleanup := func() { - srv.GracefulStop() - lis.Close() - - // If the server is shutdown before Serve() is called on it - // the Serve() calls will return the ErrServerStopped - if err := <-errQ; err != nil && err != grpc.ErrServerStopped { - require.NoError(tb, err) - } - } - - return fmt.Sprintf("tcp://localhost:%d", port), cleanup -} - -func startProcessBacklog(ctx context.Context, replMgr ReplMgr) <-chan struct{} { - done := make(chan struct{}) - go func() { - defer close(done) - replMgr.ProcessBacklog(ctx, noopBackoffFactory{}) - }() - return done -} diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index 6e860a3c6..3c9d6c211 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -100,10 +100,10 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { conns := nodeSet.Connections() rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withConnections: conns, - withRepoStore: rs, - withRouter: NewPerRepositoryRouter( + cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{ + WithConnections: conns, + WithRepoStore: rs, + WithRouter: NewPerRepositoryRouter( conns, elector, StaticHealthChecker{virtualStorage: storages}, @@ -113,8 +113,8 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { rs, conf.DefaultReplicationFactors(), ), - withPrimaryGetter: elector, - withTxMgr: txManager, + WithPrimaryGetter: elector, + WithTxMgr: txManager, }) // use cleanup to close the connections as gittest.CreateRepository will still use the connection // for clean up after the test. diff --git a/internal/praefect/remove_repository_test.go b/internal/praefect/remove_repository_test.go index 4a1b24251..f9eaf057a 100644 --- a/internal/praefect/remove_repository_test.go +++ b/internal/praefect/remove_repository_test.go @@ -120,6 +120,7 @@ func TestRemoveRepositoryHandler(t *testing.T) { nodeSet.Connections(), nil, nil, + nil, ) defer srv.Stop() diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 87f38d85a..3eb445e7e 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -99,6 +99,7 @@ func TestRepositoryExistsHandler(t *testing.T) { nil, nil, nil, + nil, ) defer srv.Stop() diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 3192ddfdd..202eb2e6d 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -92,6 +92,7 @@ func NewGRPCServer( conns Connections, primaryGetter PrimaryGetter, creds credentials.TransportCredentials, + checks []service.CheckFunc, grpcOpts ...grpc.ServerOption, ) *grpc.Server { streamInterceptors := []grpc.StreamServerInterceptor{ @@ -153,7 +154,7 @@ func NewGRPCServer( warnDupeAddrs(logger, conf) srv := grpc.NewServer(grpcOpts...) - registerServices(srv, txMgr, conf, rs, assignmentStore, service.Connections(conns), primaryGetter) + registerServices(srv, txMgr, conf, rs, assignmentStore, service.Connections(conns), primaryGetter, checks) if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ @@ -184,9 +185,10 @@ func registerServices( assignmentStore AssignmentStore, conns service.Connections, primaryGetter info.PrimaryGetter, + checks []service.CheckFunc, ) { // ServerServiceServer is necessary for the ServerInfo RPC - gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, conns)) + gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, conns, checks)) gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(conf, rs, assignmentStore, conns, primaryGetter)) gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm)) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go index da7fe2c7a..45424fe9d 100644 --- a/internal/praefect/server_factory.go +++ b/internal/praefect/server_factory.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -30,6 +31,7 @@ func NewServerFactory( registry *protoregistry.Registry, conns Connections, primaryGetter PrimaryGetter, + checks []service.CheckFunc, ) *ServerFactory { return &ServerFactory{ conf: conf, @@ -43,6 +45,7 @@ func NewServerFactory( registry: registry, conns: conns, primaryGetter: primaryGetter, + checks: checks, } } @@ -61,6 +64,7 @@ type ServerFactory struct { secure, insecure []*grpc.Server conns Connections primaryGetter PrimaryGetter + checks []service.CheckFunc } // Serve starts serving on the provided listener with newly created grpc.Server @@ -131,6 +135,7 @@ func (s *ServerFactory) createGRPC(creds credentials.TransportCredentials) *grpc s.conns, s.primaryGetter, creds, + s.checks, ) } diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index f9bbdd16a..7fd850e21 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -179,7 +179,7 @@ func TestServerFactory(t *testing.T) { } t.Run("insecure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil, nil) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, "localhost:0") @@ -212,7 +212,7 @@ func TestServerFactory(t *testing.T) { }) t.Run("secure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil, nil) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, "localhost:0") @@ -254,7 +254,7 @@ func TestServerFactory(t *testing.T) { t.Run("stops all listening servers", func(t *testing.T) { ctx := testhelper.Context(t) - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil, nil) defer praefectServerFactory.Stop() // start with tcp address @@ -322,7 +322,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls key path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.KeyPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil, nil) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") @@ -331,7 +331,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls cert path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.CertPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil, nil) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index efecddbab..edbed63e0 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -159,8 +159,8 @@ func TestGitalyServerInfo(t *testing.T) { require.NoError(t, err) t.Cleanup(nodeSet.Close) - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withConnections: nodeSet.Connections(), + cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{ + WithConnections: nodeSet.Connections(), }) t.Cleanup(cleanup) @@ -225,8 +225,8 @@ func TestGitalyServerInfo(t *testing.T) { require.NoError(t, err) t.Cleanup(nodeSet.Close) - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withConnections: nodeSet.Connections(), + cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{ + WithConnections: nodeSet.Connections(), }) t.Cleanup(cleanup) @@ -262,8 +262,8 @@ func TestGitalyServerInfoBadNode(t *testing.T) { require.NoError(t, err) defer nodes.Close() - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withConnections: nodes.Connections(), + cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{ + WithConnections: nodes.Connections(), }) defer cleanup() @@ -294,8 +294,8 @@ func TestDiskStatistics(t *testing.T) { require.NoError(t, err) defer nodes.Close() - cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ - withConnections: nodes.Connections(), + cc, _, cleanup := RunPraefectServer(t, ctx, praefectCfg, BuildOptions{ + WithConnections: nodes.Connections(), }) defer cleanup() @@ -314,12 +314,12 @@ func TestHealthCheck(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) - cc, _, cleanup := runPraefectServer(t, ctx, config.Config{VirtualStorages: []*config.VirtualStorage{ + cc, _, cleanup := RunPraefectServer(t, ctx, config.Config{VirtualStorages: []*config.VirtualStorage{ { Name: "praefect", Nodes: []*config.Node{{Storage: "stub", Address: "unix:///stub-address", Token: ""}}, }, - }}, buildOptions{}) + }}, BuildOptions{}) defer cleanup() client := grpc_health_v1.NewHealthClient(cc) @@ -344,7 +344,7 @@ func TestRejectBadStorage(t *testing.T) { } ctx := testhelper.Context(t) - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{}) + cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{}) defer cleanup() req := &gitalypb.GarbageCollectRequest{ @@ -396,9 +396,9 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook := test.NewNullLogger() // instantiate a praefect server and trigger warning - _, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withLogger: logrus.NewEntry(tLogger), - withNodeMgr: nullNodeMgr{}, // to suppress node address issues + _, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{ + WithLogger: logrus.NewEntry(tLogger), + WithNodeMgr: nullNodeMgr{}, // to suppress node address issues }) defer cleanup() @@ -427,9 +427,9 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook = test.NewNullLogger() // instantiate a praefect server and trigger warning - _, _, cleanup = runPraefectServer(t, ctx, conf, buildOptions{ - withLogger: logrus.NewEntry(tLogger), - withNodeMgr: nullNodeMgr{}, // to suppress node address issues + _, _, cleanup = RunPraefectServer(t, ctx, conf, BuildOptions{ + WithLogger: logrus.NewEntry(tLogger), + WithNodeMgr: nullNodeMgr{}, // to suppress node address issues }) defer cleanup() @@ -476,9 +476,9 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook = test.NewNullLogger() // instantiate a praefect server and trigger warning - _, _, cleanup = runPraefectServer(t, ctx, conf, buildOptions{ - withLogger: logrus.NewEntry(tLogger), - withNodeMgr: nullNodeMgr{}, // to suppress node address issues + _, _, cleanup = RunPraefectServer(t, ctx, conf, BuildOptions{ + WithLogger: logrus.NewEntry(tLogger), + WithNodeMgr: nullNodeMgr{}, // to suppress node address issues }) defer cleanup() @@ -535,15 +535,15 @@ func TestRemoveRepository(t *testing.T) { nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ - withQueue: queueInterceptor, - withRepoStore: datastore.MockRepositoryStore{ + cc, _, cleanup := RunPraefectServer(t, ctx, praefectCfg, BuildOptions{ + WithQueue: queueInterceptor, + WithRepoStore: datastore.MockRepositoryStore{ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { return relativePath, nil, nil }, }, - withNodeMgr: nodeMgr, - withTxMgr: txMgr, + WithNodeMgr: nodeMgr, + WithTxMgr: txMgr, }) defer cleanup() @@ -612,10 +612,10 @@ func testRenameRepository(t *testing.T, ctx context.Context) { require.NoError(t, err) defer nodeSet.Close() - cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ - withQueue: evq, - withRepoStore: rs, - withRouter: NewPerRepositoryRouter( + cc, _, cleanup := RunPraefectServer(t, ctx, praefectCfg, BuildOptions{ + WithQueue: evq, + WithRepoStore: rs, + WithRouter: NewPerRepositoryRouter( nodeSet.Connections(), nodes.NewPerRepositoryElector(db), StaticHealthChecker(praefectCfg.StorageNames()), @@ -625,7 +625,7 @@ func testRenameRepository(t *testing.T, ctx context.Context) { rs, nil, ), - withTxMgr: txManager, + WithTxMgr: txManager, }) t.Cleanup(cleanup) diff --git a/internal/praefect/checks.go b/internal/praefect/service/checks.go index 32d026d45..06f82c01d 100644 --- a/internal/praefect/checks.go +++ b/internal/praefect/service/checks.go @@ -1,4 +1,4 @@ -package praefect +package service import ( "context" @@ -12,6 +12,7 @@ import ( migrate "github.com/rubenv/sql-migrate" gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" "gitlab.com/gitlab-org/gitaly/v15/client" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" @@ -48,6 +49,17 @@ type Check struct { // CheckFunc is a function type that takes a praefect config and returns a Check type CheckFunc func(conf config.Config, w io.Writer, quiet bool) *Check +// AllChecks returns slice of all checks that can be executed for praefect. +func AllChecks() []CheckFunc { + return []CheckFunc{ + NewPraefectMigrationCheck, + NewGitalyNodeConnectivityCheck, + NewPostgresReadWriteCheck, + NewUnavailableReposCheck, + NewClockSyncCheck(helper.CheckClockSync), + } +} + // NewPraefectMigrationCheck returns a Check that checks if all praefect migrations have run func NewPraefectMigrationCheck(conf config.Config, w io.Writer, quiet bool) *Check { return &Check{ diff --git a/internal/praefect/checks_test.go b/internal/praefect/service/checks_test.go index b2e4e96e9..851d21692 100644 --- a/internal/praefect/checks_test.go +++ b/internal/praefect/service/checks_test.go @@ -1,6 +1,6 @@ //go:build !gitaly_test_sha256 -package praefect +package service import ( "bytes" diff --git a/internal/praefect/service/helper_test.go b/internal/praefect/service/helper_test.go new file mode 100644 index 000000000..4bf810464 --- /dev/null +++ b/internal/praefect/service/helper_test.go @@ -0,0 +1,11 @@ +package service + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/praefect/service/server/readiness.go b/internal/praefect/service/server/readiness.go new file mode 100644 index 000000000..39f1392e8 --- /dev/null +++ b/internal/praefect/service/server/readiness.go @@ -0,0 +1,53 @@ +package server + +import ( + "context" + "io" + "sort" + + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" +) + +// ReadinessCheck runs the set of the checks to make sure service is in operational state. +func (s *Server) ReadinessCheck(ctx context.Context, req *gitalypb.ReadinessCheckRequest) (*gitalypb.ReadinessCheckResponse, error) { + checkCtx := ctx + checkCancel := func() {} + timeout := req.GetTimeout().AsDuration() + if req.GetTimeout().IsValid() && timeout > 0 { + checkCtx, checkCancel = context.WithTimeout(ctx, timeout) + } + defer checkCancel() + + results := make(chan *gitalypb.ReadinessCheckResponse_Failure_Response, len(s.checks)) + for _, newCheck := range s.checks { + check := newCheck(s.conf, io.Discard, true) + go func() { + if err := check.Run(checkCtx); err != nil { + results <- &gitalypb.ReadinessCheckResponse_Failure_Response{ + Name: check.Name, + ErrorMessage: err.Error(), + } + } else { + results <- nil + } + }() + } + + var failedChecks []*gitalypb.ReadinessCheckResponse_Failure_Response + for i := 0; i < cap(results); i++ { + if result := <-results; result != nil { + failedChecks = append(failedChecks, result) + } + } + + if len(failedChecks) > 0 { + sort.Slice(failedChecks, func(i, j int) bool { return failedChecks[i].Name < failedChecks[j].Name }) + return &gitalypb.ReadinessCheckResponse{Result: &gitalypb.ReadinessCheckResponse_FailureResponse{ + FailureResponse: &gitalypb.ReadinessCheckResponse_Failure{ + FailedChecks: failedChecks, + }, + }}, nil + } + + return &gitalypb.ReadinessCheckResponse{Result: &gitalypb.ReadinessCheckResponse_OkResponse{}}, nil +} diff --git a/internal/praefect/service/server/readiness_test.go b/internal/praefect/service/server/readiness_test.go new file mode 100644 index 000000000..d53deb413 --- /dev/null +++ b/internal/praefect/service/server/readiness_test.go @@ -0,0 +1,114 @@ +package server_test + +import ( + "context" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "google.golang.org/protobuf/types/known/durationpb" +) + +func TestServer_ReadinessCheck(t *testing.T) { + t.Parallel() + stubCheck := func(t *testing.T, triggered chan string, name string) *service.Check { + return &service.Check{ + Name: name, + Run: func(ctx context.Context) error { + _, ok := ctx.Deadline() + assert.True(t, ok, "the deadline should be set as we provide timeout") + triggered <- name + return nil + }, + } + } + + const gitalyStorageName = "praefect-internal-0" + gitalyCfg := testcfg.Build(t, testcfg.WithStorages(gitalyStorageName)) + gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + praefectConf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*config.Node{ + { + Storage: gitalyStorageName, + Address: gitalyAddr, + }, + }, + }, + }, + } + ctx := testhelper.Context(t) + triggered := make(chan string, 2) + grpcPraefectConn, _, cleanup := praefect.RunPraefectServer(t, ctx, praefectConf, praefect.BuildOptions{ + WithChecks: []service.CheckFunc{ + func(conf config.Config, w io.Writer, quiet bool) *service.Check { + return stubCheck(t, triggered, "1") + }, + func(conf config.Config, w io.Writer, quiet bool) *service.Check { + return stubCheck(t, triggered, "2") + }, + }, + }) + t.Cleanup(cleanup) + serverClient := gitalypb.NewServerServiceClient(grpcPraefectConn) + resp, err := serverClient.ReadinessCheck(ctx, &gitalypb.ReadinessCheckRequest{Timeout: durationpb.New(time.Second)}) + require.NoError(t, err) + assert.NotNil(t, resp.GetOkResponse()) + if !assert.Nil(t, resp.GetFailureResponse()) { + for _, failure := range resp.GetFailureResponse().GetFailedChecks() { + assert.Failf(t, "failed check", "%s: %s", failure.Name, failure.ErrorMessage) + } + } + names := make([]string, 0, cap(triggered)) + for i := 0; i < cap(triggered); i++ { + name := <-triggered + names = append(names, name) + } + require.ElementsMatch(t, []string{"1", "2"}, names, "both tasks should be triggered for an execution") +} + +func TestServer_ReadinessCheck_unreachableGitaly(t *testing.T) { + t.Parallel() + praefectConf := config.Config{ + SocketPath: testhelper.GetTemporaryGitalySocketFileName(t), + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*config.Node{ + { + Storage: "praefect-internal-0", + Address: "tcp://non-existing:42", + }, + }, + }, + }, + } + ctx := testhelper.Context(t) + grpcConn, _, cleanup := praefect.RunPraefectServer(t, ctx, praefectConf, praefect.BuildOptions{}) + t.Cleanup(cleanup) + serverClient := gitalypb.NewServerServiceClient(grpcConn) + resp, err := serverClient.ReadinessCheck(ctx, &gitalypb.ReadinessCheckRequest{Timeout: durationpb.New(time.Nanosecond)}) + require.NoError(t, err) + require.Nil(t, resp.GetOkResponse()) + require.NotNil(t, resp.GetFailureResponse()) + require.Len(t, resp.GetFailureResponse().FailedChecks, 5) + require.Equal(t, "clock synchronization", resp.GetFailureResponse().FailedChecks[0].Name) + require.Equal(t, "database read/write", resp.GetFailureResponse().FailedChecks[1].Name) + require.Equal(t, "gitaly node connectivity & disk access", resp.GetFailureResponse().FailedChecks[2].Name) + require.Equal(t, "praefect migrations", resp.GetFailureResponse().FailedChecks[3].Name) + require.Equal(t, "unavailable repositories", resp.GetFailureResponse().FailedChecks[4].Name) +} diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go index e15dc5bea..f49621576 100644 --- a/internal/praefect/service/server/server.go +++ b/internal/praefect/service/server/server.go @@ -9,15 +9,17 @@ import ( // Server is a ServerService server type Server struct { gitalypb.UnimplementedServerServiceServer - conf config.Config - conns service.Connections + conf config.Config + conns service.Connections + checks []service.CheckFunc } // NewServer creates a new instance of a grpc ServerServiceServer -func NewServer(conf config.Config, conns service.Connections) gitalypb.ServerServiceServer { +func NewServer(conf config.Config, conns service.Connections, checks []service.CheckFunc) gitalypb.ServerServiceServer { s := &Server{ - conf: conf, - conns: conns, + conf: conf, + conns: conns, + checks: checks, } return s diff --git a/internal/praefect/service/server/testhelper_test.go b/internal/praefect/service/server/testhelper_test.go new file mode 100644 index 000000000..39eb55aaf --- /dev/null +++ b/internal/praefect/service/server/testhelper_test.go @@ -0,0 +1,11 @@ +package server_test + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go new file mode 100644 index 000000000..142b46764 --- /dev/null +++ b/internal/praefect/testserver.go @@ -0,0 +1,287 @@ +package praefect + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/client" + gitalycfgauth "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth" + "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/server/auth" + "gitlab.com/gitlab-org/gitaly/v15/internal/log" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/mock" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/service" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/promtest" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb" + correlation "gitlab.com/gitlab-org/labkit/correlation/grpc" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// BuildOptions is a set of configurations options that can be set to configure praefect service. +type BuildOptions struct { + // WithQueue sets an implementation of the replication queue to use by praefect service. + WithQueue datastore.ReplicationEventQueue + // WithTxMgr sets the transaction manager to use by praefect service. + WithTxMgr *transactions.Manager + // WithBackends sets a callback that is triggered during initialization. + WithBackends func([]*config.VirtualStorage) []testhelper.Cleanup + // WithAnnotations sets a proto-registry to use by praefect service. + WithAnnotations *protoregistry.Registry + // WithLogger sets a logger to use by praefect service. + WithLogger *logrus.Entry + // WithNodeMgr sets an implementation of the node manager to use by praefect service. + WithNodeMgr nodes.Manager + // WithRepoStore sets an implementation of the repositories store to use by praefect service. + WithRepoStore datastore.RepositoryStore + // WithAssignmentStore sets an implementation of the repositories store to use by praefect service. + WithAssignmentStore AssignmentStore + // WithConnections sets a set of connections to gitalies. + WithConnections Connections + // WithPrimaryGetter sets an implementation of the primary node getter to use by praefect service. + WithPrimaryGetter PrimaryGetter + // WithRouter sets an implementation of the request router to use by praefect service. + WithRouter Router + // WithChecks sets a list of check to run when ReadinessCheck RPC is called. + WithChecks []service.CheckFunc +} + +// WithMockBackends mocks backends with a set of passed in stubs. +func WithMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer) func([]*config.VirtualStorage) []testhelper.Cleanup { + return func(virtualStorages []*config.VirtualStorage) []testhelper.Cleanup { + var cleanups []testhelper.Cleanup + + for _, vs := range virtualStorages { + require.Equal(t, len(backends), len(vs.Nodes), + "mock server count doesn't match config nodes") + + for i, node := range vs.Nodes { + backend, ok := backends[node.Storage] + require.True(t, ok, "missing backend server for node %s", node.Storage) + + backendAddr, cleanup := newMockDownstream(t, node.Token, backend) + cleanups = append(cleanups, cleanup) + + node.Address = backendAddr + vs.Nodes[i] = node + } + } + + return cleanups + } +} + +func defaultQueue(t testing.TB) datastore.ReplicationEventQueue { + return datastore.NewPostgresReplicationEventQueue(testdb.New(t)) +} + +func defaultTxMgr(conf config.Config) *transactions.Manager { + return transactions.NewManager(conf) +} + +func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager { + nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + t.Cleanup(nodeMgr.Stop) + return nodeMgr +} + +func defaultRepoStore(conf config.Config) datastore.RepositoryStore { + return datastore.MockRepositoryStore{} +} + +func listenAvailPort(tb testing.TB) (net.Listener, int) { + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(tb, err) + + return listener, listener.Addr().(*net.TCPAddr).Port +} + +func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { + opts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()), + grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()), + } + if backend { + opts = append( + opts, + grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())), + ) + } + + cc, err := client.Dial( + fmt.Sprintf("tcp://localhost:%d", port), + opts, + ) + require.NoError(tb, err) + + return cc +} + +func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) (string, func()) { + srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(gitalycfgauth.Config{Token: token}))) + mock.RegisterSimpleServiceServer(srv, m) + healthpb.RegisterHealthServer(srv, health.NewServer()) + + // client to backend service + lis, port := listenAvailPort(tb) + + errQ := make(chan error) + + go func() { + errQ <- srv.Serve(lis) + }() + + cleanup := func() { + srv.GracefulStop() + lis.Close() + + // If the server is shutdown before Serve() is called on it + // the Serve() calls will return the ErrServerStopped + if err := <-errQ; err != nil && err != grpc.ErrServerStopped { + require.NoError(tb, err) + } + } + + return fmt.Sprintf("tcp://localhost:%d", port), cleanup +} + +type noopBackoffFactory struct{} + +func (noopBackoffFactory) Create() (Backoff, BackoffReset) { + return func() time.Duration { + return 0 + }, func() {} +} + +func startProcessBacklog(ctx context.Context, replMgr ReplMgr) <-chan struct{} { + done := make(chan struct{}) + go func() { + defer close(done) + replMgr.ProcessBacklog(ctx, noopBackoffFactory{}) + }() + return done +} + +// RunPraefectServer starts praefect service based on the passed in configuration and options. +// The caller is responsible to call returned testhelper.Cleanup in order to stop the service +// and release all acquired resources. +// The function should be used only for testing purposes and not as part of the production code. +// +//nolint:revive +func RunPraefectServer( + t testing.TB, + ctx context.Context, + conf config.Config, + opt BuildOptions, +) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) { + var cleanups []testhelper.Cleanup + + if opt.WithQueue == nil { + opt.WithQueue = defaultQueue(t) + } + if opt.WithRepoStore == nil { + opt.WithRepoStore = defaultRepoStore(conf) + } + if opt.WithTxMgr == nil { + opt.WithTxMgr = defaultTxMgr(conf) + } + if opt.WithBackends != nil { + cleanups = append(cleanups, opt.WithBackends(conf.VirtualStorages)...) + } + if opt.WithAnnotations == nil { + opt.WithAnnotations = protoregistry.GitalyProtoPreregistered + } + if opt.WithLogger == nil { + opt.WithLogger = log.Default() + } + if opt.WithNodeMgr == nil { + opt.WithNodeMgr = defaultNodeMgr(t, conf, opt.WithRepoStore) + } + if opt.WithAssignmentStore == nil { + opt.WithAssignmentStore = NewDisabledAssignmentStore(conf.StorageNames()) + } + if opt.WithRouter == nil { + opt.WithRouter = NewNodeManagerRouter(opt.WithNodeMgr, opt.WithRepoStore) + } + if opt.WithChecks == nil { + opt.WithChecks = service.AllChecks() + } + + coordinator := NewCoordinator( + opt.WithQueue, + opt.WithRepoStore, + opt.WithRouter, + opt.WithTxMgr, + conf, + opt.WithAnnotations, + ) + + // TODO: run a replmgr for EVERY virtual storage + replmgr := NewReplMgr( + opt.WithLogger, + conf.StorageNames(), + opt.WithQueue, + opt.WithRepoStore, + opt.WithNodeMgr, + NodeSetFromNodeManager(opt.WithNodeMgr), + ) + + prf := NewGRPCServer( + conf, + opt.WithLogger, + protoregistry.GitalyProtoPreregistered, + coordinator.StreamDirector, + opt.WithTxMgr, + opt.WithRepoStore, + opt.WithAssignmentStore, + opt.WithConnections, + opt.WithPrimaryGetter, + nil, + opt.WithChecks, + ) + + listener, port := listenAvailPort(t) + + errQ := make(chan error) + ctx, cancel := context.WithCancel(ctx) + + go func() { + errQ <- prf.Serve(listener) + close(errQ) + }() + replMgrDone := startProcessBacklog(ctx, replmgr) + + // dial client to praefect + cc := dialLocalPort(t, port, false) + + cleanup := func() { + cc.Close() + + for _, cu := range cleanups { + cu() + } + + prf.Stop() + + cancel() + <-replMgrDone + require.NoError(t, <-errQ) + } + + return cc, prf, cleanup +} diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index c76959c76..d709ac83d 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -32,9 +32,9 @@ type voter struct { func runPraefectServerAndTxMgr(t testing.TB, ctx context.Context) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { conf := testConfig(1) txMgr := transactions.NewManager(conf) - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withTxMgr: txMgr, - withNodeMgr: nullNodeMgr{}, // to suppress node address issues + cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{ + WithTxMgr: txMgr, + WithNodeMgr: nullNodeMgr{}, // to suppress node address issues }) return cc, txMgr, cleanup } diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index ccf470116..f152779d9 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -528,8 +528,8 @@ func TestVerifier(t *testing.T) { conns := nodeSet.Connections() rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - conn, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withRouter: NewPerRepositoryRouter( + conn, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{ + WithRouter: NewPerRepositoryRouter( conns, elector, StaticHealthChecker(conf.StorageNames()), @@ -539,8 +539,8 @@ func TestVerifier(t *testing.T) { rs, conf.DefaultReplicationFactors(), ), - withRepoStore: rs, - withTxMgr: txManager, + WithRepoStore: rs, + WithTxMgr: txManager, }) t.Cleanup(cleanup) diff --git a/proto/go/gitalypb/server.pb.go b/proto/go/gitalypb/server.pb.go index d085fd754..bfa9e48dc 100644 --- a/proto/go/gitalypb/server.pb.go +++ b/proto/go/gitalypb/server.pb.go @@ -9,6 +9,7 @@ package gitalypb import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" reflect "reflect" sync "sync" ) @@ -322,6 +323,139 @@ func (x *ClockSyncedResponse) GetSynced() bool { return false } +// ReadinessCheckRequest is used to verify if the service is in operational state. +type ReadinessCheckRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Timeout is an amount of milliseconds for the check to run before give up and mark as failed. + Timeout *durationpb.Duration `protobuf:"bytes,1,opt,name=timeout,proto3" json:"timeout,omitempty"` +} + +func (x *ReadinessCheckRequest) Reset() { + *x = ReadinessCheckRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadinessCheckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadinessCheckRequest) ProtoMessage() {} + +func (x *ReadinessCheckRequest) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadinessCheckRequest.ProtoReflect.Descriptor instead. +func (*ReadinessCheckRequest) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{6} +} + +func (x *ReadinessCheckRequest) GetTimeout() *durationpb.Duration { + if x != nil { + return x.Timeout + } + return nil +} + +// ReadinessCheckResponse is just a stub now and contains no information. +// If the service is not in the operational state the error will be returned instead. +type ReadinessCheckResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Result: + // *ReadinessCheckResponse_OkResponse + // *ReadinessCheckResponse_FailureResponse + Result isReadinessCheckResponse_Result `protobuf_oneof:"Result"` +} + +func (x *ReadinessCheckResponse) Reset() { + *x = ReadinessCheckResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadinessCheckResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadinessCheckResponse) ProtoMessage() {} + +func (x *ReadinessCheckResponse) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadinessCheckResponse.ProtoReflect.Descriptor instead. +func (*ReadinessCheckResponse) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{7} +} + +func (m *ReadinessCheckResponse) GetResult() isReadinessCheckResponse_Result { + if m != nil { + return m.Result + } + return nil +} + +func (x *ReadinessCheckResponse) GetOkResponse() *ReadinessCheckResponse_Ok { + if x, ok := x.GetResult().(*ReadinessCheckResponse_OkResponse); ok { + return x.OkResponse + } + return nil +} + +func (x *ReadinessCheckResponse) GetFailureResponse() *ReadinessCheckResponse_Failure { + if x, ok := x.GetResult().(*ReadinessCheckResponse_FailureResponse); ok { + return x.FailureResponse + } + return nil +} + +type isReadinessCheckResponse_Result interface { + isReadinessCheckResponse_Result() +} + +type ReadinessCheckResponse_OkResponse struct { + // OkResponse is set when all checks pass. + OkResponse *ReadinessCheckResponse_Ok `protobuf:"bytes,1,opt,name=ok_response,json=okResponse,proto3,oneof"` +} + +type ReadinessCheckResponse_FailureResponse struct { + // FailureResponse is set if at least one check failed. + FailureResponse *ReadinessCheckResponse_Failure `protobuf:"bytes,2,opt,name=failure_response,json=failureResponse,proto3,oneof"` +} + +func (*ReadinessCheckResponse_OkResponse) isReadinessCheckResponse_Result() {} + +func (*ReadinessCheckResponse_FailureResponse) isReadinessCheckResponse_Result() {} + // This comment is left unintentionally blank. type ServerInfoResponse_StorageStatus struct { state protoimpl.MessageState @@ -345,7 +479,7 @@ type ServerInfoResponse_StorageStatus struct { func (x *ServerInfoResponse_StorageStatus) Reset() { *x = ServerInfoResponse_StorageStatus{} if protoimpl.UnsafeEnabled { - mi := &file_server_proto_msgTypes[6] + mi := &file_server_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -358,7 +492,7 @@ func (x *ServerInfoResponse_StorageStatus) String() string { func (*ServerInfoResponse_StorageStatus) ProtoMessage() {} func (x *ServerInfoResponse_StorageStatus) ProtoReflect() protoreflect.Message { - mi := &file_server_proto_msgTypes[6] + mi := &file_server_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -434,7 +568,7 @@ type DiskStatisticsResponse_StorageStatus struct { func (x *DiskStatisticsResponse_StorageStatus) Reset() { *x = DiskStatisticsResponse_StorageStatus{} if protoimpl.UnsafeEnabled { - mi := &file_server_proto_msgTypes[7] + mi := &file_server_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -447,7 +581,7 @@ func (x *DiskStatisticsResponse_StorageStatus) String() string { func (*DiskStatisticsResponse_StorageStatus) ProtoMessage() {} func (x *DiskStatisticsResponse_StorageStatus) ProtoReflect() protoreflect.Message { - mi := &file_server_proto_msgTypes[7] + mi := &file_server_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -484,11 +618,159 @@ func (x *DiskStatisticsResponse_StorageStatus) GetUsed() int64 { return 0 } +// Ok represents response if none checks failed. +type ReadinessCheckResponse_Ok struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ReadinessCheckResponse_Ok) Reset() { + *x = ReadinessCheckResponse_Ok{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadinessCheckResponse_Ok) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadinessCheckResponse_Ok) ProtoMessage() {} + +func (x *ReadinessCheckResponse_Ok) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadinessCheckResponse_Ok.ProtoReflect.Descriptor instead. +func (*ReadinessCheckResponse_Ok) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{7, 0} +} + +// Failure represents response if at least one check failed. +type ReadinessCheckResponse_Failure struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // FailedChecks is a list of failed checks. + FailedChecks []*ReadinessCheckResponse_Failure_Response `protobuf:"bytes,1,rep,name=failed_checks,json=failedChecks,proto3" json:"failed_checks,omitempty"` +} + +func (x *ReadinessCheckResponse_Failure) Reset() { + *x = ReadinessCheckResponse_Failure{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadinessCheckResponse_Failure) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadinessCheckResponse_Failure) ProtoMessage() {} + +func (x *ReadinessCheckResponse_Failure) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadinessCheckResponse_Failure.ProtoReflect.Descriptor instead. +func (*ReadinessCheckResponse_Failure) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{7, 1} +} + +func (x *ReadinessCheckResponse_Failure) GetFailedChecks() []*ReadinessCheckResponse_Failure_Response { + if x != nil { + return x.FailedChecks + } + return nil +} + +// Response contains information about failed check. +type ReadinessCheckResponse_Failure_Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Name is a name of the check that was performed. + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // ErrorMessage is a cause of the check failure. + ErrorMessage string `protobuf:"bytes,2,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` +} + +func (x *ReadinessCheckResponse_Failure_Response) Reset() { + *x = ReadinessCheckResponse_Failure_Response{} + if protoimpl.UnsafeEnabled { + mi := &file_server_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadinessCheckResponse_Failure_Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadinessCheckResponse_Failure_Response) ProtoMessage() {} + +func (x *ReadinessCheckResponse_Failure_Response) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadinessCheckResponse_Failure_Response.ProtoReflect.Descriptor instead. +func (*ReadinessCheckResponse_Failure_Response) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{7, 1, 0} +} + +func (x *ReadinessCheckResponse_Failure_Response) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ReadinessCheckResponse_Failure_Response) GetErrorMessage() string { + if x != nil { + return x.ErrorMessage + } + return "" +} + var File_server_proto protoreflect.FileDescriptor var file_server_proto_rawDesc = []byte{ 0x0a, 0x0c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, - 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x1a, 0x0a, 0x6c, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0a, 0x6c, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x13, 0x0a, 0x11, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x8d, 0x03, 0x0a, 0x12, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, @@ -540,26 +822,58 @@ var file_server_proto_rawDesc = []byte{ 0x73, 0x22, 0x2d, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x63, 0x6b, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x73, 0x79, 0x6e, 0x63, 0x65, 0x64, - 0x32, 0xf3, 0x01, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x12, 0x43, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, - 0x12, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x67, 0x69, + 0x22, 0x4c, 0x0a, 0x15, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x43, 0x68, 0x65, + 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x33, 0x0a, 0x07, 0x74, 0x69, 0x6d, + 0x65, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0xea, + 0x02, 0x0a, 0x16, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x43, 0x68, 0x65, 0x63, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x44, 0x0a, 0x0b, 0x6f, 0x6b, 0x5f, + 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x65, 0x73, + 0x73, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x4f, + 0x6b, 0x48, 0x00, 0x52, 0x0a, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x53, 0x0a, 0x10, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x43, 0x68, 0x65, 0x63, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, + 0x65, 0x48, 0x00, 0x52, 0x0f, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x04, 0x0a, 0x02, 0x4f, 0x6b, 0x1a, 0xa4, 0x01, 0x0a, 0x07, 0x46, + 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x12, 0x54, 0x0a, 0x0d, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, + 0x5f, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2f, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x65, 0x73, 0x73, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x46, 0x61, + 0x69, 0x6c, 0x75, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x0c, + 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x1a, 0x43, 0x0a, 0x08, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x42, 0x08, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x32, 0xc4, 0x02, 0x0a, 0x0d, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x43, 0x0a, + 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4f, 0x0a, 0x0e, 0x44, 0x69, 0x73, 0x6b, 0x53, - 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x12, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x44, 0x69, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x44, 0x69, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x0b, 0x43, 0x6c, 0x6f, 0x63, - 0x6b, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2e, 0x43, 0x6c, 0x6f, 0x63, 0x6b, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x6f, - 0x63, 0x6b, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x1a, 0x04, 0xf0, 0x97, 0x28, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, - 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x35, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x4f, 0x0a, 0x0e, 0x44, 0x69, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, + 0x74, 0x69, 0x63, 0x73, 0x12, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x44, 0x69, + 0x73, 0x6b, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x44, 0x69, 0x73, + 0x6b, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x0b, 0x43, 0x6c, 0x6f, 0x63, 0x6b, 0x53, 0x79, 0x6e, 0x63, + 0x65, 0x64, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x6f, 0x63, + 0x6b, 0x53, 0x79, 0x6e, 0x63, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x6f, 0x63, 0x6b, 0x53, 0x79, 0x6e, + 0x63, 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4f, 0x0a, 0x0e, 0x52, + 0x65, 0x61, 0x64, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x1d, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x65, 0x73, 0x73, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x43, + 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x04, 0xf0, 0x97, + 0x28, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x35, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -574,31 +888,43 @@ func file_server_proto_rawDescGZIP() []byte { return file_server_proto_rawDescData } -var file_server_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_server_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_server_proto_goTypes = []interface{}{ - (*ServerInfoRequest)(nil), // 0: gitaly.ServerInfoRequest - (*ServerInfoResponse)(nil), // 1: gitaly.ServerInfoResponse - (*DiskStatisticsRequest)(nil), // 2: gitaly.DiskStatisticsRequest - (*DiskStatisticsResponse)(nil), // 3: gitaly.DiskStatisticsResponse - (*ClockSyncedRequest)(nil), // 4: gitaly.ClockSyncedRequest - (*ClockSyncedResponse)(nil), // 5: gitaly.ClockSyncedResponse - (*ServerInfoResponse_StorageStatus)(nil), // 6: gitaly.ServerInfoResponse.StorageStatus - (*DiskStatisticsResponse_StorageStatus)(nil), // 7: gitaly.DiskStatisticsResponse.StorageStatus + (*ServerInfoRequest)(nil), // 0: gitaly.ServerInfoRequest + (*ServerInfoResponse)(nil), // 1: gitaly.ServerInfoResponse + (*DiskStatisticsRequest)(nil), // 2: gitaly.DiskStatisticsRequest + (*DiskStatisticsResponse)(nil), // 3: gitaly.DiskStatisticsResponse + (*ClockSyncedRequest)(nil), // 4: gitaly.ClockSyncedRequest + (*ClockSyncedResponse)(nil), // 5: gitaly.ClockSyncedResponse + (*ReadinessCheckRequest)(nil), // 6: gitaly.ReadinessCheckRequest + (*ReadinessCheckResponse)(nil), // 7: gitaly.ReadinessCheckResponse + (*ServerInfoResponse_StorageStatus)(nil), // 8: gitaly.ServerInfoResponse.StorageStatus + (*DiskStatisticsResponse_StorageStatus)(nil), // 9: gitaly.DiskStatisticsResponse.StorageStatus + (*ReadinessCheckResponse_Ok)(nil), // 10: gitaly.ReadinessCheckResponse.Ok + (*ReadinessCheckResponse_Failure)(nil), // 11: gitaly.ReadinessCheckResponse.Failure + (*ReadinessCheckResponse_Failure_Response)(nil), // 12: gitaly.ReadinessCheckResponse.Failure.Response + (*durationpb.Duration)(nil), // 13: google.protobuf.Duration } var file_server_proto_depIdxs = []int32{ - 6, // 0: gitaly.ServerInfoResponse.storage_statuses:type_name -> gitaly.ServerInfoResponse.StorageStatus - 7, // 1: gitaly.DiskStatisticsResponse.storage_statuses:type_name -> gitaly.DiskStatisticsResponse.StorageStatus - 0, // 2: gitaly.ServerService.ServerInfo:input_type -> gitaly.ServerInfoRequest - 2, // 3: gitaly.ServerService.DiskStatistics:input_type -> gitaly.DiskStatisticsRequest - 4, // 4: gitaly.ServerService.ClockSynced:input_type -> gitaly.ClockSyncedRequest - 1, // 5: gitaly.ServerService.ServerInfo:output_type -> gitaly.ServerInfoResponse - 3, // 6: gitaly.ServerService.DiskStatistics:output_type -> gitaly.DiskStatisticsResponse - 5, // 7: gitaly.ServerService.ClockSynced:output_type -> gitaly.ClockSyncedResponse - 5, // [5:8] is the sub-list for method output_type - 2, // [2:5] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 8, // 0: gitaly.ServerInfoResponse.storage_statuses:type_name -> gitaly.ServerInfoResponse.StorageStatus + 9, // 1: gitaly.DiskStatisticsResponse.storage_statuses:type_name -> gitaly.DiskStatisticsResponse.StorageStatus + 13, // 2: gitaly.ReadinessCheckRequest.timeout:type_name -> google.protobuf.Duration + 10, // 3: gitaly.ReadinessCheckResponse.ok_response:type_name -> gitaly.ReadinessCheckResponse.Ok + 11, // 4: gitaly.ReadinessCheckResponse.failure_response:type_name -> gitaly.ReadinessCheckResponse.Failure + 12, // 5: gitaly.ReadinessCheckResponse.Failure.failed_checks:type_name -> gitaly.ReadinessCheckResponse.Failure.Response + 0, // 6: gitaly.ServerService.ServerInfo:input_type -> gitaly.ServerInfoRequest + 2, // 7: gitaly.ServerService.DiskStatistics:input_type -> gitaly.DiskStatisticsRequest + 4, // 8: gitaly.ServerService.ClockSynced:input_type -> gitaly.ClockSyncedRequest + 6, // 9: gitaly.ServerService.ReadinessCheck:input_type -> gitaly.ReadinessCheckRequest + 1, // 10: gitaly.ServerService.ServerInfo:output_type -> gitaly.ServerInfoResponse + 3, // 11: gitaly.ServerService.DiskStatistics:output_type -> gitaly.DiskStatisticsResponse + 5, // 12: gitaly.ServerService.ClockSynced:output_type -> gitaly.ClockSyncedResponse + 7, // 13: gitaly.ServerService.ReadinessCheck:output_type -> gitaly.ReadinessCheckResponse + 10, // [10:14] is the sub-list for method output_type + 6, // [6:10] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_server_proto_init() } @@ -681,7 +1007,7 @@ func file_server_proto_init() { } } file_server_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ServerInfoResponse_StorageStatus); i { + switch v := v.(*ReadinessCheckRequest); i { case 0: return &v.state case 1: @@ -693,6 +1019,30 @@ func file_server_proto_init() { } } file_server_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadinessCheckResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerInfoResponse_StorageStatus); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DiskStatisticsResponse_StorageStatus); i { case 0: return &v.state @@ -704,6 +1054,46 @@ func file_server_proto_init() { return nil } } + file_server_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadinessCheckResponse_Ok); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadinessCheckResponse_Failure); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_server_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadinessCheckResponse_Failure_Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_server_proto_msgTypes[7].OneofWrappers = []interface{}{ + (*ReadinessCheckResponse_OkResponse)(nil), + (*ReadinessCheckResponse_FailureResponse)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -711,7 +1101,7 @@ func file_server_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_server_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/go/gitalypb/server_grpc.pb.go b/proto/go/gitalypb/server_grpc.pb.go index ff48ef8e7..ac706dd5d 100644 --- a/proto/go/gitalypb/server_grpc.pb.go +++ b/proto/go/gitalypb/server_grpc.pb.go @@ -29,6 +29,8 @@ type ServerServiceClient interface { // ClockSynced checks if machine clock is synced // (the offset is less that the one passed in the request). ClockSynced(ctx context.Context, in *ClockSyncedRequest, opts ...grpc.CallOption) (*ClockSyncedResponse, error) + // ReadinessCheck runs the set of the checks to make sure service is in operational state. + ReadinessCheck(ctx context.Context, in *ReadinessCheckRequest, opts ...grpc.CallOption) (*ReadinessCheckResponse, error) } type serverServiceClient struct { @@ -66,6 +68,15 @@ func (c *serverServiceClient) ClockSynced(ctx context.Context, in *ClockSyncedRe return out, nil } +func (c *serverServiceClient) ReadinessCheck(ctx context.Context, in *ReadinessCheckRequest, opts ...grpc.CallOption) (*ReadinessCheckResponse, error) { + out := new(ReadinessCheckResponse) + err := c.cc.Invoke(ctx, "/gitaly.ServerService/ReadinessCheck", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ServerServiceServer is the server API for ServerService service. // All implementations must embed UnimplementedServerServiceServer // for forward compatibility @@ -77,6 +88,8 @@ type ServerServiceServer interface { // ClockSynced checks if machine clock is synced // (the offset is less that the one passed in the request). ClockSynced(context.Context, *ClockSyncedRequest) (*ClockSyncedResponse, error) + // ReadinessCheck runs the set of the checks to make sure service is in operational state. + ReadinessCheck(context.Context, *ReadinessCheckRequest) (*ReadinessCheckResponse, error) mustEmbedUnimplementedServerServiceServer() } @@ -93,6 +106,9 @@ func (UnimplementedServerServiceServer) DiskStatistics(context.Context, *DiskSta func (UnimplementedServerServiceServer) ClockSynced(context.Context, *ClockSyncedRequest) (*ClockSyncedResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ClockSynced not implemented") } +func (UnimplementedServerServiceServer) ReadinessCheck(context.Context, *ReadinessCheckRequest) (*ReadinessCheckResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReadinessCheck not implemented") +} func (UnimplementedServerServiceServer) mustEmbedUnimplementedServerServiceServer() {} // UnsafeServerServiceServer may be embedded to opt out of forward compatibility for this service. @@ -160,6 +176,24 @@ func _ServerService_ClockSynced_Handler(srv interface{}, ctx context.Context, de return interceptor(ctx, in, info, handler) } +func _ServerService_ReadinessCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReadinessCheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ServerServiceServer).ReadinessCheck(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.ServerService/ReadinessCheck", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ServerServiceServer).ReadinessCheck(ctx, req.(*ReadinessCheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + // ServerService_ServiceDesc is the grpc.ServiceDesc for ServerService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -179,6 +213,10 @@ var ServerService_ServiceDesc = grpc.ServiceDesc{ MethodName: "ClockSynced", Handler: _ServerService_ClockSynced_Handler, }, + { + MethodName: "ReadinessCheck", + Handler: _ServerService_ReadinessCheck_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "server.proto", diff --git a/proto/server.proto b/proto/server.proto index 4a7fd667b..d1df58336 100644 --- a/proto/server.proto +++ b/proto/server.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package gitaly; +import "google/protobuf/duration.proto"; import "lint.proto"; option go_package = "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"; @@ -20,6 +21,8 @@ service ServerService { // (the offset is less that the one passed in the request). rpc ClockSynced(ClockSyncedRequest) returns (ClockSyncedResponse); + // ReadinessCheck runs the set of the checks to make sure service is in operational state. + rpc ReadinessCheck(ReadinessCheckRequest) returns (ReadinessCheckResponse); } // This comment is left unintentionally blank. @@ -87,3 +90,36 @@ message ClockSyncedResponse { // synced is set to true if system clock has an affordable drift compared to NTP service. bool synced = 1; } + +// ReadinessCheckRequest is used to verify if the service is in operational state. +message ReadinessCheckRequest { + // Timeout is an amount of milliseconds for the check to run before give up and mark as failed. + google.protobuf.Duration timeout = 1; +} + +// ReadinessCheckResponse is just a stub now and contains no information. +// If the service is not in the operational state the error will be returned instead. +message ReadinessCheckResponse { + // Ok represents response if none checks failed. + message Ok { + } + // Failure represents response if at least one check failed. + message Failure { + // Response contains information about failed check. + message Response { + // Name is a name of the check that was performed. + string name = 1; + // ErrorMessage is a cause of the check failure. + string error_message = 2; + } + // FailedChecks is a list of failed checks. + repeated Response failed_checks = 1; + } + + oneof Result { + // OkResponse is set when all checks pass. + Ok ok_response = 1; + // FailureResponse is set if at least one check failed. + Failure failure_response = 2; + } +} diff --git a/ruby/proto/gitaly/server_pb.rb b/ruby/proto/gitaly/server_pb.rb index 8e41430d4..e1455d25c 100644 --- a/ruby/proto/gitaly/server_pb.rb +++ b/ruby/proto/gitaly/server_pb.rb @@ -1,6 +1,7 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: server.proto +require 'google/protobuf/duration_pb' require 'lint_pb' require 'google/protobuf' @@ -38,6 +39,24 @@ Google::Protobuf::DescriptorPool.generated_pool.build do add_message "gitaly.ClockSyncedResponse" do optional :synced, :bool, 1 end + add_message "gitaly.ReadinessCheckRequest" do + optional :timeout, :message, 1, "google.protobuf.Duration" + end + add_message "gitaly.ReadinessCheckResponse" do + oneof :Result do + optional :ok_response, :message, 1, "gitaly.ReadinessCheckResponse.Ok" + optional :failure_response, :message, 2, "gitaly.ReadinessCheckResponse.Failure" + end + end + add_message "gitaly.ReadinessCheckResponse.Ok" do + end + add_message "gitaly.ReadinessCheckResponse.Failure" do + repeated :failed_checks, :message, 1, "gitaly.ReadinessCheckResponse.Failure.Response" + end + add_message "gitaly.ReadinessCheckResponse.Failure.Response" do + optional :name, :string, 1 + optional :error_message, :string, 2 + end end end @@ -50,4 +69,9 @@ module Gitaly DiskStatisticsResponse::StorageStatus = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DiskStatisticsResponse.StorageStatus").msgclass ClockSyncedRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ClockSyncedRequest").msgclass ClockSyncedResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ClockSyncedResponse").msgclass + ReadinessCheckRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ReadinessCheckRequest").msgclass + ReadinessCheckResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ReadinessCheckResponse").msgclass + ReadinessCheckResponse::Ok = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ReadinessCheckResponse.Ok").msgclass + ReadinessCheckResponse::Failure = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ReadinessCheckResponse.Failure").msgclass + ReadinessCheckResponse::Failure::Response = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ReadinessCheckResponse.Failure.Response").msgclass end diff --git a/ruby/proto/gitaly/server_services_pb.rb b/ruby/proto/gitaly/server_services_pb.rb index 37d7825f7..2e7ba45f2 100644 --- a/ruby/proto/gitaly/server_services_pb.rb +++ b/ruby/proto/gitaly/server_services_pb.rb @@ -22,6 +22,8 @@ module Gitaly # ClockSynced checks if machine clock is synced # (the offset is less that the one passed in the request). rpc :ClockSynced, ::Gitaly::ClockSyncedRequest, ::Gitaly::ClockSyncedResponse + # ReadinessCheck runs the set of the checks to make sure service is in operational state. + rpc :ReadinessCheck, ::Gitaly::ReadinessCheckRequest, ::Gitaly::ReadinessCheckResponse end Stub = Service.rpc_stub_class |