diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-03-20 09:21:03 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-03-20 09:21:03 +0300 |
commit | 1bb075dd1fd0b6d840ba2c59f8aaac1542655ee7 (patch) | |
tree | 78581a7150400518a07458bd16734a13e10ff1b6 | |
parent | 969bac80e2f246867c1a976864bd1f5b34ee43dd (diff) |
RPC ConsistencyCheck and Praefect "reconcile" subcommand
-rw-r--r-- | changelogs/unreleased/po-node-recovery.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 21 | ||||
-rw-r--r-- | cmd/praefect/subcmd.go | 23 | ||||
-rw-r--r-- | cmd/praefect/subcmd_pingnodes.go | 17 | ||||
-rw-r--r-- | cmd/praefect/subcmd_reconcile.go | 148 | ||||
-rw-r--r-- | internal/praefect/consistencycheck_test.go | 123 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 7 | ||||
-rw-r--r-- | internal/praefect/server.go | 5 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/service/info/consistencycheck.go | 267 | ||||
-rw-r--r-- | internal/praefect/service/info/server.go | 22 | ||||
-rw-r--r-- | internal/praefect/service/server/server.go | 1 | ||||
-rw-r--r-- | proto/go/gitalypb/praefect.pb.go | 251 | ||||
-rw-r--r-- | proto/praefect.proto | 30 | ||||
-rw-r--r-- | ruby/proto/gitaly/praefect_pb.rb | 13 | ||||
-rw-r--r-- | ruby/proto/gitaly/praefect_services_pb.rb | 5 |
16 files changed, 895 insertions, 45 deletions
diff --git a/changelogs/unreleased/po-node-recovery.yml b/changelogs/unreleased/po-node-recovery.yml new file mode 100644 index 000000000..84650d0b7 --- /dev/null +++ b/changelogs/unreleased/po-node-recovery.yml @@ -0,0 +1,5 @@ +--- +title: RPC ConsistencyCheck and Praefect reconcile subcommand +merge_request: 1903 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 6ec446451..194bb7fdf 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -24,6 +24,25 @@ // // praefect -config PATH_TO_CONFIG dial-nodes // +// Reconcile +// +// The subcommand "reconcile" performs a consistency check of a backend storage +// against the primary or another storage in the same virtual storage group. +// +// praefect -config PATH_TO_CONFIG reconcile -virtual <vstorage> -target <t-storage> [-reference <r-storage>] +// +// "-virtual" specifies which virtual storage the target and reference +// belong to. +// +// "-target" specifies the storage name of the backend Gitaly you wish to +// reconcile. +// +// "-reference" is an optional argument that specifies which storage location to +// check the target against. If an inconsistency is found, the target will +// attempt to repair itself using the reference as the source of truth. If the +// reference storage is omitted, Praefect will perform the check against the +// current primary. If the primary is the same as the target, an error will +// occur. package main import ( @@ -188,7 +207,7 @@ func run(cfgs []starter.Config, conf config.Config) error { return fmt.Errorf("unable to create a bootstrap: %v", err) } - srv.RegisterServices(nodeManager, conf) + srv.RegisterServices(nodeManager, conf, ds) b.StopAction = srv.GracefulStop for _, cfg := range cfgs { diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 0834a04ae..a2e1da8e3 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -5,10 +5,14 @@ import ( "fmt" "os" "os/signal" + "time" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" + "google.golang.org/grpc" ) const invocationPrefix = progname + " -config CONFIG_TOML" @@ -32,6 +36,8 @@ func subCommand(conf config.Config, arg0 string, argRest []string) int { return sqlMigrateDown(conf, argRest) case "dial-nodes": return dialNodes(conf) + case "reconcile": + return reconcile(conf, argRest) default: printfErr("%s: unknown subcommand: %q\n", progname, arg0) return 1 @@ -94,3 +100,20 @@ func openDB(conf config.DB) (*sql.DB, func(), int) { func printfErr(format string, a ...interface{}) (int, error) { return fmt.Fprintf(os.Stderr, format, a...) } + +func subCmdDial(addr, token string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + opts = append(opts, + grpc.WithBlock(), + grpc.WithTimeout(30*time.Second), + ) + + if len(token) > 0 { + opts = append(opts, + grpc.WithPerRPCCredentials( + gitalyauth.RPCCredentialsV2(token), + ), + ) + } + + return client.Dial(addr, opts) +} diff --git a/cmd/praefect/subcmd_pingnodes.go b/cmd/praefect/subcmd_pingnodes.go index 8c0b7a7ab..b3bf24fec 100644 --- a/cmd/praefect/subcmd_pingnodes.go +++ b/cmd/praefect/subcmd_pingnodes.go @@ -8,8 +8,6 @@ import ( "sync" "time" - gitalyauth "gitlab.com/gitlab-org/gitaly/auth" - "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" @@ -71,20 +69,7 @@ func dialNodes(conf config.Config) int { } func (npr *nodePing) dial() (*grpc.ClientConn, error) { - opts := []grpc.DialOption{ - grpc.WithBlock(), - grpc.WithTimeout(30 * time.Second), - } - - if len(npr.token) > 0 { - opts = append(opts, - grpc.WithPerRPCCredentials( - gitalyauth.RPCCredentialsV2(npr.token), - ), - ) - } - - return client.Dial(npr.address, opts) + return subCmdDial(npr.address, npr.token) } func (npr *nodePing) healthCheck(cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) { diff --git a/cmd/praefect/subcmd_reconcile.go b/cmd/praefect/subcmd_reconcile.go new file mode 100644 index 000000000..21f3ed7fc --- /dev/null +++ b/cmd/praefect/subcmd_reconcile.go @@ -0,0 +1,148 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "log" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +type nodeReconciler struct { + conf config.Config + virtualStorage string + targetStorage string + referenceStorage string +} + +func reconcile(conf config.Config, subCmdArgs []string) int { + var ( + fs = flag.NewFlagSet("reconcile", flag.ExitOnError) + vs = fs.String("virtual", "", "virtual storage for target storage") + t = fs.String("target", "", "target storage to reconcile") + r = fs.String("reference", "", "reference storage to reconcile (optional)") + ) + + if err := fs.Parse(subCmdArgs); err != nil { + log.Printf("unable to parse args %v: %s", subCmdArgs, err) + return 1 + } + + nr := nodeReconciler{ + conf: conf, + virtualStorage: *vs, + targetStorage: *t, + referenceStorage: *r, + } + + if err := nr.reconcile(); err != nil { + log.Print("unable to reconcile: ", err) + return 1 + } + + return 0 +} + +func (nr nodeReconciler) reconcile() error { + if err := nr.validateArgs(); err != nil { + return err + } + + var nodeAddr string + switch { + case nr.conf.SocketPath != "": + nodeAddr = "unix://" + nr.conf.SocketPath + case nr.conf.ListenAddr != "": + nodeAddr = "tcp://" + nr.conf.ListenAddr + default: + return errors.New("no Praefect address configured") + } + + cc, err := subCmdDial(nodeAddr, nr.conf.Auth.Token) + if err != nil { + return err + } + + pCli := gitalypb.NewPraefectInfoServiceClient(cc) + + request := &gitalypb.ConsistencyCheckRequest{ + VirtualStorage: nr.virtualStorage, + TargetStorage: nr.targetStorage, + ReferenceStorage: nr.referenceStorage, + } + stream, err := pCli.ConsistencyCheck(context.TODO(), request) + if err != nil { + return err + } + + if err := nr.consumeStream(stream); err != nil { + return err + } + + return nil +} + +func (nr nodeReconciler) validateArgs() error { + var vsFound, tFound, rFound bool + + for _, vs := range nr.conf.VirtualStorages { + if vs.Name != nr.virtualStorage { + continue + } + vsFound = true + + for _, n := range vs.Nodes { + if n.Storage == nr.targetStorage { + tFound = true + } + if n.Storage == nr.referenceStorage { + rFound = true + } + } + } + + if !vsFound { + return fmt.Errorf( + "cannot find virtual storage %s in config", nr.virtualStorage, + ) + } + if !tFound { + return fmt.Errorf( + "cannot find target storage %s in virtual storage %q in config", + nr.targetStorage, nr.virtualStorage, + ) + } + if nr.referenceStorage != "" && !rFound { + return fmt.Errorf( + "cannot find reference storage %q in virtual storage %q in config", + nr.referenceStorage, nr.virtualStorage, + ) + } + + return nil +} + +func (nr nodeReconciler) consumeStream(stream gitalypb.PraefectInfoService_ConsistencyCheckClient) error { + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + + if resp.GetReferenceChecksum() != resp.GetTargetChecksum() { + log.Printf( + "INCONSISTENT: %s - replication scheduled: #%d", + resp.GetRepoRelativePath(), + resp.GetReplJobId(), + ) + } + } + return nil +} diff --git a/internal/praefect/consistencycheck_test.go b/internal/praefect/consistencycheck_test.go new file mode 100644 index 000000000..dce2bb473 --- /dev/null +++ b/internal/praefect/consistencycheck_test.go @@ -0,0 +1,123 @@ +package praefect + +import ( + "context" + "io" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + gconfig "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +func TestConsistencyCheck(t *testing.T) { + oldStorages := gconfig.Config.Storages + defer func() { gconfig.Config.Storages = oldStorages }() + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect", + Nodes: []*models.Node{ + 0: { + DefaultPrimary: true, + Storage: "gitaly-0", + Address: "tcp::/this-doesnt-matter", + }, + 1: { + Storage: "gitaly-1", + Address: "tcp::/this-doesnt-matter", + }, + }, + }, + }, + } + + virtualStorage := conf.VirtualStorages[0] + primary := virtualStorage.Nodes[0] + secondary := virtualStorage.Nodes[1] + + testStorages := []gconfig.Storage{ + { + Name: virtualStorage.Nodes[0].Storage, + Path: tempStoragePath(t), + }, + { + Name: virtualStorage.Nodes[1].Storage, + Path: tempStoragePath(t), + }, + } + + gconfig.Config.Storages = append(gconfig.Config.Storages, testStorages...) + defer func() { + for _, ts := range testStorages { + require.NoError(t, os.RemoveAll(ts.Path)) + } + }() + + repo0, _, cleanup0 := testhelper.NewTestRepo(t) + defer cleanup0() + + _, _, cleanupReference := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[0].Storage) + defer cleanupReference() + + _, targetRepoPath, cleanupTarget := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[1].Storage) + defer cleanupTarget() + + cc, _, cleanup := runPraefectServerWithGitaly(t, conf) + defer cleanup() + + praefectCli := gitalypb.NewPraefectInfoServiceClient(cc) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + reposAreConsistent := func() (consistent bool) { + stream, err := praefectCli.ConsistencyCheck(ctx, &gitalypb.ConsistencyCheckRequest{ + VirtualStorage: virtualStorage.Name, + ReferenceStorage: primary.Storage, + TargetStorage: secondary.Storage, + }) + require.NoError(t, err) + + responses := consumeConsistencyCheckResponses(t, stream) + require.Len(t, responses, 1) + + resp := responses[0] + require.Equal(t, repo0.RelativePath, resp.RepoRelativePath) + + consistent = resp.TargetChecksum == resp.ReferenceChecksum + if !consistent { + require.NotZero(t, resp.ReplJobId, + "A replication job should be scheduled when inconsistent") + } + + return consistent + } + + require.True(t, reposAreConsistent(), + "both repos expected to be consistent after initial clone") + + testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "update-ref", "HEAD", "spooky-stuff") + + require.False(t, reposAreConsistent(), + "repos should no longer be consistent after target HEAD changed") +} + +func consumeConsistencyCheckResponses(t *testing.T, stream gitalypb.PraefectInfoService_ConsistencyCheckClient) []*gitalypb.ConsistencyCheckResponse { + var responses []*gitalypb.ConsistencyCheckResponse + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(t, err) + responses = append(responses, resp) + } + return responses +} diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 4c93443fb..f16350f96 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" + gconfig "gitlab.com/gitlab-org/gitaly/internal/config" internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" @@ -22,6 +23,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/server/auth" + "gitlab.com/gitlab-org/gitaly/internal/service/internalgitaly" "gitlab.com/gitlab-org/gitaly/internal/service/repository" gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -129,7 +131,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st errQ := make(chan error) - prf.RegisterServices(nodeMgr, conf) + prf.RegisterServices(nodeMgr, conf, nil) go func() { errQ <- prf.Serve(listener, false) }() @@ -205,7 +207,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client errQ := make(chan error) ctx, cancel := testhelper.Context() - prf.RegisterServices(nodeMgr, conf) + prf.RegisterServices(nodeMgr, conf, ds) go func() { errQ <- prf.Serve(listener, false) }() go func() { errQ <- replmgr.ProcessBacklog(ctx, noopBackoffFunc) }() @@ -247,6 +249,7 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string, gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer()) gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer, internalSocket)) + gitalypb.RegisterInternalGitalyServer(server, internalgitaly.NewServer(gconfig.Config.Storages)) healthpb.RegisterHealthServer(server, health.NewServer()) errQ := make(chan error) diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 41f21974e..8b85edb45 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -17,6 +17,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/middleware" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" @@ -122,10 +123,10 @@ func (srv *Server) Serve(l net.Listener, secure bool) error { } // RegisterServices will register any services praefect needs to handle rpcs on its own -func (srv *Server) RegisterServices(nm nodes.Manager, conf config.Config) { +func (srv *Server) RegisterServices(nm nodes.Manager, conf config.Config, ds datastore.Datastore) { // ServerServiceServer is necessary for the ServerInfo RPC gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(conf, nm)) - gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm)) + gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, ds)) healthpb.RegisterHealthServer(srv.s, health.NewServer()) grpc_prometheus.Register(srv.s) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c70777fb9..1cdee452a 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -143,7 +143,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) { listener, port := listenAvailPort(t) go func() { - srv.RegisterServices(nodeMgr, conf) + srv.RegisterServices(nodeMgr, conf, nil) srv.Serve(listener, false) }() diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go new file mode 100644 index 000000000..64e3d2cfb --- /dev/null +++ b/internal/praefect/service/info/consistencycheck.go @@ -0,0 +1,267 @@ +package info + +import ( + "context" + "io" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Server) validateConsistencyCheckRequest(req *gitalypb.ConsistencyCheckRequest) error { + if req.GetTargetStorage() == "" { + return status.Error(codes.InvalidArgument, "missing target storage") + } + if req.GetVirtualStorage() == "" { + return status.Error(codes.InvalidArgument, "missing virtual storage") + } + if req.GetReferenceStorage() == req.GetTargetStorage() { + return status.Errorf( + codes.InvalidArgument, + "target storage %q cannot match reference storage %q", + req.GetTargetStorage(), req.GetReferenceStorage(), + ) + } + return nil +} + +func (s *Server) getNodes(req *gitalypb.ConsistencyCheckRequest) (target, reference nodes.Node, _ error) { + shard, err := s.nodeMgr.GetShard(req.GetVirtualStorage()) + if err != nil { + return nil, nil, status.Error(codes.NotFound, err.Error()) + } + + primary, err := shard.GetPrimary() + if err != nil { + return nil, nil, err + } + secondaries, err := shard.GetSecondaries() + if err != nil { + return nil, nil, err + } + + // search for target node amongst all nodes in shard + for _, n := range append(secondaries, primary) { + if n.GetStorage() == req.GetTargetStorage() { + target = n + break + } + } + if target == nil { + return nil, nil, status.Errorf( + codes.NotFound, + "unable to find target storage %q", + req.GetTargetStorage(), + ) + } + + // set reference node to default or requested storage + switch { + case req.GetReferenceStorage() == "" && req.GetTargetStorage() == primary.GetStorage(): + return nil, nil, status.Errorf( + codes.InvalidArgument, + "target storage %q is same as current primary, must provide alternate reference", + req.GetTargetStorage(), + ) + case req.GetReferenceStorage() == "": + reference = primary // default + case req.GetReferenceStorage() != "": + for _, secondary := range append(secondaries, primary) { + if secondary.GetStorage() == req.GetReferenceStorage() { + reference = secondary + break + } + } + if reference == nil { + return nil, nil, status.Errorf( + codes.NotFound, + "unable to find reference storage %q in nodes for shard %q", + req.GetReferenceStorage(), + req.GetVirtualStorage(), + ) + } + } + + return target, reference, nil +} + +func walkRepos(ctx context.Context, walkerQ chan<- string, reference nodes.Node) error { + defer close(walkerQ) + + iClient := gitalypb.NewInternalGitalyClient(reference.GetConnection()) + req := &gitalypb.WalkReposRequest{ + StorageName: reference.GetStorage(), + } + + walkStream, err := iClient.WalkRepos(ctx, req) + if err != nil { + return err + } + + for { + resp, err := walkStream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + walkerQ <- resp.GetRelativePath() + } + + return nil +} + +func checksumRepo(ctx context.Context, relpath string, node nodes.Node) (string, error) { + cli := gitalypb.NewRepositoryServiceClient(node.GetConnection()) + resp, err := cli.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{ + Repository: &gitalypb.Repository{ + RelativePath: relpath, + StorageName: node.GetStorage(), + }, + }) + if err != nil { + return "", err + } + + return resp.GetChecksum(), nil +} + +type checksumResult struct { + relativePath string + target string + reference string + targetStorage string + referenceStorage string +} + +func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ chan<- checksumResult, target, reference nodes.Node) error { + defer close(checksumResultQ) + + for repoRelPath := range relpathQ { + cs := checksumResult{ + relativePath: repoRelPath, + targetStorage: target.GetStorage(), + referenceStorage: reference.GetStorage(), + } + + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() (err error) { + cs.target, err = checksumRepo(ctx, repoRelPath, target) + if status.Code(err) == codes.NotFound { + // missing repo on target is okay, we need to + // replicate from reference + return nil + } + return err + }) + + g.Go(func() (err error) { + cs.reference, err = checksumRepo(ctx, repoRelPath, reference) + return err + }) + + if err := g.Wait(); err != nil { + return err + } + + checksumResultQ <- cs + } + + return nil +} + +func scheduleReplication(ctx context.Context, csr checksumResult, ds Datastore, resp *gitalypb.ConsistencyCheckResponse) error { + ids, err := ds.CreateReplicaReplJobs( + correlation.ExtractFromContext(ctx), + csr.relativePath, + csr.referenceStorage, + []string{csr.targetStorage}, + datastore.UpdateRepo, + nil, + ) + if err != nil { + return err + } + + if len(ids) != 1 { + return status.Errorf( + codes.Internal, + "datastore unexpectedly returned %d job IDs", + len(ids), + ) + } + resp.ReplJobId = ids[0] + + if err := ds.UpdateReplJobState(resp.ReplJobId, datastore.JobStateReady); err != nil { + return err + } + + return nil +} + +func ensureConsistency(ctx context.Context, checksumResultQ <-chan checksumResult, ds Datastore, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error { + for csr := range checksumResultQ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // continue + } + + resp := &gitalypb.ConsistencyCheckResponse{ + RepoRelativePath: csr.relativePath, + ReferenceChecksum: csr.reference, + TargetChecksum: csr.target, + } + + if csr.reference != csr.target { + if err := scheduleReplication(ctx, csr, ds, resp); err != nil { + return err + } + } + + if err := stream.Send(resp); err != nil { + return err + } + } + return nil +} + +func (s *Server) ConsistencyCheck(req *gitalypb.ConsistencyCheckRequest, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error { + if err := s.validateConsistencyCheckRequest(req); err != nil { + return err + } + + // target is the node we are checking, reference is the node we are + // checking against (e.g. the primary node) + target, reference, err := s.getNodes(req) + if err != nil { + return err + } + + walkerQ := make(chan string) + checksumResultQ := make(chan checksumResult) + + g, ctx := errgroup.WithContext(stream.Context()) + + // the following goroutines form a pipeline where data flows from top + // to bottom + g.Go(func() error { + return walkRepos(ctx, walkerQ, reference) + }) + g.Go(func() error { + return checksumRepos(ctx, walkerQ, checksumResultQ, target, reference) + }) + g.Go(func() error { + return ensureConsistency(ctx, checksumResultQ, s.datastore, stream) + }) + + return g.Wait() +} diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index 2252a66ce..31990a813 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -1,20 +1,36 @@ package info import ( + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) +// Datastore is a subset of the datastore functionality needed by this service +type Datastore interface { + CreateReplicaReplJobs(correlationID, relativePath, primaryStorage string, secondaryStorages []string, change datastore.ChangeType, params datastore.Params) ([]uint64, error) + UpdateReplJobState(jobID uint64, newState datastore.JobState) error +} + +// compile time assertion that Datastore is satisfied by +// datastore.ReplJobsDatastore +var _ Datastore = (datastore.ReplJobsDatastore)(nil) + // Server is a InfoService server type Server struct { gitalypb.UnimplementedPraefectInfoServiceServer - nodeMgr nodes.Manager + nodeMgr nodes.Manager + conf config.Config + datastore Datastore } // NewServer creates a new instance of a grpc InfoServiceServer -func NewServer(nodeMgr nodes.Manager) gitalypb.PraefectInfoServiceServer { +func NewServer(nodeMgr nodes.Manager, conf config.Config, datastore Datastore) gitalypb.PraefectInfoServiceServer { s := &Server{ - nodeMgr: nodeMgr, + nodeMgr: nodeMgr, + conf: conf, + datastore: datastore, } return s diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go index 1788307ed..88991d50b 100644 --- a/internal/praefect/service/server/server.go +++ b/internal/praefect/service/server/server.go @@ -8,6 +8,7 @@ import ( // Server is a ServerService server type Server struct { + *gitalypb.UnimplementedPraefectInfoServiceServer nodeMgr nodes.Manager conf config.Config } diff --git a/proto/go/gitalypb/praefect.pb.go b/proto/go/gitalypb/praefect.pb.go index bf136f268..c3063bcd7 100644 --- a/proto/go/gitalypb/praefect.pb.go +++ b/proto/go/gitalypb/praefect.pb.go @@ -161,34 +161,173 @@ func (m *RepositoryReplicasResponse_RepositoryDetails) GetChecksum() string { return "" } +type ConsistencyCheckRequest struct { + VirtualStorage string `protobuf:"bytes,1,opt,name=virtual_storage,json=virtualStorage,proto3" json:"virtual_storage,omitempty"` + // The target storage is the storage you wish to check for inconsistencies + // against a reference storage (typically the current primary). + TargetStorage string `protobuf:"bytes,2,opt,name=target_storage,json=targetStorage,proto3" json:"target_storage,omitempty"` + // Optionally provide a reference storage to compare the target storage + // against. If a reference storage is omitted, the current primary will be + // used. + ReferenceStorage string `protobuf:"bytes,3,opt,name=reference_storage,json=referenceStorage,proto3" json:"reference_storage,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ConsistencyCheckRequest) Reset() { *m = ConsistencyCheckRequest{} } +func (m *ConsistencyCheckRequest) String() string { return proto.CompactTextString(m) } +func (*ConsistencyCheckRequest) ProtoMessage() {} +func (*ConsistencyCheckRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_d32bf44842ead735, []int{2} +} + +func (m *ConsistencyCheckRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ConsistencyCheckRequest.Unmarshal(m, b) +} +func (m *ConsistencyCheckRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ConsistencyCheckRequest.Marshal(b, m, deterministic) +} +func (m *ConsistencyCheckRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ConsistencyCheckRequest.Merge(m, src) +} +func (m *ConsistencyCheckRequest) XXX_Size() int { + return xxx_messageInfo_ConsistencyCheckRequest.Size(m) +} +func (m *ConsistencyCheckRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ConsistencyCheckRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ConsistencyCheckRequest proto.InternalMessageInfo + +func (m *ConsistencyCheckRequest) GetVirtualStorage() string { + if m != nil { + return m.VirtualStorage + } + return "" +} + +func (m *ConsistencyCheckRequest) GetTargetStorage() string { + if m != nil { + return m.TargetStorage + } + return "" +} + +func (m *ConsistencyCheckRequest) GetReferenceStorage() string { + if m != nil { + return m.ReferenceStorage + } + return "" +} + +type ConsistencyCheckResponse struct { + RepoRelativePath string `protobuf:"bytes,1,opt,name=repo_relative_path,json=repoRelativePath,proto3" json:"repo_relative_path,omitempty"` + TargetChecksum string `protobuf:"bytes,2,opt,name=target_checksum,json=targetChecksum,proto3" json:"target_checksum,omitempty"` + ReferenceChecksum string `protobuf:"bytes,3,opt,name=reference_checksum,json=referenceChecksum,proto3" json:"reference_checksum,omitempty"` + // If resync was enabled, then each inconsistency will schedule a replication + // job. A replication ID is returned to track the corresponding job. + ReplJobId uint64 `protobuf:"varint,4,opt,name=repl_job_id,json=replJobId,proto3" json:"repl_job_id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ConsistencyCheckResponse) Reset() { *m = ConsistencyCheckResponse{} } +func (m *ConsistencyCheckResponse) String() string { return proto.CompactTextString(m) } +func (*ConsistencyCheckResponse) ProtoMessage() {} +func (*ConsistencyCheckResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_d32bf44842ead735, []int{3} +} + +func (m *ConsistencyCheckResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ConsistencyCheckResponse.Unmarshal(m, b) +} +func (m *ConsistencyCheckResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ConsistencyCheckResponse.Marshal(b, m, deterministic) +} +func (m *ConsistencyCheckResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ConsistencyCheckResponse.Merge(m, src) +} +func (m *ConsistencyCheckResponse) XXX_Size() int { + return xxx_messageInfo_ConsistencyCheckResponse.Size(m) +} +func (m *ConsistencyCheckResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ConsistencyCheckResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ConsistencyCheckResponse proto.InternalMessageInfo + +func (m *ConsistencyCheckResponse) GetRepoRelativePath() string { + if m != nil { + return m.RepoRelativePath + } + return "" +} + +func (m *ConsistencyCheckResponse) GetTargetChecksum() string { + if m != nil { + return m.TargetChecksum + } + return "" +} + +func (m *ConsistencyCheckResponse) GetReferenceChecksum() string { + if m != nil { + return m.ReferenceChecksum + } + return "" +} + +func (m *ConsistencyCheckResponse) GetReplJobId() uint64 { + if m != nil { + return m.ReplJobId + } + return 0 +} + func init() { proto.RegisterType((*RepositoryReplicasRequest)(nil), "gitaly.RepositoryReplicasRequest") proto.RegisterType((*RepositoryReplicasResponse)(nil), "gitaly.RepositoryReplicasResponse") proto.RegisterType((*RepositoryReplicasResponse_RepositoryDetails)(nil), "gitaly.RepositoryReplicasResponse.RepositoryDetails") + proto.RegisterType((*ConsistencyCheckRequest)(nil), "gitaly.ConsistencyCheckRequest") + proto.RegisterType((*ConsistencyCheckResponse)(nil), "gitaly.ConsistencyCheckResponse") } func init() { proto.RegisterFile("praefect.proto", fileDescriptor_d32bf44842ead735) } var fileDescriptor_d32bf44842ead735 = []byte{ - // 282 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xc1, 0x4e, 0x84, 0x30, - 0x10, 0x86, 0x03, 0x9a, 0x15, 0x47, 0x63, 0xb4, 0x5e, 0xb0, 0x27, 0xe4, 0xc4, 0x45, 0x30, 0xe8, - 0x13, 0x18, 0x2f, 0x5e, 0x74, 0x83, 0x37, 0x6f, 0xa5, 0xce, 0xb2, 0x8d, 0x85, 0xd6, 0xb6, 0x6b, - 0xb2, 0x89, 0xef, 0xe1, 0x03, 0xf8, 0x96, 0x9e, 0x8c, 0x80, 0x84, 0xb8, 0x1b, 0x37, 0xd9, 0x5b, - 0xff, 0xf9, 0xff, 0xf9, 0xd2, 0xe9, 0x14, 0x8e, 0xb4, 0x61, 0x38, 0x43, 0xee, 0x52, 0x6d, 0x94, - 0x53, 0x64, 0x52, 0x09, 0xc7, 0xe4, 0x92, 0x82, 0x14, 0x4d, 0x5f, 0xa3, 0x87, 0x76, 0xce, 0x0c, - 0x3e, 0x77, 0x2a, 0x7e, 0x80, 0xb3, 0x02, 0xb5, 0xb2, 0xc2, 0x29, 0xb3, 0x2c, 0x50, 0x4b, 0xc1, - 0x99, 0x2d, 0xf0, 0x75, 0x81, 0xd6, 0x91, 0x1c, 0xc0, 0x0c, 0x66, 0xe8, 0x45, 0x5e, 0x72, 0x90, - 0x93, 0xb4, 0x63, 0xa6, 0xa3, 0xb6, 0x51, 0x2a, 0xfe, 0xf4, 0x81, 0xae, 0x23, 0x5a, 0xad, 0x1a, - 0x8b, 0xe4, 0x1e, 0xf6, 0xb4, 0x11, 0x35, 0x1b, 0x78, 0xd7, 0x6b, 0x78, 0x7f, 0x9a, 0x46, 0xd6, - 0x2d, 0x3a, 0x26, 0xa4, 0x2d, 0x7e, 0x21, 0x64, 0x0a, 0x81, 0xe9, 0xe3, 0xa1, 0x1f, 0xed, 0x6c, - 0x0d, 0x1c, 0x28, 0x94, 0xc3, 0xc9, 0x8a, 0xbd, 0xcd, 0x4b, 0x10, 0x0a, 0x01, 0x9f, 0x23, 0x7f, - 0xb1, 0x8b, 0x3a, 0xf4, 0x23, 0x2f, 0xd9, 0x2f, 0x06, 0x9d, 0xbf, 0xc3, 0xe9, 0xb4, 0x5f, 0xd5, - 0x5d, 0x33, 0x53, 0x8f, 0x68, 0xde, 0x04, 0x47, 0x82, 0x40, 0x56, 0x6f, 0x4d, 0xce, 0xff, 0x9b, - 0xa8, 0xdd, 0x14, 0x8d, 0x37, 0x0f, 0x1d, 0x07, 0x5f, 0x1f, 0xc9, 0x6e, 0xe0, 0x1f, 0x7b, 0x37, - 0x97, 0x4f, 0x3f, 0x71, 0xc9, 0xca, 0x94, 0xab, 0x3a, 0xeb, 0x8e, 0x17, 0xca, 0x54, 0x59, 0x07, - 0xc9, 0xda, 0xaf, 0x91, 0x55, 0xaa, 0xd7, 0xba, 0x2c, 0x27, 0x6d, 0xe9, 0xea, 0x3b, 0x00, 0x00, - 0xff, 0xff, 0xcb, 0xa0, 0x26, 0x44, 0x61, 0x02, 0x00, 0x00, + // 475 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x41, 0x8e, 0xd3, 0x30, + 0x14, 0x95, 0x3b, 0xd5, 0xd0, 0xfe, 0x81, 0x4e, 0xc7, 0x2c, 0x08, 0x59, 0x40, 0x88, 0x84, 0xa8, + 0xc4, 0xb4, 0x1d, 0x15, 0x4e, 0x30, 0x65, 0x33, 0x2c, 0xa0, 0xca, 0xec, 0xd8, 0x44, 0x4e, 0xfa, + 0x9b, 0x1a, 0xd2, 0x38, 0xd8, 0x6e, 0xa5, 0xde, 0x80, 0x1b, 0xb0, 0x61, 0xc7, 0x45, 0x58, 0x71, + 0x0d, 0xee, 0xc1, 0x0a, 0x25, 0x76, 0x4c, 0xd5, 0xce, 0x80, 0x34, 0xbb, 0xf8, 0xbd, 0xe7, 0xf7, + 0x5f, 0xfe, 0xff, 0x86, 0x5e, 0x29, 0x19, 0x2e, 0x30, 0xd5, 0xa3, 0x52, 0x0a, 0x2d, 0xe8, 0x71, + 0xc6, 0x35, 0xcb, 0xb7, 0x3e, 0xe4, 0xbc, 0xb0, 0x98, 0x7f, 0x5f, 0x2d, 0x99, 0xc4, 0xb9, 0x39, + 0x85, 0xef, 0xe1, 0x71, 0x84, 0xa5, 0x50, 0x5c, 0x0b, 0xb9, 0x8d, 0xb0, 0xcc, 0x79, 0xca, 0x54, + 0x84, 0x9f, 0xd7, 0xa8, 0x34, 0x9d, 0x00, 0x48, 0x47, 0x7a, 0x24, 0x20, 0x83, 0x93, 0x09, 0x1d, + 0x19, 0xcf, 0xd1, 0xce, 0xb5, 0x1d, 0x55, 0xf8, 0xbd, 0x05, 0xfe, 0x4d, 0x8e, 0xaa, 0x14, 0x85, + 0x42, 0xfa, 0x0e, 0xee, 0x95, 0x92, 0xaf, 0x98, 0xf3, 0x7b, 0x7d, 0x83, 0xdf, 0xde, 0xa5, 0x1d, + 0xea, 0x0d, 0x6a, 0xc6, 0x73, 0x15, 0x35, 0x26, 0x74, 0x06, 0x1d, 0x69, 0xe5, 0x5e, 0x2b, 0x38, + 0xba, 0xb3, 0xa1, 0x73, 0xf1, 0x53, 0x38, 0x3b, 0xa0, 0xef, 0xd2, 0x09, 0xea, 0x43, 0x27, 0x5d, + 0x62, 0xfa, 0x49, 0xad, 0x57, 0x5e, 0x2b, 0x20, 0x83, 0x6e, 0xe4, 0xce, 0xe1, 0x37, 0x02, 0x8f, + 0xa6, 0xa2, 0x50, 0x5c, 0x69, 0x2c, 0xd2, 0xed, 0xb4, 0xc2, 0x9b, 0xae, 0x0f, 0xe1, 0x74, 0xc3, + 0xa5, 0x5e, 0xb3, 0x3c, 0x56, 0x5a, 0x48, 0x96, 0x61, 0x5d, 0xb0, 0x7b, 0xd9, 0xfe, 0xf2, 0xf3, + 0x9c, 0x44, 0x3d, 0x4b, 0x5e, 0x1b, 0x8e, 0x3e, 0x87, 0x9e, 0x66, 0x32, 0x43, 0xed, 0xd4, 0xa6, + 0xd8, 0x03, 0x83, 0x36, 0xb2, 0x97, 0x70, 0x26, 0x71, 0x81, 0x12, 0x8b, 0x14, 0x9d, 0xf2, 0xa8, + 0x56, 0xf6, 0x1d, 0x61, 0xc5, 0xe1, 0x0f, 0x02, 0xde, 0x61, 0x3c, 0x3b, 0xc2, 0x73, 0xa0, 0xd5, + 0x5f, 0xc6, 0x12, 0x73, 0xa6, 0xf9, 0x06, 0xe3, 0x92, 0xe9, 0xa5, 0x89, 0x58, 0x59, 0x95, 0x22, + 0xb2, 0xc4, 0x8c, 0xe9, 0x25, 0x7d, 0x01, 0xa7, 0x36, 0xde, 0x5e, 0x33, 0x6c, 0xea, 0xa9, 0x45, + 0xe9, 0xb0, 0xb2, 0x6d, 0x02, 0x3a, 0xad, 0x49, 0xf8, 0x37, 0xba, 0x93, 0x3f, 0x81, 0x93, 0x6a, + 0x64, 0xf1, 0x47, 0x91, 0xc4, 0x7c, 0xee, 0xb5, 0x03, 0x32, 0x68, 0x47, 0xdd, 0x0a, 0x7a, 0x2b, + 0x92, 0xab, 0xf9, 0xe4, 0x17, 0x81, 0x87, 0x33, 0xfb, 0x1a, 0xae, 0x8a, 0x85, 0xb8, 0x46, 0xb9, + 0xe1, 0x29, 0x52, 0x04, 0x7a, 0xb8, 0x18, 0xf4, 0xd9, 0xbf, 0x96, 0xa6, 0x1e, 0x8b, 0x1f, 0xfe, + 0x7f, 0xaf, 0xc2, 0xce, 0xef, 0xaf, 0x83, 0x76, 0xa7, 0xd5, 0x27, 0x94, 0x41, 0x7f, 0xbf, 0x81, + 0xf4, 0x69, 0xe3, 0x70, 0xcb, 0xe4, 0xfd, 0xe0, 0x76, 0xc1, 0x5e, 0x81, 0xd6, 0x05, 0xb9, 0xbc, + 0xf8, 0x50, 0xc9, 0x73, 0x96, 0x8c, 0x52, 0xb1, 0x1a, 0x9b, 0xcf, 0xa1, 0x90, 0xd9, 0xd8, 0x98, + 0x8c, 0xeb, 0x07, 0x3e, 0xce, 0x84, 0x3d, 0x97, 0x49, 0x72, 0x5c, 0x43, 0xaf, 0xfe, 0x04, 0x00, + 0x00, 0xff, 0xff, 0x97, 0x99, 0x72, 0xb2, 0x27, 0x04, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -204,6 +343,11 @@ const _ = grpc.SupportPackageIsVersion4 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type PraefectInfoServiceClient interface { RepositoryReplicas(ctx context.Context, in *RepositoryReplicasRequest, opts ...grpc.CallOption) (*RepositoryReplicasResponse, error) + // ConsistencyCheck will perform a consistency check on the requested + // virtual storage backend. A stream of repository statuses will be sent + // back indicating which repos are consistent with the primary and which ones + // need repair. + ConsistencyCheck(ctx context.Context, in *ConsistencyCheckRequest, opts ...grpc.CallOption) (PraefectInfoService_ConsistencyCheckClient, error) } type praefectInfoServiceClient struct { @@ -223,9 +367,46 @@ func (c *praefectInfoServiceClient) RepositoryReplicas(ctx context.Context, in * return out, nil } +func (c *praefectInfoServiceClient) ConsistencyCheck(ctx context.Context, in *ConsistencyCheckRequest, opts ...grpc.CallOption) (PraefectInfoService_ConsistencyCheckClient, error) { + stream, err := c.cc.NewStream(ctx, &_PraefectInfoService_serviceDesc.Streams[0], "/gitaly.PraefectInfoService/ConsistencyCheck", opts...) + if err != nil { + return nil, err + } + x := &praefectInfoServiceConsistencyCheckClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type PraefectInfoService_ConsistencyCheckClient interface { + Recv() (*ConsistencyCheckResponse, error) + grpc.ClientStream +} + +type praefectInfoServiceConsistencyCheckClient struct { + grpc.ClientStream +} + +func (x *praefectInfoServiceConsistencyCheckClient) Recv() (*ConsistencyCheckResponse, error) { + m := new(ConsistencyCheckResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // PraefectInfoServiceServer is the server API for PraefectInfoService service. type PraefectInfoServiceServer interface { RepositoryReplicas(context.Context, *RepositoryReplicasRequest) (*RepositoryReplicasResponse, error) + // ConsistencyCheck will perform a consistency check on the requested + // virtual storage backend. A stream of repository statuses will be sent + // back indicating which repos are consistent with the primary and which ones + // need repair. + ConsistencyCheck(*ConsistencyCheckRequest, PraefectInfoService_ConsistencyCheckServer) error } // UnimplementedPraefectInfoServiceServer can be embedded to have forward compatible implementations. @@ -235,6 +416,9 @@ type UnimplementedPraefectInfoServiceServer struct { func (*UnimplementedPraefectInfoServiceServer) RepositoryReplicas(ctx context.Context, req *RepositoryReplicasRequest) (*RepositoryReplicasResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method RepositoryReplicas not implemented") } +func (*UnimplementedPraefectInfoServiceServer) ConsistencyCheck(req *ConsistencyCheckRequest, srv PraefectInfoService_ConsistencyCheckServer) error { + return status.Errorf(codes.Unimplemented, "method ConsistencyCheck not implemented") +} func RegisterPraefectInfoServiceServer(s *grpc.Server, srv PraefectInfoServiceServer) { s.RegisterService(&_PraefectInfoService_serviceDesc, srv) @@ -258,6 +442,27 @@ func _PraefectInfoService_RepositoryReplicas_Handler(srv interface{}, ctx contex return interceptor(ctx, in, info, handler) } +func _PraefectInfoService_ConsistencyCheck_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ConsistencyCheckRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(PraefectInfoServiceServer).ConsistencyCheck(m, &praefectInfoServiceConsistencyCheckServer{stream}) +} + +type PraefectInfoService_ConsistencyCheckServer interface { + Send(*ConsistencyCheckResponse) error + grpc.ServerStream +} + +type praefectInfoServiceConsistencyCheckServer struct { + grpc.ServerStream +} + +func (x *praefectInfoServiceConsistencyCheckServer) Send(m *ConsistencyCheckResponse) error { + return x.ServerStream.SendMsg(m) +} + var _PraefectInfoService_serviceDesc = grpc.ServiceDesc{ ServiceName: "gitaly.PraefectInfoService", HandlerType: (*PraefectInfoServiceServer)(nil), @@ -267,6 +472,12 @@ var _PraefectInfoService_serviceDesc = grpc.ServiceDesc{ Handler: _PraefectInfoService_RepositoryReplicas_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "ConsistencyCheck", + Handler: _PraefectInfoService_ConsistencyCheck_Handler, + ServerStreams: true, + }, + }, Metadata: "praefect.proto", } diff --git a/proto/praefect.proto b/proto/praefect.proto index f51d8f5b6..80312b554 100644 --- a/proto/praefect.proto +++ b/proto/praefect.proto @@ -14,6 +14,16 @@ service PraefectInfoService { scope_level: SERVER }; } + // ConsistencyCheck will perform a consistency check on the requested + // virtual storage backend. A stream of repository statuses will be sent + // back indicating which repos are consistent with the primary and which ones + // need repair. + rpc ConsistencyCheck(ConsistencyCheckRequest) returns (stream ConsistencyCheckResponse) { + option (op_type) = { + op: ACCESSOR + scope_level: STORAGE + }; + } } message RepositoryReplicasRequest{ @@ -29,3 +39,23 @@ message RepositoryReplicasResponse{ RepositoryDetails primary = 1; repeated RepositoryDetails replicas = 2; } + +message ConsistencyCheckRequest { + string virtual_storage = 1 [(storage)=true]; + // The target storage is the storage you wish to check for inconsistencies + // against a reference storage (typically the current primary). + string target_storage = 2; + // Optionally provide a reference storage to compare the target storage + // against. If a reference storage is omitted, the current primary will be + // used. + string reference_storage = 3; +} + +message ConsistencyCheckResponse { + string repo_relative_path = 1; + string target_checksum = 2; + string reference_checksum = 3; + // If resync was enabled, then each inconsistency will schedule a replication + // job. A replication ID is returned to track the corresponding job. + uint64 repl_job_id = 4; +} diff --git a/ruby/proto/gitaly/praefect_pb.rb b/ruby/proto/gitaly/praefect_pb.rb index bfae21133..d41b3e9d4 100644 --- a/ruby/proto/gitaly/praefect_pb.rb +++ b/ruby/proto/gitaly/praefect_pb.rb @@ -17,10 +17,23 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :repository, :message, 1, "gitaly.Repository" optional :checksum, :string, 2 end + add_message "gitaly.ConsistencyCheckRequest" do + optional :virtual_storage, :string, 1 + optional :target_storage, :string, 2 + optional :reference_storage, :string, 3 + end + add_message "gitaly.ConsistencyCheckResponse" do + optional :repo_relative_path, :string, 1 + optional :target_checksum, :string, 2 + optional :reference_checksum, :string, 3 + optional :repl_job_id, :uint64, 4 + end end module Gitaly RepositoryReplicasRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasRequest").msgclass RepositoryReplicasResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasResponse").msgclass RepositoryReplicasResponse::RepositoryDetails = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasResponse.RepositoryDetails").msgclass + ConsistencyCheckRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ConsistencyCheckRequest").msgclass + ConsistencyCheckResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ConsistencyCheckResponse").msgclass end diff --git a/ruby/proto/gitaly/praefect_services_pb.rb b/ruby/proto/gitaly/praefect_services_pb.rb index 660b91fe2..6af163c43 100644 --- a/ruby/proto/gitaly/praefect_services_pb.rb +++ b/ruby/proto/gitaly/praefect_services_pb.rb @@ -15,6 +15,11 @@ module Gitaly self.service_name = 'gitaly.PraefectInfoService' rpc :RepositoryReplicas, RepositoryReplicasRequest, RepositoryReplicasResponse + # ConsistencyCheck will perform a consistency check on the requested + # virtual storage backend. A stream of repository statuses will be sent + # back indicating which repos are consistent with the primary and which ones + # need repair. + rpc :ConsistencyCheck, ConsistencyCheckRequest, stream(ConsistencyCheckResponse) end Stub = Service.rpc_stub_class |