Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-03-20 09:21:03 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-03-20 09:21:03 +0300
commit1bb075dd1fd0b6d840ba2c59f8aaac1542655ee7 (patch)
tree78581a7150400518a07458bd16734a13e10ff1b6
parent969bac80e2f246867c1a976864bd1f5b34ee43dd (diff)
RPC ConsistencyCheck and Praefect "reconcile" subcommand
-rw-r--r--changelogs/unreleased/po-node-recovery.yml5
-rw-r--r--cmd/praefect/main.go21
-rw-r--r--cmd/praefect/subcmd.go23
-rw-r--r--cmd/praefect/subcmd_pingnodes.go17
-rw-r--r--cmd/praefect/subcmd_reconcile.go148
-rw-r--r--internal/praefect/consistencycheck_test.go123
-rw-r--r--internal/praefect/helper_test.go7
-rw-r--r--internal/praefect/server.go5
-rw-r--r--internal/praefect/server_test.go2
-rw-r--r--internal/praefect/service/info/consistencycheck.go267
-rw-r--r--internal/praefect/service/info/server.go22
-rw-r--r--internal/praefect/service/server/server.go1
-rw-r--r--proto/go/gitalypb/praefect.pb.go251
-rw-r--r--proto/praefect.proto30
-rw-r--r--ruby/proto/gitaly/praefect_pb.rb13
-rw-r--r--ruby/proto/gitaly/praefect_services_pb.rb5
16 files changed, 895 insertions, 45 deletions
diff --git a/changelogs/unreleased/po-node-recovery.yml b/changelogs/unreleased/po-node-recovery.yml
new file mode 100644
index 000000000..84650d0b7
--- /dev/null
+++ b/changelogs/unreleased/po-node-recovery.yml
@@ -0,0 +1,5 @@
+---
+title: RPC ConsistencyCheck and Praefect reconcile subcommand
+merge_request: 1903
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 6ec446451..194bb7fdf 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -24,6 +24,25 @@
//
// praefect -config PATH_TO_CONFIG dial-nodes
//
+// Reconcile
+//
+// The subcommand "reconcile" performs a consistency check of a backend storage
+// against the primary or another storage in the same virtual storage group.
+//
+// praefect -config PATH_TO_CONFIG reconcile -virtual <vstorage> -target <t-storage> [-reference <r-storage>]
+//
+// "-virtual" specifies which virtual storage the target and reference
+// belong to.
+//
+// "-target" specifies the storage name of the backend Gitaly you wish to
+// reconcile.
+//
+// "-reference" is an optional argument that specifies which storage location to
+// check the target against. If an inconsistency is found, the target will
+// attempt to repair itself using the reference as the source of truth. If the
+// reference storage is omitted, Praefect will perform the check against the
+// current primary. If the primary is the same as the target, an error will
+// occur.
package main
import (
@@ -188,7 +207,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
return fmt.Errorf("unable to create a bootstrap: %v", err)
}
- srv.RegisterServices(nodeManager, conf)
+ srv.RegisterServices(nodeManager, conf, ds)
b.StopAction = srv.GracefulStop
for _, cfg := range cfgs {
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 0834a04ae..a2e1da8e3 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -5,10 +5,14 @@ import (
"fmt"
"os"
"os/signal"
+ "time"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+ "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+ "google.golang.org/grpc"
)
const invocationPrefix = progname + " -config CONFIG_TOML"
@@ -32,6 +36,8 @@ func subCommand(conf config.Config, arg0 string, argRest []string) int {
return sqlMigrateDown(conf, argRest)
case "dial-nodes":
return dialNodes(conf)
+ case "reconcile":
+ return reconcile(conf, argRest)
default:
printfErr("%s: unknown subcommand: %q\n", progname, arg0)
return 1
@@ -94,3 +100,20 @@ func openDB(conf config.DB) (*sql.DB, func(), int) {
func printfErr(format string, a ...interface{}) (int, error) {
return fmt.Fprintf(os.Stderr, format, a...)
}
+
+func subCmdDial(addr, token string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ opts = append(opts,
+ grpc.WithBlock(),
+ grpc.WithTimeout(30*time.Second),
+ )
+
+ if len(token) > 0 {
+ opts = append(opts,
+ grpc.WithPerRPCCredentials(
+ gitalyauth.RPCCredentialsV2(token),
+ ),
+ )
+ }
+
+ return client.Dial(addr, opts)
+}
diff --git a/cmd/praefect/subcmd_pingnodes.go b/cmd/praefect/subcmd_pingnodes.go
index 8c0b7a7ab..b3bf24fec 100644
--- a/cmd/praefect/subcmd_pingnodes.go
+++ b/cmd/praefect/subcmd_pingnodes.go
@@ -8,8 +8,6 @@ import (
"sync"
"time"
- gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
- "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
@@ -71,20 +69,7 @@ func dialNodes(conf config.Config) int {
}
func (npr *nodePing) dial() (*grpc.ClientConn, error) {
- opts := []grpc.DialOption{
- grpc.WithBlock(),
- grpc.WithTimeout(30 * time.Second),
- }
-
- if len(npr.token) > 0 {
- opts = append(opts,
- grpc.WithPerRPCCredentials(
- gitalyauth.RPCCredentialsV2(npr.token),
- ),
- )
- }
-
- return client.Dial(npr.address, opts)
+ return subCmdDial(npr.address, npr.token)
}
func (npr *nodePing) healthCheck(cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) {
diff --git a/cmd/praefect/subcmd_reconcile.go b/cmd/praefect/subcmd_reconcile.go
new file mode 100644
index 000000000..21f3ed7fc
--- /dev/null
+++ b/cmd/praefect/subcmd_reconcile.go
@@ -0,0 +1,148 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+type nodeReconciler struct {
+ conf config.Config
+ virtualStorage string
+ targetStorage string
+ referenceStorage string
+}
+
+func reconcile(conf config.Config, subCmdArgs []string) int {
+ var (
+ fs = flag.NewFlagSet("reconcile", flag.ExitOnError)
+ vs = fs.String("virtual", "", "virtual storage for target storage")
+ t = fs.String("target", "", "target storage to reconcile")
+ r = fs.String("reference", "", "reference storage to reconcile (optional)")
+ )
+
+ if err := fs.Parse(subCmdArgs); err != nil {
+ log.Printf("unable to parse args %v: %s", subCmdArgs, err)
+ return 1
+ }
+
+ nr := nodeReconciler{
+ conf: conf,
+ virtualStorage: *vs,
+ targetStorage: *t,
+ referenceStorage: *r,
+ }
+
+ if err := nr.reconcile(); err != nil {
+ log.Print("unable to reconcile: ", err)
+ return 1
+ }
+
+ return 0
+}
+
+func (nr nodeReconciler) reconcile() error {
+ if err := nr.validateArgs(); err != nil {
+ return err
+ }
+
+ var nodeAddr string
+ switch {
+ case nr.conf.SocketPath != "":
+ nodeAddr = "unix://" + nr.conf.SocketPath
+ case nr.conf.ListenAddr != "":
+ nodeAddr = "tcp://" + nr.conf.ListenAddr
+ default:
+ return errors.New("no Praefect address configured")
+ }
+
+ cc, err := subCmdDial(nodeAddr, nr.conf.Auth.Token)
+ if err != nil {
+ return err
+ }
+
+ pCli := gitalypb.NewPraefectInfoServiceClient(cc)
+
+ request := &gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: nr.virtualStorage,
+ TargetStorage: nr.targetStorage,
+ ReferenceStorage: nr.referenceStorage,
+ }
+ stream, err := pCli.ConsistencyCheck(context.TODO(), request)
+ if err != nil {
+ return err
+ }
+
+ if err := nr.consumeStream(stream); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (nr nodeReconciler) validateArgs() error {
+ var vsFound, tFound, rFound bool
+
+ for _, vs := range nr.conf.VirtualStorages {
+ if vs.Name != nr.virtualStorage {
+ continue
+ }
+ vsFound = true
+
+ for _, n := range vs.Nodes {
+ if n.Storage == nr.targetStorage {
+ tFound = true
+ }
+ if n.Storage == nr.referenceStorage {
+ rFound = true
+ }
+ }
+ }
+
+ if !vsFound {
+ return fmt.Errorf(
+ "cannot find virtual storage %s in config", nr.virtualStorage,
+ )
+ }
+ if !tFound {
+ return fmt.Errorf(
+ "cannot find target storage %s in virtual storage %q in config",
+ nr.targetStorage, nr.virtualStorage,
+ )
+ }
+ if nr.referenceStorage != "" && !rFound {
+ return fmt.Errorf(
+ "cannot find reference storage %q in virtual storage %q in config",
+ nr.referenceStorage, nr.virtualStorage,
+ )
+ }
+
+ return nil
+}
+
+func (nr nodeReconciler) consumeStream(stream gitalypb.PraefectInfoService_ConsistencyCheckClient) error {
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return err
+ }
+
+ if resp.GetReferenceChecksum() != resp.GetTargetChecksum() {
+ log.Printf(
+ "INCONSISTENT: %s - replication scheduled: #%d",
+ resp.GetRepoRelativePath(),
+ resp.GetReplJobId(),
+ )
+ }
+ }
+ return nil
+}
diff --git a/internal/praefect/consistencycheck_test.go b/internal/praefect/consistencycheck_test.go
new file mode 100644
index 000000000..dce2bb473
--- /dev/null
+++ b/internal/praefect/consistencycheck_test.go
@@ -0,0 +1,123 @@
+package praefect
+
+import (
+ "context"
+ "io"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ gconfig "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+func TestConsistencyCheck(t *testing.T) {
+ oldStorages := gconfig.Config.Storages
+ defer func() { gconfig.Config.Storages = oldStorages }()
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*models.Node{
+ 0: {
+ DefaultPrimary: true,
+ Storage: "gitaly-0",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ 1: {
+ Storage: "gitaly-1",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ },
+ },
+ },
+ }
+
+ virtualStorage := conf.VirtualStorages[0]
+ primary := virtualStorage.Nodes[0]
+ secondary := virtualStorage.Nodes[1]
+
+ testStorages := []gconfig.Storage{
+ {
+ Name: virtualStorage.Nodes[0].Storage,
+ Path: tempStoragePath(t),
+ },
+ {
+ Name: virtualStorage.Nodes[1].Storage,
+ Path: tempStoragePath(t),
+ },
+ }
+
+ gconfig.Config.Storages = append(gconfig.Config.Storages, testStorages...)
+ defer func() {
+ for _, ts := range testStorages {
+ require.NoError(t, os.RemoveAll(ts.Path))
+ }
+ }()
+
+ repo0, _, cleanup0 := testhelper.NewTestRepo(t)
+ defer cleanup0()
+
+ _, _, cleanupReference := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[0].Storage)
+ defer cleanupReference()
+
+ _, targetRepoPath, cleanupTarget := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[1].Storage)
+ defer cleanupTarget()
+
+ cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
+ defer cleanup()
+
+ praefectCli := gitalypb.NewPraefectInfoServiceClient(cc)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ reposAreConsistent := func() (consistent bool) {
+ stream, err := praefectCli.ConsistencyCheck(ctx, &gitalypb.ConsistencyCheckRequest{
+ VirtualStorage: virtualStorage.Name,
+ ReferenceStorage: primary.Storage,
+ TargetStorage: secondary.Storage,
+ })
+ require.NoError(t, err)
+
+ responses := consumeConsistencyCheckResponses(t, stream)
+ require.Len(t, responses, 1)
+
+ resp := responses[0]
+ require.Equal(t, repo0.RelativePath, resp.RepoRelativePath)
+
+ consistent = resp.TargetChecksum == resp.ReferenceChecksum
+ if !consistent {
+ require.NotZero(t, resp.ReplJobId,
+ "A replication job should be scheduled when inconsistent")
+ }
+
+ return consistent
+ }
+
+ require.True(t, reposAreConsistent(),
+ "both repos expected to be consistent after initial clone")
+
+ testhelper.MustRunCommand(t, nil, "git", "-C", targetRepoPath, "update-ref", "HEAD", "spooky-stuff")
+
+ require.False(t, reposAreConsistent(),
+ "repos should no longer be consistent after target HEAD changed")
+}
+
+func consumeConsistencyCheckResponses(t *testing.T, stream gitalypb.PraefectInfoService_ConsistencyCheckClient) []*gitalypb.ConsistencyCheckResponse {
+ var responses []*gitalypb.ConsistencyCheckResponse
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ require.NoError(t, err)
+ responses = append(responses, resp)
+ }
+ return responses
+}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 4c93443fb..f16350f96 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
+ gconfig "gitlab.com/gitlab-org/gitaly/internal/config"
internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
@@ -22,6 +23,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
+ "gitlab.com/gitlab-org/gitaly/internal/service/internalgitaly"
"gitlab.com/gitlab-org/gitaly/internal/service/repository"
gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -129,7 +131,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
errQ := make(chan error)
- prf.RegisterServices(nodeMgr, conf)
+ prf.RegisterServices(nodeMgr, conf, nil)
go func() {
errQ <- prf.Serve(listener, false)
}()
@@ -205,7 +207,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
errQ := make(chan error)
ctx, cancel := testhelper.Context()
- prf.RegisterServices(nodeMgr, conf)
+ prf.RegisterServices(nodeMgr, conf, ds)
go func() { errQ <- prf.Serve(listener, false) }()
go func() { errQ <- replmgr.ProcessBacklog(ctx, noopBackoffFunc) }()
@@ -247,6 +249,7 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string,
gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer())
gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer, internalSocket))
+ gitalypb.RegisterInternalGitalyServer(server, internalgitaly.NewServer(gconfig.Config.Storages))
healthpb.RegisterHealthServer(server, health.NewServer())
errQ := make(chan error)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 41f21974e..8b85edb45 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -17,6 +17,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
@@ -122,10 +123,10 @@ func (srv *Server) Serve(l net.Listener, secure bool) error {
}
// RegisterServices will register any services praefect needs to handle rpcs on its own
-func (srv *Server) RegisterServices(nm nodes.Manager, conf config.Config) {
+func (srv *Server) RegisterServices(nm nodes.Manager, conf config.Config, ds datastore.Datastore) {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(conf, nm))
- gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm))
+ gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, ds))
healthpb.RegisterHealthServer(srv.s, health.NewServer())
grpc_prometheus.Register(srv.s)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c70777fb9..1cdee452a 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -143,7 +143,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
listener, port := listenAvailPort(t)
go func() {
- srv.RegisterServices(nodeMgr, conf)
+ srv.RegisterServices(nodeMgr, conf, nil)
srv.Serve(listener, false)
}()
diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go
new file mode 100644
index 000000000..64e3d2cfb
--- /dev/null
+++ b/internal/praefect/service/info/consistencycheck.go
@@ -0,0 +1,267 @@
+package info
+
+import (
+ "context"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func (s *Server) validateConsistencyCheckRequest(req *gitalypb.ConsistencyCheckRequest) error {
+ if req.GetTargetStorage() == "" {
+ return status.Error(codes.InvalidArgument, "missing target storage")
+ }
+ if req.GetVirtualStorage() == "" {
+ return status.Error(codes.InvalidArgument, "missing virtual storage")
+ }
+ if req.GetReferenceStorage() == req.GetTargetStorage() {
+ return status.Errorf(
+ codes.InvalidArgument,
+ "target storage %q cannot match reference storage %q",
+ req.GetTargetStorage(), req.GetReferenceStorage(),
+ )
+ }
+ return nil
+}
+
+func (s *Server) getNodes(req *gitalypb.ConsistencyCheckRequest) (target, reference nodes.Node, _ error) {
+ shard, err := s.nodeMgr.GetShard(req.GetVirtualStorage())
+ if err != nil {
+ return nil, nil, status.Error(codes.NotFound, err.Error())
+ }
+
+ primary, err := shard.GetPrimary()
+ if err != nil {
+ return nil, nil, err
+ }
+ secondaries, err := shard.GetSecondaries()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // search for target node amongst all nodes in shard
+ for _, n := range append(secondaries, primary) {
+ if n.GetStorage() == req.GetTargetStorage() {
+ target = n
+ break
+ }
+ }
+ if target == nil {
+ return nil, nil, status.Errorf(
+ codes.NotFound,
+ "unable to find target storage %q",
+ req.GetTargetStorage(),
+ )
+ }
+
+ // set reference node to default or requested storage
+ switch {
+ case req.GetReferenceStorage() == "" && req.GetTargetStorage() == primary.GetStorage():
+ return nil, nil, status.Errorf(
+ codes.InvalidArgument,
+ "target storage %q is same as current primary, must provide alternate reference",
+ req.GetTargetStorage(),
+ )
+ case req.GetReferenceStorage() == "":
+ reference = primary // default
+ case req.GetReferenceStorage() != "":
+ for _, secondary := range append(secondaries, primary) {
+ if secondary.GetStorage() == req.GetReferenceStorage() {
+ reference = secondary
+ break
+ }
+ }
+ if reference == nil {
+ return nil, nil, status.Errorf(
+ codes.NotFound,
+ "unable to find reference storage %q in nodes for shard %q",
+ req.GetReferenceStorage(),
+ req.GetVirtualStorage(),
+ )
+ }
+ }
+
+ return target, reference, nil
+}
+
+func walkRepos(ctx context.Context, walkerQ chan<- string, reference nodes.Node) error {
+ defer close(walkerQ)
+
+ iClient := gitalypb.NewInternalGitalyClient(reference.GetConnection())
+ req := &gitalypb.WalkReposRequest{
+ StorageName: reference.GetStorage(),
+ }
+
+ walkStream, err := iClient.WalkRepos(ctx, req)
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, err := walkStream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ walkerQ <- resp.GetRelativePath()
+ }
+
+ return nil
+}
+
+func checksumRepo(ctx context.Context, relpath string, node nodes.Node) (string, error) {
+ cli := gitalypb.NewRepositoryServiceClient(node.GetConnection())
+ resp, err := cli.CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{
+ Repository: &gitalypb.Repository{
+ RelativePath: relpath,
+ StorageName: node.GetStorage(),
+ },
+ })
+ if err != nil {
+ return "", err
+ }
+
+ return resp.GetChecksum(), nil
+}
+
+type checksumResult struct {
+ relativePath string
+ target string
+ reference string
+ targetStorage string
+ referenceStorage string
+}
+
+func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ chan<- checksumResult, target, reference nodes.Node) error {
+ defer close(checksumResultQ)
+
+ for repoRelPath := range relpathQ {
+ cs := checksumResult{
+ relativePath: repoRelPath,
+ targetStorage: target.GetStorage(),
+ referenceStorage: reference.GetStorage(),
+ }
+
+ g, ctx := errgroup.WithContext(ctx)
+
+ g.Go(func() (err error) {
+ cs.target, err = checksumRepo(ctx, repoRelPath, target)
+ if status.Code(err) == codes.NotFound {
+ // missing repo on target is okay, we need to
+ // replicate from reference
+ return nil
+ }
+ return err
+ })
+
+ g.Go(func() (err error) {
+ cs.reference, err = checksumRepo(ctx, repoRelPath, reference)
+ return err
+ })
+
+ if err := g.Wait(); err != nil {
+ return err
+ }
+
+ checksumResultQ <- cs
+ }
+
+ return nil
+}
+
+func scheduleReplication(ctx context.Context, csr checksumResult, ds Datastore, resp *gitalypb.ConsistencyCheckResponse) error {
+ ids, err := ds.CreateReplicaReplJobs(
+ correlation.ExtractFromContext(ctx),
+ csr.relativePath,
+ csr.referenceStorage,
+ []string{csr.targetStorage},
+ datastore.UpdateRepo,
+ nil,
+ )
+ if err != nil {
+ return err
+ }
+
+ if len(ids) != 1 {
+ return status.Errorf(
+ codes.Internal,
+ "datastore unexpectedly returned %d job IDs",
+ len(ids),
+ )
+ }
+ resp.ReplJobId = ids[0]
+
+ if err := ds.UpdateReplJobState(resp.ReplJobId, datastore.JobStateReady); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func ensureConsistency(ctx context.Context, checksumResultQ <-chan checksumResult, ds Datastore, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
+ for csr := range checksumResultQ {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ // continue
+ }
+
+ resp := &gitalypb.ConsistencyCheckResponse{
+ RepoRelativePath: csr.relativePath,
+ ReferenceChecksum: csr.reference,
+ TargetChecksum: csr.target,
+ }
+
+ if csr.reference != csr.target {
+ if err := scheduleReplication(ctx, csr, ds, resp); err != nil {
+ return err
+ }
+ }
+
+ if err := stream.Send(resp); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *Server) ConsistencyCheck(req *gitalypb.ConsistencyCheckRequest, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
+ if err := s.validateConsistencyCheckRequest(req); err != nil {
+ return err
+ }
+
+ // target is the node we are checking, reference is the node we are
+ // checking against (e.g. the primary node)
+ target, reference, err := s.getNodes(req)
+ if err != nil {
+ return err
+ }
+
+ walkerQ := make(chan string)
+ checksumResultQ := make(chan checksumResult)
+
+ g, ctx := errgroup.WithContext(stream.Context())
+
+ // the following goroutines form a pipeline where data flows from top
+ // to bottom
+ g.Go(func() error {
+ return walkRepos(ctx, walkerQ, reference)
+ })
+ g.Go(func() error {
+ return checksumRepos(ctx, walkerQ, checksumResultQ, target, reference)
+ })
+ g.Go(func() error {
+ return ensureConsistency(ctx, checksumResultQ, s.datastore, stream)
+ })
+
+ return g.Wait()
+}
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 2252a66ce..31990a813 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -1,20 +1,36 @@
package info
import (
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
+// Datastore is a subset of the datastore functionality needed by this service
+type Datastore interface {
+ CreateReplicaReplJobs(correlationID, relativePath, primaryStorage string, secondaryStorages []string, change datastore.ChangeType, params datastore.Params) ([]uint64, error)
+ UpdateReplJobState(jobID uint64, newState datastore.JobState) error
+}
+
+// compile time assertion that Datastore is satisfied by
+// datastore.ReplJobsDatastore
+var _ Datastore = (datastore.ReplJobsDatastore)(nil)
+
// Server is a InfoService server
type Server struct {
gitalypb.UnimplementedPraefectInfoServiceServer
- nodeMgr nodes.Manager
+ nodeMgr nodes.Manager
+ conf config.Config
+ datastore Datastore
}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager) gitalypb.PraefectInfoServiceServer {
+func NewServer(nodeMgr nodes.Manager, conf config.Config, datastore Datastore) gitalypb.PraefectInfoServiceServer {
s := &Server{
- nodeMgr: nodeMgr,
+ nodeMgr: nodeMgr,
+ conf: conf,
+ datastore: datastore,
}
return s
diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go
index 1788307ed..88991d50b 100644
--- a/internal/praefect/service/server/server.go
+++ b/internal/praefect/service/server/server.go
@@ -8,6 +8,7 @@ import (
// Server is a ServerService server
type Server struct {
+ *gitalypb.UnimplementedPraefectInfoServiceServer
nodeMgr nodes.Manager
conf config.Config
}
diff --git a/proto/go/gitalypb/praefect.pb.go b/proto/go/gitalypb/praefect.pb.go
index bf136f268..c3063bcd7 100644
--- a/proto/go/gitalypb/praefect.pb.go
+++ b/proto/go/gitalypb/praefect.pb.go
@@ -161,34 +161,173 @@ func (m *RepositoryReplicasResponse_RepositoryDetails) GetChecksum() string {
return ""
}
+type ConsistencyCheckRequest struct {
+ VirtualStorage string `protobuf:"bytes,1,opt,name=virtual_storage,json=virtualStorage,proto3" json:"virtual_storage,omitempty"`
+ // The target storage is the storage you wish to check for inconsistencies
+ // against a reference storage (typically the current primary).
+ TargetStorage string `protobuf:"bytes,2,opt,name=target_storage,json=targetStorage,proto3" json:"target_storage,omitempty"`
+ // Optionally provide a reference storage to compare the target storage
+ // against. If a reference storage is omitted, the current primary will be
+ // used.
+ ReferenceStorage string `protobuf:"bytes,3,opt,name=reference_storage,json=referenceStorage,proto3" json:"reference_storage,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ConsistencyCheckRequest) Reset() { *m = ConsistencyCheckRequest{} }
+func (m *ConsistencyCheckRequest) String() string { return proto.CompactTextString(m) }
+func (*ConsistencyCheckRequest) ProtoMessage() {}
+func (*ConsistencyCheckRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_d32bf44842ead735, []int{2}
+}
+
+func (m *ConsistencyCheckRequest) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ConsistencyCheckRequest.Unmarshal(m, b)
+}
+func (m *ConsistencyCheckRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_ConsistencyCheckRequest.Marshal(b, m, deterministic)
+}
+func (m *ConsistencyCheckRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ConsistencyCheckRequest.Merge(m, src)
+}
+func (m *ConsistencyCheckRequest) XXX_Size() int {
+ return xxx_messageInfo_ConsistencyCheckRequest.Size(m)
+}
+func (m *ConsistencyCheckRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_ConsistencyCheckRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ConsistencyCheckRequest proto.InternalMessageInfo
+
+func (m *ConsistencyCheckRequest) GetVirtualStorage() string {
+ if m != nil {
+ return m.VirtualStorage
+ }
+ return ""
+}
+
+func (m *ConsistencyCheckRequest) GetTargetStorage() string {
+ if m != nil {
+ return m.TargetStorage
+ }
+ return ""
+}
+
+func (m *ConsistencyCheckRequest) GetReferenceStorage() string {
+ if m != nil {
+ return m.ReferenceStorage
+ }
+ return ""
+}
+
+type ConsistencyCheckResponse struct {
+ RepoRelativePath string `protobuf:"bytes,1,opt,name=repo_relative_path,json=repoRelativePath,proto3" json:"repo_relative_path,omitempty"`
+ TargetChecksum string `protobuf:"bytes,2,opt,name=target_checksum,json=targetChecksum,proto3" json:"target_checksum,omitempty"`
+ ReferenceChecksum string `protobuf:"bytes,3,opt,name=reference_checksum,json=referenceChecksum,proto3" json:"reference_checksum,omitempty"`
+ // If resync was enabled, then each inconsistency will schedule a replication
+ // job. A replication ID is returned to track the corresponding job.
+ ReplJobId uint64 `protobuf:"varint,4,opt,name=repl_job_id,json=replJobId,proto3" json:"repl_job_id,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *ConsistencyCheckResponse) Reset() { *m = ConsistencyCheckResponse{} }
+func (m *ConsistencyCheckResponse) String() string { return proto.CompactTextString(m) }
+func (*ConsistencyCheckResponse) ProtoMessage() {}
+func (*ConsistencyCheckResponse) Descriptor() ([]byte, []int) {
+ return fileDescriptor_d32bf44842ead735, []int{3}
+}
+
+func (m *ConsistencyCheckResponse) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_ConsistencyCheckResponse.Unmarshal(m, b)
+}
+func (m *ConsistencyCheckResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_ConsistencyCheckResponse.Marshal(b, m, deterministic)
+}
+func (m *ConsistencyCheckResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ConsistencyCheckResponse.Merge(m, src)
+}
+func (m *ConsistencyCheckResponse) XXX_Size() int {
+ return xxx_messageInfo_ConsistencyCheckResponse.Size(m)
+}
+func (m *ConsistencyCheckResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_ConsistencyCheckResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_ConsistencyCheckResponse proto.InternalMessageInfo
+
+func (m *ConsistencyCheckResponse) GetRepoRelativePath() string {
+ if m != nil {
+ return m.RepoRelativePath
+ }
+ return ""
+}
+
+func (m *ConsistencyCheckResponse) GetTargetChecksum() string {
+ if m != nil {
+ return m.TargetChecksum
+ }
+ return ""
+}
+
+func (m *ConsistencyCheckResponse) GetReferenceChecksum() string {
+ if m != nil {
+ return m.ReferenceChecksum
+ }
+ return ""
+}
+
+func (m *ConsistencyCheckResponse) GetReplJobId() uint64 {
+ if m != nil {
+ return m.ReplJobId
+ }
+ return 0
+}
+
func init() {
proto.RegisterType((*RepositoryReplicasRequest)(nil), "gitaly.RepositoryReplicasRequest")
proto.RegisterType((*RepositoryReplicasResponse)(nil), "gitaly.RepositoryReplicasResponse")
proto.RegisterType((*RepositoryReplicasResponse_RepositoryDetails)(nil), "gitaly.RepositoryReplicasResponse.RepositoryDetails")
+ proto.RegisterType((*ConsistencyCheckRequest)(nil), "gitaly.ConsistencyCheckRequest")
+ proto.RegisterType((*ConsistencyCheckResponse)(nil), "gitaly.ConsistencyCheckResponse")
}
func init() { proto.RegisterFile("praefect.proto", fileDescriptor_d32bf44842ead735) }
var fileDescriptor_d32bf44842ead735 = []byte{
- // 282 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xc1, 0x4e, 0x84, 0x30,
- 0x10, 0x86, 0x03, 0x9a, 0x15, 0x47, 0x63, 0xb4, 0x5e, 0xb0, 0x27, 0xe4, 0xc4, 0x45, 0x30, 0xe8,
- 0x13, 0x18, 0x2f, 0x5e, 0x74, 0x83, 0x37, 0x6f, 0xa5, 0xce, 0xb2, 0x8d, 0x85, 0xd6, 0xb6, 0x6b,
- 0xb2, 0x89, 0xef, 0xe1, 0x03, 0xf8, 0x96, 0x9e, 0x8c, 0x80, 0x84, 0xb8, 0x1b, 0x37, 0xd9, 0x5b,
- 0xff, 0xf9, 0xff, 0xf9, 0xd2, 0xe9, 0x14, 0x8e, 0xb4, 0x61, 0x38, 0x43, 0xee, 0x52, 0x6d, 0x94,
- 0x53, 0x64, 0x52, 0x09, 0xc7, 0xe4, 0x92, 0x82, 0x14, 0x4d, 0x5f, 0xa3, 0x87, 0x76, 0xce, 0x0c,
- 0x3e, 0x77, 0x2a, 0x7e, 0x80, 0xb3, 0x02, 0xb5, 0xb2, 0xc2, 0x29, 0xb3, 0x2c, 0x50, 0x4b, 0xc1,
- 0x99, 0x2d, 0xf0, 0x75, 0x81, 0xd6, 0x91, 0x1c, 0xc0, 0x0c, 0x66, 0xe8, 0x45, 0x5e, 0x72, 0x90,
- 0x93, 0xb4, 0x63, 0xa6, 0xa3, 0xb6, 0x51, 0x2a, 0xfe, 0xf4, 0x81, 0xae, 0x23, 0x5a, 0xad, 0x1a,
- 0x8b, 0xe4, 0x1e, 0xf6, 0xb4, 0x11, 0x35, 0x1b, 0x78, 0xd7, 0x6b, 0x78, 0x7f, 0x9a, 0x46, 0xd6,
- 0x2d, 0x3a, 0x26, 0xa4, 0x2d, 0x7e, 0x21, 0x64, 0x0a, 0x81, 0xe9, 0xe3, 0xa1, 0x1f, 0xed, 0x6c,
- 0x0d, 0x1c, 0x28, 0x94, 0xc3, 0xc9, 0x8a, 0xbd, 0xcd, 0x4b, 0x10, 0x0a, 0x01, 0x9f, 0x23, 0x7f,
- 0xb1, 0x8b, 0x3a, 0xf4, 0x23, 0x2f, 0xd9, 0x2f, 0x06, 0x9d, 0xbf, 0xc3, 0xe9, 0xb4, 0x5f, 0xd5,
- 0x5d, 0x33, 0x53, 0x8f, 0x68, 0xde, 0x04, 0x47, 0x82, 0x40, 0x56, 0x6f, 0x4d, 0xce, 0xff, 0x9b,
- 0xa8, 0xdd, 0x14, 0x8d, 0x37, 0x0f, 0x1d, 0x07, 0x5f, 0x1f, 0xc9, 0x6e, 0xe0, 0x1f, 0x7b, 0x37,
- 0x97, 0x4f, 0x3f, 0x71, 0xc9, 0xca, 0x94, 0xab, 0x3a, 0xeb, 0x8e, 0x17, 0xca, 0x54, 0x59, 0x07,
- 0xc9, 0xda, 0xaf, 0x91, 0x55, 0xaa, 0xd7, 0xba, 0x2c, 0x27, 0x6d, 0xe9, 0xea, 0x3b, 0x00, 0x00,
- 0xff, 0xff, 0xcb, 0xa0, 0x26, 0x44, 0x61, 0x02, 0x00, 0x00,
+ // 475 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x41, 0x8e, 0xd3, 0x30,
+ 0x14, 0x95, 0x3b, 0xd5, 0xd0, 0xfe, 0x81, 0x4e, 0xc7, 0x2c, 0x08, 0x59, 0x40, 0x88, 0x84, 0xa8,
+ 0xc4, 0xb4, 0x1d, 0x15, 0x4e, 0x30, 0x65, 0x33, 0x2c, 0xa0, 0xca, 0xec, 0xd8, 0x44, 0x4e, 0xfa,
+ 0x9b, 0x1a, 0xd2, 0x38, 0xd8, 0x6e, 0xa5, 0xde, 0x80, 0x1b, 0xb0, 0x61, 0xc7, 0x45, 0x58, 0x71,
+ 0x0d, 0xee, 0xc1, 0x0a, 0x25, 0x76, 0x4c, 0xd5, 0xce, 0x80, 0x34, 0xbb, 0xf8, 0xbd, 0xe7, 0xf7,
+ 0x5f, 0xfe, 0xff, 0x86, 0x5e, 0x29, 0x19, 0x2e, 0x30, 0xd5, 0xa3, 0x52, 0x0a, 0x2d, 0xe8, 0x71,
+ 0xc6, 0x35, 0xcb, 0xb7, 0x3e, 0xe4, 0xbc, 0xb0, 0x98, 0x7f, 0x5f, 0x2d, 0x99, 0xc4, 0xb9, 0x39,
+ 0x85, 0xef, 0xe1, 0x71, 0x84, 0xa5, 0x50, 0x5c, 0x0b, 0xb9, 0x8d, 0xb0, 0xcc, 0x79, 0xca, 0x54,
+ 0x84, 0x9f, 0xd7, 0xa8, 0x34, 0x9d, 0x00, 0x48, 0x47, 0x7a, 0x24, 0x20, 0x83, 0x93, 0x09, 0x1d,
+ 0x19, 0xcf, 0xd1, 0xce, 0xb5, 0x1d, 0x55, 0xf8, 0xbd, 0x05, 0xfe, 0x4d, 0x8e, 0xaa, 0x14, 0x85,
+ 0x42, 0xfa, 0x0e, 0xee, 0x95, 0x92, 0xaf, 0x98, 0xf3, 0x7b, 0x7d, 0x83, 0xdf, 0xde, 0xa5, 0x1d,
+ 0xea, 0x0d, 0x6a, 0xc6, 0x73, 0x15, 0x35, 0x26, 0x74, 0x06, 0x1d, 0x69, 0xe5, 0x5e, 0x2b, 0x38,
+ 0xba, 0xb3, 0xa1, 0x73, 0xf1, 0x53, 0x38, 0x3b, 0xa0, 0xef, 0xd2, 0x09, 0xea, 0x43, 0x27, 0x5d,
+ 0x62, 0xfa, 0x49, 0xad, 0x57, 0x5e, 0x2b, 0x20, 0x83, 0x6e, 0xe4, 0xce, 0xe1, 0x37, 0x02, 0x8f,
+ 0xa6, 0xa2, 0x50, 0x5c, 0x69, 0x2c, 0xd2, 0xed, 0xb4, 0xc2, 0x9b, 0xae, 0x0f, 0xe1, 0x74, 0xc3,
+ 0xa5, 0x5e, 0xb3, 0x3c, 0x56, 0x5a, 0x48, 0x96, 0x61, 0x5d, 0xb0, 0x7b, 0xd9, 0xfe, 0xf2, 0xf3,
+ 0x9c, 0x44, 0x3d, 0x4b, 0x5e, 0x1b, 0x8e, 0x3e, 0x87, 0x9e, 0x66, 0x32, 0x43, 0xed, 0xd4, 0xa6,
+ 0xd8, 0x03, 0x83, 0x36, 0xb2, 0x97, 0x70, 0x26, 0x71, 0x81, 0x12, 0x8b, 0x14, 0x9d, 0xf2, 0xa8,
+ 0x56, 0xf6, 0x1d, 0x61, 0xc5, 0xe1, 0x0f, 0x02, 0xde, 0x61, 0x3c, 0x3b, 0xc2, 0x73, 0xa0, 0xd5,
+ 0x5f, 0xc6, 0x12, 0x73, 0xa6, 0xf9, 0x06, 0xe3, 0x92, 0xe9, 0xa5, 0x89, 0x58, 0x59, 0x95, 0x22,
+ 0xb2, 0xc4, 0x8c, 0xe9, 0x25, 0x7d, 0x01, 0xa7, 0x36, 0xde, 0x5e, 0x33, 0x6c, 0xea, 0xa9, 0x45,
+ 0xe9, 0xb0, 0xb2, 0x6d, 0x02, 0x3a, 0xad, 0x49, 0xf8, 0x37, 0xba, 0x93, 0x3f, 0x81, 0x93, 0x6a,
+ 0x64, 0xf1, 0x47, 0x91, 0xc4, 0x7c, 0xee, 0xb5, 0x03, 0x32, 0x68, 0x47, 0xdd, 0x0a, 0x7a, 0x2b,
+ 0x92, 0xab, 0xf9, 0xe4, 0x17, 0x81, 0x87, 0x33, 0xfb, 0x1a, 0xae, 0x8a, 0x85, 0xb8, 0x46, 0xb9,
+ 0xe1, 0x29, 0x52, 0x04, 0x7a, 0xb8, 0x18, 0xf4, 0xd9, 0xbf, 0x96, 0xa6, 0x1e, 0x8b, 0x1f, 0xfe,
+ 0x7f, 0xaf, 0xc2, 0xce, 0xef, 0xaf, 0x83, 0x76, 0xa7, 0xd5, 0x27, 0x94, 0x41, 0x7f, 0xbf, 0x81,
+ 0xf4, 0x69, 0xe3, 0x70, 0xcb, 0xe4, 0xfd, 0xe0, 0x76, 0xc1, 0x5e, 0x81, 0xd6, 0x05, 0xb9, 0xbc,
+ 0xf8, 0x50, 0xc9, 0x73, 0x96, 0x8c, 0x52, 0xb1, 0x1a, 0x9b, 0xcf, 0xa1, 0x90, 0xd9, 0xd8, 0x98,
+ 0x8c, 0xeb, 0x07, 0x3e, 0xce, 0x84, 0x3d, 0x97, 0x49, 0x72, 0x5c, 0x43, 0xaf, 0xfe, 0x04, 0x00,
+ 0x00, 0xff, 0xff, 0x97, 0x99, 0x72, 0xb2, 0x27, 0x04, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -204,6 +343,11 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type PraefectInfoServiceClient interface {
RepositoryReplicas(ctx context.Context, in *RepositoryReplicasRequest, opts ...grpc.CallOption) (*RepositoryReplicasResponse, error)
+ // ConsistencyCheck will perform a consistency check on the requested
+ // virtual storage backend. A stream of repository statuses will be sent
+ // back indicating which repos are consistent with the primary and which ones
+ // need repair.
+ ConsistencyCheck(ctx context.Context, in *ConsistencyCheckRequest, opts ...grpc.CallOption) (PraefectInfoService_ConsistencyCheckClient, error)
}
type praefectInfoServiceClient struct {
@@ -223,9 +367,46 @@ func (c *praefectInfoServiceClient) RepositoryReplicas(ctx context.Context, in *
return out, nil
}
+func (c *praefectInfoServiceClient) ConsistencyCheck(ctx context.Context, in *ConsistencyCheckRequest, opts ...grpc.CallOption) (PraefectInfoService_ConsistencyCheckClient, error) {
+ stream, err := c.cc.NewStream(ctx, &_PraefectInfoService_serviceDesc.Streams[0], "/gitaly.PraefectInfoService/ConsistencyCheck", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &praefectInfoServiceConsistencyCheckClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type PraefectInfoService_ConsistencyCheckClient interface {
+ Recv() (*ConsistencyCheckResponse, error)
+ grpc.ClientStream
+}
+
+type praefectInfoServiceConsistencyCheckClient struct {
+ grpc.ClientStream
+}
+
+func (x *praefectInfoServiceConsistencyCheckClient) Recv() (*ConsistencyCheckResponse, error) {
+ m := new(ConsistencyCheckResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
// PraefectInfoServiceServer is the server API for PraefectInfoService service.
type PraefectInfoServiceServer interface {
RepositoryReplicas(context.Context, *RepositoryReplicasRequest) (*RepositoryReplicasResponse, error)
+ // ConsistencyCheck will perform a consistency check on the requested
+ // virtual storage backend. A stream of repository statuses will be sent
+ // back indicating which repos are consistent with the primary and which ones
+ // need repair.
+ ConsistencyCheck(*ConsistencyCheckRequest, PraefectInfoService_ConsistencyCheckServer) error
}
// UnimplementedPraefectInfoServiceServer can be embedded to have forward compatible implementations.
@@ -235,6 +416,9 @@ type UnimplementedPraefectInfoServiceServer struct {
func (*UnimplementedPraefectInfoServiceServer) RepositoryReplicas(ctx context.Context, req *RepositoryReplicasRequest) (*RepositoryReplicasResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RepositoryReplicas not implemented")
}
+func (*UnimplementedPraefectInfoServiceServer) ConsistencyCheck(req *ConsistencyCheckRequest, srv PraefectInfoService_ConsistencyCheckServer) error {
+ return status.Errorf(codes.Unimplemented, "method ConsistencyCheck not implemented")
+}
func RegisterPraefectInfoServiceServer(s *grpc.Server, srv PraefectInfoServiceServer) {
s.RegisterService(&_PraefectInfoService_serviceDesc, srv)
@@ -258,6 +442,27 @@ func _PraefectInfoService_RepositoryReplicas_Handler(srv interface{}, ctx contex
return interceptor(ctx, in, info, handler)
}
+func _PraefectInfoService_ConsistencyCheck_Handler(srv interface{}, stream grpc.ServerStream) error {
+ m := new(ConsistencyCheckRequest)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(PraefectInfoServiceServer).ConsistencyCheck(m, &praefectInfoServiceConsistencyCheckServer{stream})
+}
+
+type PraefectInfoService_ConsistencyCheckServer interface {
+ Send(*ConsistencyCheckResponse) error
+ grpc.ServerStream
+}
+
+type praefectInfoServiceConsistencyCheckServer struct {
+ grpc.ServerStream
+}
+
+func (x *praefectInfoServiceConsistencyCheckServer) Send(m *ConsistencyCheckResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
var _PraefectInfoService_serviceDesc = grpc.ServiceDesc{
ServiceName: "gitaly.PraefectInfoService",
HandlerType: (*PraefectInfoServiceServer)(nil),
@@ -267,6 +472,12 @@ var _PraefectInfoService_serviceDesc = grpc.ServiceDesc{
Handler: _PraefectInfoService_RepositoryReplicas_Handler,
},
},
- Streams: []grpc.StreamDesc{},
+ Streams: []grpc.StreamDesc{
+ {
+ StreamName: "ConsistencyCheck",
+ Handler: _PraefectInfoService_ConsistencyCheck_Handler,
+ ServerStreams: true,
+ },
+ },
Metadata: "praefect.proto",
}
diff --git a/proto/praefect.proto b/proto/praefect.proto
index f51d8f5b6..80312b554 100644
--- a/proto/praefect.proto
+++ b/proto/praefect.proto
@@ -14,6 +14,16 @@ service PraefectInfoService {
scope_level: SERVER
};
}
+ // ConsistencyCheck will perform a consistency check on the requested
+ // virtual storage backend. A stream of repository statuses will be sent
+ // back indicating which repos are consistent with the primary and which ones
+ // need repair.
+ rpc ConsistencyCheck(ConsistencyCheckRequest) returns (stream ConsistencyCheckResponse) {
+ option (op_type) = {
+ op: ACCESSOR
+ scope_level: STORAGE
+ };
+ }
}
message RepositoryReplicasRequest{
@@ -29,3 +39,23 @@ message RepositoryReplicasResponse{
RepositoryDetails primary = 1;
repeated RepositoryDetails replicas = 2;
}
+
+message ConsistencyCheckRequest {
+ string virtual_storage = 1 [(storage)=true];
+ // The target storage is the storage you wish to check for inconsistencies
+ // against a reference storage (typically the current primary).
+ string target_storage = 2;
+ // Optionally provide a reference storage to compare the target storage
+ // against. If a reference storage is omitted, the current primary will be
+ // used.
+ string reference_storage = 3;
+}
+
+message ConsistencyCheckResponse {
+ string repo_relative_path = 1;
+ string target_checksum = 2;
+ string reference_checksum = 3;
+ // If resync was enabled, then each inconsistency will schedule a replication
+ // job. A replication ID is returned to track the corresponding job.
+ uint64 repl_job_id = 4;
+}
diff --git a/ruby/proto/gitaly/praefect_pb.rb b/ruby/proto/gitaly/praefect_pb.rb
index bfae21133..d41b3e9d4 100644
--- a/ruby/proto/gitaly/praefect_pb.rb
+++ b/ruby/proto/gitaly/praefect_pb.rb
@@ -17,10 +17,23 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
optional :repository, :message, 1, "gitaly.Repository"
optional :checksum, :string, 2
end
+ add_message "gitaly.ConsistencyCheckRequest" do
+ optional :virtual_storage, :string, 1
+ optional :target_storage, :string, 2
+ optional :reference_storage, :string, 3
+ end
+ add_message "gitaly.ConsistencyCheckResponse" do
+ optional :repo_relative_path, :string, 1
+ optional :target_checksum, :string, 2
+ optional :reference_checksum, :string, 3
+ optional :repl_job_id, :uint64, 4
+ end
end
module Gitaly
RepositoryReplicasRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasRequest").msgclass
RepositoryReplicasResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasResponse").msgclass
RepositoryReplicasResponse::RepositoryDetails = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasResponse.RepositoryDetails").msgclass
+ ConsistencyCheckRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ConsistencyCheckRequest").msgclass
+ ConsistencyCheckResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ConsistencyCheckResponse").msgclass
end
diff --git a/ruby/proto/gitaly/praefect_services_pb.rb b/ruby/proto/gitaly/praefect_services_pb.rb
index 660b91fe2..6af163c43 100644
--- a/ruby/proto/gitaly/praefect_services_pb.rb
+++ b/ruby/proto/gitaly/praefect_services_pb.rb
@@ -15,6 +15,11 @@ module Gitaly
self.service_name = 'gitaly.PraefectInfoService'
rpc :RepositoryReplicas, RepositoryReplicasRequest, RepositoryReplicasResponse
+ # ConsistencyCheck will perform a consistency check on the requested
+ # virtual storage backend. A stream of repository statuses will be sent
+ # back indicating which repos are consistent with the primary and which ones
+ # need repair.
+ rpc :ConsistencyCheck, ConsistencyCheckRequest, stream(ConsistencyCheckResponse)
end
Stub = Service.rpc_stub_class