Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-04-15 14:32:03 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-04-15 14:32:03 +0300
commitf882c0545f51fb2d0fae3187eac0f21472c8ef5d (patch)
treecb120e1336b4d2e89d4ceb4b9d5a14873cf9a294
parent1a017e4973e75d75795c4668a353597a2b5b1d92 (diff)
parente97bb3559328683ffc2cc09dbf7826e5eecc48e6 (diff)
Merge branch 'smh-dataloss-cmd' into 'master'
Praefect dataloss subcommand See merge request gitlab-org/gitaly!2057
-rw-r--r--changelogs/unreleased/smh-dataloss-cmd.yml5
-rw-r--r--cmd/praefect/main.go14
-rw-r--r--cmd/praefect/subcmd.go1
-rw-r--r--cmd/praefect/subcmd_dataloss.go111
-rw-r--r--cmd/praefect/subcmd_dataloss_test.go141
-rw-r--r--cmd/praefect/subcmd_reconcile.go22
-rw-r--r--internal/praefect/dataloss_check_test.go124
-rw-r--r--internal/praefect/helper_test.go14
-rw-r--r--internal/praefect/server_test.go12
-rw-r--r--internal/praefect/service/info/dataloss.go28
-rw-r--r--internal/praefect/service/info/server.go2
11 files changed, 452 insertions, 22 deletions
diff --git a/changelogs/unreleased/smh-dataloss-cmd.yml b/changelogs/unreleased/smh-dataloss-cmd.yml
new file mode 100644
index 000000000..1e81d2b83
--- /dev/null
+++ b/changelogs/unreleased/smh-dataloss-cmd.yml
@@ -0,0 +1,5 @@
+---
+title: Praefect dataloss subcommand
+merge_request: 2057
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 53561b9db..35a17a6ad 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -53,6 +53,20 @@
// reference storage is omitted, Praefect will perform the check against the
// current primary. If the primary is the same as the target, an error will
// occur.
+//
+// Dataloss
+//
+// The subcommand "dataloss" helps identify dataloss cases during a given
+// timeframe by checking for dead replication jobs. This can be useful to
+// quantify the impact of a primary node failure.
+//
+// praefect -config PATH_TO_CONFIG dataloss -from RFC3339_TIME -to RFC3339_TIME
+//
+// "-from" specifies the inclusive beginning of a timerange to check.
+//
+// "-to" specifies the exclusive ending of a timerange to check.
+//
+// If a timerange is not specified, dead jobs from the last six hours are fetched by default.
package main
import (
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 6d8261133..4fda868ac 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -30,6 +30,7 @@ var (
"reconcile": &reconcileSubcommand{},
"sql-migrate-down": &sqlMigrateDownSubcommand{},
"sql-migrate-status": &sqlMigrateStatusSubcommand{},
+ "dataloss": newDatalossSubcommand(),
}
)
diff --git a/cmd/praefect/subcmd_dataloss.go b/cmd/praefect/subcmd_dataloss.go
new file mode 100644
index 000000000..7785d4f95
--- /dev/null
+++ b/cmd/praefect/subcmd_dataloss.go
@@ -0,0 +1,111 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "os"
+ "sort"
+ "time"
+
+ "github.com/golang/protobuf/ptypes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+// errFromNotBeforeTo is returned by the dataloss subcommand when the 'from'
+// time is not strictly before the 'to' time.
+var errFromNotBeforeTo = errors.New("'from' must be a time before 'to'")
+
+// timeFlag adapts a time.Time so that it can be registered with the flag
+// package (through flag.FlagSet.Var) and parsed from an RFC 3339 timestamp.
+type timeFlag time.Time
+
+// String renders the currently stored time in RFC 3339 format.
+func (tf *timeFlag) String() string {
+	return time.Time(*tf).Format(time.RFC3339)
+}
+
+// Set parses v as an RFC 3339 timestamp and stores the result. If v cannot be
+// parsed, the parse error is returned and the previously stored value (for
+// example a default) is left untouched instead of being reset to the zero
+// time.
+func (tf *timeFlag) Set(v string) error {
+	t, err := time.Parse(time.RFC3339, v)
+	if err != nil {
+		return err
+	}
+
+	*tf = timeFlag(t)
+	return nil
+}
+
+// datalossSubcommand implements the "dataloss" subcommand, which reports
+// failed replication jobs within a timerange.
+type datalossSubcommand struct {
+	// output receives the report written by Exec.
+	output io.Writer
+	// from is the inclusive beginning of the timerange to check.
+	from time.Time
+	// to is the exclusive ending of the timerange to check.
+	to time.Time
+}
+
+// newDatalossSubcommand returns a subcommand that writes to os.Stdout and
+// whose default timerange is the six hours leading up to the moment of
+// construction.
+func newDatalossSubcommand() *datalossSubcommand {
+	const defaultWindow = 6 * time.Hour
+
+	end := time.Now()
+	cmd := &datalossSubcommand{output: os.Stdout}
+	cmd.from = end.Add(-defaultWindow)
+	cmd.to = end
+	return cmd
+}
+
+// FlagSet builds the flag set of the dataloss subcommand. The "-from" and
+// "-to" flags parse directly into the subcommand's timerange bounds.
+func (cmd *datalossSubcommand) FlagSet() *flag.FlagSet {
+	flags := flag.NewFlagSet("dataloss", flag.ContinueOnError)
+
+	for _, f := range []struct {
+		target *time.Time
+		name   string
+		usage  string
+	}{
+		{target: &cmd.from, name: "from", usage: "inclusive beginning of timerange"},
+		{target: &cmd.to, name: "to", usage: "exclusive ending of timerange"},
+	} {
+		flags.Var((*timeFlag)(f.target), f.name, f.usage)
+	}
+
+	return flags
+}
+
+// Exec asks the Praefect server configured in cfg for the failed replication
+// jobs in the [from, to) timerange and writes one line per relative path,
+// sorted by path, to cmd.output.
+//
+// It returns errFromNotBeforeTo when 'from' is not strictly before 'to', and
+// an error when no address is configured, a timestamp cannot be converted,
+// dialing or the RPC fails, or the output cannot be written.
+func (cmd *datalossSubcommand) Exec(_ *flag.FlagSet, cfg config.Config) error {
+	addr, err := getNodeAddress(cfg)
+	if err != nil {
+		return err
+	}
+
+	// Rejects both an empty range (from == to) and an inverted one.
+	if validRange := cmd.from.Before(cmd.to); !validRange {
+		return errFromNotBeforeTo
+	}
+
+	fromProto, err := ptypes.TimestampProto(cmd.from)
+	if err != nil {
+		return fmt.Errorf("invalid 'from': %v", err)
+	}
+
+	toProto, err := ptypes.TimestampProto(cmd.to)
+	if err != nil {
+		return fmt.Errorf("invalid 'to': %v", err)
+	}
+
+	cc, err := subCmdDial(addr, cfg.Auth.Token)
+	if err != nil {
+		return fmt.Errorf("error dialing: %v", err)
+	}
+	defer func() {
+		// A close failure cannot change the result anymore, so it is only logged.
+		if closeErr := cc.Close(); closeErr != nil {
+			log.Printf("error closing connection: %v", closeErr)
+		}
+	}()
+
+	request := &gitalypb.DatalossCheckRequest{From: fromProto, To: toProto}
+	resp, err := gitalypb.NewPraefectInfoServiceClient(cc).DatalossCheck(context.Background(), request)
+	if err != nil {
+		return fmt.Errorf("error checking: %v", err)
+	}
+
+	// Map iteration order is random; sort the paths for a stable report.
+	paths := make([]string, len(resp.ByRelativePath))
+	next := 0
+	for path := range resp.ByRelativePath {
+		paths[next] = path
+		next++
+	}
+	sort.Strings(paths)
+
+	report := func(format string, args ...interface{}) error {
+		if _, werr := fmt.Fprintf(cmd.output, format, args...); werr != nil {
+			return fmt.Errorf("error writing output: %v", werr)
+		}
+		return nil
+	}
+
+	if err := report("Failed replication jobs between [%s, %s):\n", cmd.from, cmd.to); err != nil {
+		return err
+	}
+
+	for _, path := range paths {
+		if err := report("%s: %d jobs\n", path, resp.ByRelativePath[path]); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
new file mode 100644
index 000000000..79a811390
--- /dev/null
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -0,0 +1,141 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "flag"
+ "fmt"
+ "net"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/ptypes/timestamp"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+// TestTimeFlag verifies that timeFlag parses RFC 3339 timestamps given on the
+// command line and that malformed input is rejected with an error.
+func TestTimeFlag(t *testing.T) {
+	for _, tc := range []struct {
+		input string
+		// expected is the parsed time; the zero value marks input that must
+		// fail to parse.
+		expected time.Time
+	}{
+		{
+			input:    "2020-02-03T14:15:16Z",
+			expected: time.Date(2020, 2, 3, 14, 15, 16, 0, time.UTC),
+		},
+		{
+			input:    "2020-02-03T14:15:16+02:00",
+			expected: time.Date(2020, 2, 3, 14, 15, 16, 0, time.FixedZone("UTC+2", 2*60*60)),
+		},
+		{
+			input: "",
+		},
+	} {
+		t.Run(tc.input, func(t *testing.T) {
+			var actual time.Time
+			fs := flag.NewFlagSet("dataloss", flag.ContinueOnError)
+			fs.Var((*timeFlag)(&actual), "time", "")
+
+			err := fs.Parse([]string{"-time", tc.input})
+			if tc.expected.IsZero() {
+				// Without this assertion a Set that silently accepted
+				// garbage would go unnoticed.
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+
+			require.True(t, tc.expected.Equal(actual))
+		})
+	}
+}
+
+// mockPraefectInfoService is a PraefectInfoService test double whose
+// DatalossCheck behavior is supplied by the test.
+type mockPraefectInfoService struct {
+	gitalypb.UnimplementedPraefectInfoServiceServer
+	// DatalossCheckFunc is invoked for every DatalossCheck call; it must be
+	// set before the RPC is made.
+	DatalossCheckFunc func(context.Context, *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error)
+}
+
+// DatalossCheck forwards the call to DatalossCheckFunc.
+func (svc mockPraefectInfoService) DatalossCheck(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
+	handle := svc.DatalossCheckFunc
+	return handle(ctx, req)
+}
+
+func TestDatalossSubcommand(t *testing.T) {
+ tmp, clean := testhelper.TempDir(t, "")
+ defer clean()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "gitaly.sock"))
+ require.NoError(t, err)
+ defer ln.Close()
+
+ mockSvc := &mockPraefectInfoService{}
+ srv := grpc.NewServer()
+ gitalypb.RegisterPraefectInfoServiceServer(srv, mockSvc)
+ go func() { require.NoError(t, srv.Serve(ln)) }()
+ defer srv.Stop()
+
+ // verify the mock service is up
+ addr := fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr())
+ cc, err := grpc.DialContext(ctx, addr, grpc.WithBlock(), grpc.WithInsecure())
+ require.NoError(t, err)
+ defer cc.Close()
+
+ for _, tc := range []struct {
+ desc string
+ args []string
+ datalossCheck func(context.Context, *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error)
+ output string
+ error error
+ }{
+ {
+ desc: "from equals to",
+ args: []string{"-from=2020-01-02T00:00:00Z", "-to=2020-01-02T00:00:00Z"},
+ error: errFromNotBeforeTo,
+ },
+ {
+ desc: "from after to",
+ args: []string{"-from=2020-01-02T00:00:00Z", "-to=2020-01-01T00:00:00Z"},
+ error: errFromNotBeforeTo,
+ },
+ {
+ desc: "no dead jobs",
+ args: []string{"-from=2020-01-02T00:00:00Z", "-to=2020-01-03T00:00:00Z"},
+ datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
+ require.Equal(t, req.GetFrom(), &timestamp.Timestamp{Seconds: 1577923200})
+ require.Equal(t, req.GetTo(), &timestamp.Timestamp{Seconds: 1578009600})
+ return &gitalypb.DatalossCheckResponse{}, nil
+ },
+ output: "Failed replication jobs between [2020-01-02 00:00:00 +0000 UTC, 2020-01-03 00:00:00 +0000 UTC):\n",
+ },
+ {
+ desc: "success",
+ args: []string{"-from=2020-01-02T00:00:00Z", "-to=2020-01-03T00:00:00Z"},
+ datalossCheck: func(_ context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
+ require.Equal(t, req.GetFrom(), &timestamp.Timestamp{Seconds: 1577923200})
+ require.Equal(t, req.GetTo(), &timestamp.Timestamp{Seconds: 1578009600})
+ return &gitalypb.DatalossCheckResponse{ByRelativePath: map[string]int64{
+ "test-repo/relative-path/2": 4,
+ "test-repo/relative-path/1": 1,
+ "test-repo/relative-path/3": 2,
+ }}, nil
+ },
+ output: `Failed replication jobs between [2020-01-02 00:00:00 +0000 UTC, 2020-01-03 00:00:00 +0000 UTC):
+test-repo/relative-path/1: 1 jobs
+test-repo/relative-path/2: 4 jobs
+test-repo/relative-path/3: 2 jobs
+`,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ mockSvc.DatalossCheckFunc = tc.datalossCheck
+ cmd := newDatalossSubcommand()
+ output := &bytes.Buffer{}
+ cmd.output = output
+ require.NoError(t, cmd.FlagSet().Parse(tc.args))
+ require.Equal(t, tc.error, cmd.Exec(cmd.FlagSet(), config.Config{SocketPath: ln.Addr().String()}))
+ require.Equal(t, tc.output, output.String())
+ })
+ }
+}
diff --git a/cmd/praefect/subcmd_reconcile.go b/cmd/praefect/subcmd_reconcile.go
index bb2b12b6c..42f64d6b4 100644
--- a/cmd/praefect/subcmd_reconcile.go
+++ b/cmd/praefect/subcmd_reconcile.go
@@ -48,19 +48,25 @@ func (s *reconcileSubcommand) Exec(flags *flag.FlagSet, conf config.Config) erro
return nil
}
+// getNodeAddress returns the address used to reach Praefect: the unix socket
+// when a socket path is configured, otherwise the TCP listen address. An
+// error is returned when neither is set.
+func getNodeAddress(cfg config.Config) (string, error) {
+	if socket := cfg.SocketPath; socket != "" {
+		return "unix://" + socket, nil
+	}
+
+	if listen := cfg.ListenAddr; listen != "" {
+		return "tcp://" + listen, nil
+	}
+
+	return "", errors.New("no Praefect address configured")
+}
+
func (nr nodeReconciler) reconcile() error {
if err := nr.validateArgs(); err != nil {
return err
}
- var nodeAddr string
- switch {
- case nr.conf.SocketPath != "":
- nodeAddr = "unix://" + nr.conf.SocketPath
- case nr.conf.ListenAddr != "":
- nodeAddr = "tcp://" + nr.conf.ListenAddr
- default:
- return errors.New("no Praefect address configured")
+ nodeAddr, err := getNodeAddress(nr.conf)
+ if err != nil {
+ return err
}
cc, err := subCmdDial(nodeAddr, nr.conf.Auth.Token)
diff --git a/internal/praefect/dataloss_check_test.go b/internal/praefect/dataloss_check_test.go
new file mode 100644
index 000000000..7b86dd362
--- /dev/null
+++ b/internal/praefect/dataloss_check_test.go
@@ -0,0 +1,124 @@
+package praefect
+
+import (
+ "testing"
+
+ "github.com/golang/protobuf/ptypes"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+// TestDatalossCheck verifies that the DatalossCheck RPC reports, per relative
+// path, the number of dead replication jobs for the requested timerange.
+func TestDatalossCheck(t *testing.T) {
+ cfg := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*models.Node{
+ {
+ DefaultPrimary: true,
+ Storage: "not-needed",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ },
+ },
+ },
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ rq := datastore.NewMemoryReplicationEventQueue()
+ const targetNode = "test-node"
+ // killJobs dequeues the jobs of targetNode one at a time and acknowledges
+ // each of them as failed, or as dead when the dequeued job reports an
+ // Attempt of 0, until Dequeue returns no more jobs.
+ // NOTE(review): presumably Attempt counts the remaining attempts, so a
+ // failed job is handed out again until it runs out - confirm against the
+ // queue implementation.
+ killJobs := func(t *testing.T) {
+ t.Helper()
+ for {
+ jobs, err := rq.Dequeue(ctx, targetNode, 1)
+ require.NoError(t, err)
+ if len(jobs) == 0 {
+ // all jobs dead
+ break
+ }
+
+ state := datastore.JobStateFailed
+ if jobs[0].Attempt == 0 {
+ state = datastore.JobStateDead
+ }
+
+ _, err = rq.Acknowledge(ctx, state, []uint64{jobs[0].ID})
+ require.NoError(t, err)
+ }
+ }
+
+ // The CreatedAt of this event is used as the 'from' bound of the request
+ // below.
+ beforeTimerange, err := rq.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RelativePath: "repo/before-timerange",
+ },
+ })
+ require.NoError(t, err)
+ // Relative path -> number of jobs that are enqueued and then killed; this
+ // is also the exact response expected from DatalossCheck.
+ expectedDeadJobs := map[string]int64{"repo/dead-job": 1, "repo/multiple-dead-jobs": 2}
+ for relPath, count := range expectedDeadJobs {
+ for i := int64(0); i < count; i++ {
+ _, err := rq.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RelativePath: relPath,
+ TargetNodeStorage: targetNode,
+ },
+ })
+ require.NoError(t, err)
+ }
+ }
+ killJobs(t)
+
+ // add some non-dead jobs
+ for relPath, state := range map[string]datastore.JobState{
+ "repo/completed-job": datastore.JobStateCompleted,
+ "repo/cancelled-job": datastore.JobStateCancelled,
+ } {
+ _, err := rq.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RelativePath: relPath,
+ TargetNodeStorage: targetNode,
+ },
+ })
+ require.NoError(t, err)
+
+ jobs, err := rq.Dequeue(ctx, targetNode, 1)
+ require.NoError(t, err)
+
+ _, err = rq.Acknowledge(ctx, state, []uint64{jobs[0].ID})
+ require.NoError(t, err)
+ }
+
+ // The CreatedAt of this event is used as the 'to' bound of the request
+ // below.
+ afterTimerange, err := rq.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RelativePath: "repo/after-timerange",
+ },
+ })
+ require.NoError(t, err)
+ killJobs(t)
+
+ // Serve Praefect backed by the queue populated above so the RPC sees the
+ // same jobs.
+ cc, _, clean := runPraefectServerWithMock(t, cfg,
+ datastore.Datastore{ReplicationEventQueue: rq},
+ map[string]mock.SimpleServiceServer{
+ "not-needed": &mock.UnimplementedSimpleServiceServer{},
+ },
+ )
+ defer clean()
+
+ pbFrom, err := ptypes.TimestampProto(beforeTimerange.CreatedAt)
+ require.NoError(t, err)
+ pbTo, err := ptypes.TimestampProto(afterTimerange.CreatedAt)
+ require.NoError(t, err)
+
+ resp, err := gitalypb.NewPraefectInfoServiceClient(cc).DatalossCheck(ctx, &gitalypb.DatalossCheckRequest{
+ From: pbFrom,
+ To: pbTo,
+ })
+ require.NoError(t, err)
+ require.Equal(t, &gitalypb.DatalossCheckResponse{ByRelativePath: expectedDeadJobs}, resp)
+}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 28fe74ce2..e0275c856 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -74,11 +74,7 @@ func testConfig(backends int) config.Config {
// setupServer wires all praefect dependencies together via dependency
// injection
-func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, r *protoregistry.Registry) *Server {
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
- }
+func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, ds datastore.Datastore, l *logrus.Entry, r *protoregistry.Registry) *Server {
coordinator := NewCoordinator(l, ds, nodeMgr, conf, r)
var defaultNode *models.Node
@@ -99,7 +95,7 @@ func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *log
// config.Nodes. There must be a 1-to-1 mapping between backend server and
// configured storage node.
// requires there to be only 1 virtual storage
-func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[string]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) {
+func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
require.Len(t, conf.VirtualStorages, 1)
require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes),
"mock server count doesn't match config nodes")
@@ -124,14 +120,14 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
r := protoregistry.New()
require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t)))
- prf := setupServer(t, conf, nodeMgr, log.Default(), r)
+ prf := setupServer(t, conf, nodeMgr, ds, log.Default(), r)
listener, port := listenAvailPort(t)
t.Logf("praefect listening on port %d", port)
errQ := make(chan error)
- prf.RegisterServices(nodeMgr, conf, datastore.Datastore{})
+ prf.RegisterServices(nodeMgr, conf, ds)
go func() {
errQ <- prf.Serve(listener, false)
}()
@@ -152,7 +148,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
require.NoError(t, prf.Shutdown(ctx))
}
- return mock.NewSimpleServiceClient(cc), prf, cleanup
+ return cc, prf, cleanup
}
func noopBackoffFunc() (backoff, backoffReset) {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 62f095c3d..865ec286a 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -51,9 +51,11 @@ func TestServerRouteServerAccessor(t *testing.T) {
}
)
- cli, _, cleanup := runPraefectServerWithMock(t, conf, backends)
+ cc, _, cleanup := runPraefectServerWithMock(t, conf, datastore.Datastore{}, backends)
defer cleanup()
+ cli := mock.NewSimpleServiceClient(cc)
+
expectReq := &mock.SimpleRequest{Value: 1}
done := make(chan struct{})
@@ -141,7 +143,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
registry := protoregistry.New()
require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
- srv := setupServer(t, conf, nodeMgr, entry, registry)
+ srv := setupServer(t, conf, nodeMgr, datastore.Datastore{}, entry, registry)
listener, port := listenAvailPort(t)
go func() {
@@ -276,7 +278,7 @@ func TestWarnDuplicateAddrs(t *testing.T) {
tLogger, hook := test.NewNullLogger()
- setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
+ setupServer(t, conf, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
for _, entry := range hook.Entries {
require.NotContains(t, entry.Message, "more than one backend node")
@@ -303,7 +305,7 @@ func TestWarnDuplicateAddrs(t *testing.T) {
tLogger, hook = test.NewNullLogger()
- setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
+ setupServer(t, conf, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
var found bool
for _, entry := range hook.Entries {
@@ -349,7 +351,7 @@ func TestWarnDuplicateAddrs(t *testing.T) {
tLogger, hook = test.NewNullLogger()
- setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
+ setupServer(t, conf, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
for _, entry := range hook.Entries {
require.NotContains(t, entry.Message, "more than one backend node")
diff --git a/internal/praefect/service/info/dataloss.go b/internal/praefect/service/info/dataloss.go
new file mode 100644
index 000000000..9e863d2b2
--- /dev/null
+++ b/internal/praefect/service/info/dataloss.go
@@ -0,0 +1,28 @@
+package info
+
+import (
+ "context"
+
+ "github.com/golang/protobuf/ptypes"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+// DatalossCheck converts the request's 'from' and 'to' timestamps, asks the
+// queue for the dead replication jobs between them and returns the counts
+// keyed by relative path. A timestamp that cannot be converted results in an
+// invalid-argument error; queue errors are returned as is.
+func (s *Server) DatalossCheck(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
+	rangeStart, err := ptypes.Timestamp(req.GetFrom())
+	if err != nil {
+		return nil, helper.ErrInvalidArgumentf("invalid 'from': %v", err)
+	}
+
+	rangeEnd, err := ptypes.Timestamp(req.GetTo())
+	if err != nil {
+		return nil, helper.ErrInvalidArgumentf("invalid 'to': %v", err)
+	}
+
+	counts, err := s.queue.CountDeadReplicationJobs(ctx, rangeStart, rangeEnd)
+	if err != nil {
+		return nil, err
+	}
+
+	resp := &gitalypb.DatalossCheckResponse{ByRelativePath: counts}
+	return resp, nil
+}
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 46702d80a..bbd728152 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -2,6 +2,7 @@ package info
import (
"context"
+ "time"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -12,6 +13,7 @@ import (
// Queue is a subset of the datastore.ReplicationEventQueue functionality needed by this service
type Queue interface {
Enqueue(ctx context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error)
+// CountDeadReplicationJobs returns counts of dead replication jobs for the
+// given from/to times; DatalossCheck returns the map as the response's
+// ByRelativePath.
+ CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
}
// compile time assertion that Queue is satisfied by