diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-15 14:32:03 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-15 14:32:03 +0300 |
commit | f882c0545f51fb2d0fae3187eac0f21472c8ef5d (patch) | |
tree | cb120e1336b4d2e89d4ceb4b9d5a14873cf9a294 | |
parent | 1a017e4973e75d75795c4668a353597a2b5b1d92 (diff) | |
parent | e97bb3559328683ffc2cc09dbf7826e5eecc48e6 (diff) |
Merge branch 'smh-dataloss-cmd' into 'master'
Praefect dataloss subcommand
See merge request gitlab-org/gitaly!2057
-rw-r--r-- | changelogs/unreleased/smh-dataloss-cmd.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 14 | ||||
-rw-r--r-- | cmd/praefect/subcmd.go | 1 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dataloss.go | 111 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dataloss_test.go | 141 | ||||
-rw-r--r-- | cmd/praefect/subcmd_reconcile.go | 22 | ||||
-rw-r--r-- | internal/praefect/dataloss_check_test.go | 124 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 14 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 12 | ||||
-rw-r--r-- | internal/praefect/service/info/dataloss.go | 28 | ||||
-rw-r--r-- | internal/praefect/service/info/server.go | 2 |
11 files changed, 452 insertions, 22 deletions
diff --git a/changelogs/unreleased/smh-dataloss-cmd.yml b/changelogs/unreleased/smh-dataloss-cmd.yml new file mode 100644 index 000000000..1e81d2b83 --- /dev/null +++ b/changelogs/unreleased/smh-dataloss-cmd.yml @@ -0,0 +1,5 @@ +--- +title: Praefect dataloss subcommand +merge_request: 2057 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 53561b9db..35a17a6ad 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -53,6 +53,20 @@ // reference storage is omitted, Praefect will perform the check against the // current primary. If the primary is the same as the target, an error will // occur. +// +// Dataloss +// +// The subcommand "dataloss" helps identify dataloss cases during a given +// timeframe by checking for dead replication jobs. This can be useful to +// quantify the impact of a primary node failure. +// +// praefect -config PATH_TO_CONFIG dataloss -from RFC3339_TIME -to RFC3339_TIME +// +// "-from" specifies the inclusive beginning of a timerange to check. +// +// "-to" specifies the exclusive ending of a timerange to check. +// +// If a timerange is not specified, dead jobs from last six hours are fetched by default. package main import ( diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 6d8261133..4fda868ac 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -30,6 +30,7 @@ var ( "reconcile": &reconcileSubcommand{}, "sql-migrate-down": &sqlMigrateDownSubcommand{}, "sql-migrate-status": &sqlMigrateStatusSubcommand{}, + "dataloss": newDatalossSubcommand(), } ) diff --git a/cmd/praefect/subcmd_dataloss.go b/cmd/praefect/subcmd_dataloss.go new file mode 100644 index 000000000..7785d4f95 --- /dev/null +++ b/cmd/praefect/subcmd_dataloss.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "log" + "os" + "sort" + "time" + + "github.com/golang/protobuf/ptypes" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +var errFromNotBeforeTo = errors.New("'from' must be a time before 'to'") + +type timeFlag time.Time + +func (tf *timeFlag) String() string { + return time.Time(*tf).Format(time.RFC3339) +} + +func (tf *timeFlag) Set(v string) error { + t, err := time.Parse(time.RFC3339, v) + *tf = timeFlag(t) + return err +} + +type datalossSubcommand struct { + output io.Writer + from time.Time + to time.Time +} + +func newDatalossSubcommand() *datalossSubcommand { + now := time.Now() + return &datalossSubcommand{ + output: os.Stdout, + from: now.Add(-6 * time.Hour), + to: now, + } +} + +func (cmd *datalossSubcommand) FlagSet() *flag.FlagSet { + fs := flag.NewFlagSet("dataloss", flag.ContinueOnError) + fs.Var((*timeFlag)(&cmd.from), "from", "inclusive beginning of timerange") + fs.Var((*timeFlag)(&cmd.to), "to", "exclusive ending of timerange") + return fs +} + +func (cmd *datalossSubcommand) Exec(_ *flag.FlagSet, cfg config.Config) error { + nodeAddr, err := getNodeAddress(cfg) + if err != nil { + return err + } + + if !cmd.from.Before(cmd.to) { + return errFromNotBeforeTo + } + + pbFrom, err := ptypes.TimestampProto(cmd.from) + if err != nil { + return fmt.Errorf("invalid 'from': %v", err) + } + + pbTo, err := ptypes.TimestampProto(cmd.to) + if err != nil { + return fmt.Errorf("invalid 'to': %v", err) + } + + conn, err := subCmdDial(nodeAddr, cfg.Auth.Token) + if err != nil { + return fmt.Errorf("error dialing: %v", err) + } + defer func() { + if err := conn.Close(); err != nil { + log.Printf("error closing connection: %v", err) + } + }() + + client := gitalypb.NewPraefectInfoServiceClient(conn) + resp, err := client.DatalossCheck(context.Background(), &gitalypb.DatalossCheckRequest{ + From: pbFrom, + To: pbTo, + }) + if err != nil { + return fmt.Errorf("error checking: %v", err) + } + + keys := make([]string, 0, len(resp.ByRelativePath)) + for k := range resp.ByRelativePath { + keys = append(keys, k) + } + sort.Strings(keys) + + if _, err := fmt.Fprintf(cmd.output, "Failed replication jobs between [%s, %s):\n", cmd.from, cmd.to); err != nil { + return fmt.Errorf("error writing output: %v", err) + } + + for _, proj := range keys { + if _, err := fmt.Fprintf(cmd.output, "%s: %d jobs\n", proj, resp.ByRelativePath[proj]); err != nil { + return fmt.Errorf("error writing output: %v", err) + } + } + + return nil +} diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go new file mode 100644 index 000000000..79a811390 --- /dev/null +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -0,0 +1,141 @@ +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "net" + "path/filepath" + "testing" + "time" + + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" +) + +func TestTimeFlag(t *testing.T) { + for _, tc := range []struct { + input string + expected time.Time + }{ + { + input: "2020-02-03T14:15:16Z", + expected: time.Date(2020, 2, 3, 14, 15, 16, 0, time.UTC), + }, + { + input: "2020-02-03T14:15:16+02:00", + expected: time.Date(2020, 2, 3, 14, 15, 16, 0, time.FixedZone("UTC+2", 2*60*60)), + }, + { + input: "", + }, + } { + t.Run(tc.input, func(t *testing.T) { + var actual time.Time + fs := flag.NewFlagSet("dataloss", flag.ContinueOnError) + fs.Var((*timeFlag)(&actual), "time", "") + + err := fs.Parse([]string{"-time", tc.input}) + if !tc.expected.IsZero() { + require.NoError(t, err) + } + + require.True(t, tc.expected.Equal(actual)) + }) + } +} + +type mockPraefectInfoService struct { + gitalypb.UnimplementedPraefectInfoServiceServer + DatalossCheckFunc func(context.Context, *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) +} + +func (m mockPraefectInfoService) DatalossCheck(ctx context.Context, r *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { + return m.DatalossCheckFunc(ctx, r) +} + +func TestDatalossSubcommand(t *testing.T) { + tmp, clean := testhelper.TempDir(t, "") + defer clean() + + ctx, cancel := testhelper.Context() + defer cancel() + + ln, err := net.Listen("unix", filepath.Join(tmp, "gitaly.sock")) + require.NoError(t, err) + defer ln.Close() + + mockSvc := &mockPraefectInfoService{} + srv := grpc.NewServer() + gitalypb.RegisterPraefectInfoServiceServer(srv, mockSvc) + go func() { require.NoError(t, srv.Serve(ln)) }() + defer srv.Stop() + + // verify the mock service is up + addr := fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr()) + cc, err := grpc.DialContext(ctx, addr, grpc.WithBlock(), grpc.WithInsecure()) + require.NoError(t, err) + defer cc.Close() + + for _, tc := range []struct { + desc string + args []string + datalossCheck func(context.Context, *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) + output string + error error + }{ + { + desc: "from equals to", + args: []string{"-from=2020-01-02T00:00:00Z", "-to=2020-01-02T00:00:00Z"}, + error: errFromNotBeforeTo, + }, + { + desc: "from after to", + args: []string{"-from=2020-01-02T00:00:00Z", "-to=2020-01-01T00:00:00Z"}, + error: errFromNotBeforeTo, + }, + { + desc: "no dead jobs", + args: []string{"-from=2020-01-02T00:00:00Z", "-to=2020-01-03T00:00:00Z"}, + datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { + require.Equal(t, req.GetFrom(), ×tamp.Timestamp{Seconds: 1577923200}) + require.Equal(t, req.GetTo(), ×tamp.Timestamp{Seconds: 1578009600}) + return &gitalypb.DatalossCheckResponse{}, nil + }, + output: "Failed replication jobs between [2020-01-02 00:00:00 +0000 UTC, 2020-01-03 00:00:00 +0000 UTC):\n", + }, + { + desc: "success", + args: []string{"-from=2020-01-02T00:00:00Z", "-to=2020-01-03T00:00:00Z"}, + datalossCheck: func(_ context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { + require.Equal(t, req.GetFrom(), ×tamp.Timestamp{Seconds: 1577923200}) + require.Equal(t, req.GetTo(), ×tamp.Timestamp{Seconds: 1578009600}) + return &gitalypb.DatalossCheckResponse{ByRelativePath: map[string]int64{ + "test-repo/relative-path/2": 4, + "test-repo/relative-path/1": 1, + "test-repo/relative-path/3": 2, + }}, nil + }, + output: `Failed replication jobs between [2020-01-02 00:00:00 +0000 UTC, 2020-01-03 00:00:00 +0000 UTC): +test-repo/relative-path/1: 1 jobs +test-repo/relative-path/2: 4 jobs +test-repo/relative-path/3: 2 jobs +`, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + mockSvc.DatalossCheckFunc = tc.datalossCheck + cmd := newDatalossSubcommand() + output := &bytes.Buffer{} + cmd.output = output + require.NoError(t, cmd.FlagSet().Parse(tc.args)) + require.Equal(t, tc.error, cmd.Exec(cmd.FlagSet(), config.Config{SocketPath: ln.Addr().String()})) + require.Equal(t, tc.output, output.String()) + }) + } +} diff --git a/cmd/praefect/subcmd_reconcile.go b/cmd/praefect/subcmd_reconcile.go index bb2b12b6c..42f64d6b4 100644 --- a/cmd/praefect/subcmd_reconcile.go +++ b/cmd/praefect/subcmd_reconcile.go @@ -48,19 +48,25 @@ func (s *reconcileSubcommand) Exec(flags *flag.FlagSet, conf config.Config) erro return nil } +func getNodeAddress(cfg config.Config) (string, error) { + switch { + case cfg.SocketPath != "": + return "unix://" + cfg.SocketPath, nil + case cfg.ListenAddr != "": + return "tcp://" + cfg.ListenAddr, nil + default: + return "", errors.New("no Praefect address configured") + } +} + func (nr nodeReconciler) reconcile() error { if err := nr.validateArgs(); err != nil { return err } - var nodeAddr string - switch { - case nr.conf.SocketPath != "": - nodeAddr = "unix://" + nr.conf.SocketPath - case nr.conf.ListenAddr != "": - nodeAddr = "tcp://" + nr.conf.ListenAddr - default: - return errors.New("no Praefect address configured") + nodeAddr, err := getNodeAddress(nr.conf) + if err != nil { + return err } cc, err := subCmdDial(nodeAddr, nr.conf.Auth.Token) diff --git a/internal/praefect/dataloss_check_test.go b/internal/praefect/dataloss_check_test.go new file mode 100644 index 000000000..7b86dd362 --- /dev/null +++ b/internal/praefect/dataloss_check_test.go @@ -0,0 +1,124 @@ +package praefect + +import ( + "testing" + + "github.com/golang/protobuf/ptypes" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +func TestDatalossCheck(t *testing.T) { + cfg := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "praefect", + Nodes: []*models.Node{ + { + DefaultPrimary: true, + Storage: "not-needed", + Address: "tcp::/this-doesnt-matter", + }, + }, + }, + }, + } + + ctx, cancel := testhelper.Context() + defer cancel() + + rq := datastore.NewMemoryReplicationEventQueue() + const targetNode = "test-node" + killJobs := func(t *testing.T) { + t.Helper() + for { + jobs, err := rq.Dequeue(ctx, targetNode, 1) + require.NoError(t, err) + if len(jobs) == 0 { + // all jobs dead + break + } + + state := datastore.JobStateFailed + if jobs[0].Attempt == 0 { + state = datastore.JobStateDead + } + + _, err = rq.Acknowledge(ctx, state, []uint64{jobs[0].ID}) + require.NoError(t, err) + } + } + + beforeTimerange, err := rq.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RelativePath: "repo/before-timerange", + }, + }) + require.NoError(t, err) + expectedDeadJobs := map[string]int64{"repo/dead-job": 1, "repo/multiple-dead-jobs": 2} + for relPath, count := range expectedDeadJobs { + for i := int64(0); i < count; i++ { + _, err := rq.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RelativePath: relPath, + TargetNodeStorage: targetNode, + }, + }) + require.NoError(t, err) + } + } + killJobs(t) + + // add some non-dead jobs + for relPath, state := range map[string]datastore.JobState{ + "repo/completed-job": datastore.JobStateCompleted, + "repo/cancelled-job": datastore.JobStateCancelled, + } { + _, err := rq.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RelativePath: relPath, + TargetNodeStorage: targetNode, + }, + }) + require.NoError(t, err) + + jobs, err := rq.Dequeue(ctx, targetNode, 1) + require.NoError(t, err) + + _, err = rq.Acknowledge(ctx, state, []uint64{jobs[0].ID}) + require.NoError(t, err) + } + + afterTimerange, err := rq.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RelativePath: "repo/after-timerange", + }, + }) + require.NoError(t, err) + killJobs(t) + + cc, _, clean := runPraefectServerWithMock(t, cfg, + datastore.Datastore{ReplicationEventQueue: rq}, + map[string]mock.SimpleServiceServer{ + "not-needed": &mock.UnimplementedSimpleServiceServer{}, + }, + ) + defer clean() + + pbFrom, err := ptypes.TimestampProto(beforeTimerange.CreatedAt) + require.NoError(t, err) + pbTo, err := ptypes.TimestampProto(afterTimerange.CreatedAt) + require.NoError(t, err) + + resp, err := gitalypb.NewPraefectInfoServiceClient(cc).DatalossCheck(ctx, &gitalypb.DatalossCheckRequest{ + From: pbFrom, + To: pbTo, + }) + require.NoError(t, err) + require.Equal(t, &gitalypb.DatalossCheckResponse{ByRelativePath: expectedDeadJobs}, resp) +} diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 28fe74ce2..e0275c856 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -74,11 +74,7 @@ func testConfig(backends int) config.Config { // setupServer wires all praefect dependencies together via dependency // injection -func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, r *protoregistry.Registry) *Server { - ds := datastore.Datastore{ - ReplicasDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), - } +func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, ds datastore.Datastore, l *logrus.Entry, r *protoregistry.Registry) *Server { coordinator := NewCoordinator(l, ds, nodeMgr, conf, r) var defaultNode *models.Node @@ -99,7 +95,7 @@ func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *log // config.Nodes. There must be a 1-to-1 mapping between backend server and // configured storage node. // requires there to be only 1 virtual storage -func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[string]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) { +func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) { require.Len(t, conf.VirtualStorages, 1) require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes), "mock server count doesn't match config nodes") @@ -124,14 +120,14 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st r := protoregistry.New() require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t))) - prf := setupServer(t, conf, nodeMgr, log.Default(), r) + prf := setupServer(t, conf, nodeMgr, ds, log.Default(), r) listener, port := listenAvailPort(t) t.Logf("praefect listening on port %d", port) errQ := make(chan error) - prf.RegisterServices(nodeMgr, conf, datastore.Datastore{}) + prf.RegisterServices(nodeMgr, conf, ds) go func() { errQ <- prf.Serve(listener, false) }() @@ -152,7 +148,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st require.NoError(t, prf.Shutdown(ctx)) } - return mock.NewSimpleServiceClient(cc), prf, cleanup + return cc, prf, cleanup } func noopBackoffFunc() (backoff, backoffReset) { diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 62f095c3d..865ec286a 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -51,9 +51,11 @@ func TestServerRouteServerAccessor(t *testing.T) { } ) - cli, _, cleanup := runPraefectServerWithMock(t, conf, backends) + cc, _, cleanup := runPraefectServerWithMock(t, conf, datastore.Datastore{}, backends) defer cleanup() + cli := mock.NewSimpleServiceClient(cc) + expectReq := &mock.SimpleRequest{Value: 1} done := make(chan struct{}) @@ -141,7 +143,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) { registry := protoregistry.New() require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - srv := setupServer(t, conf, nodeMgr, entry, registry) + srv := setupServer(t, conf, nodeMgr, datastore.Datastore{}, entry, registry) listener, port := listenAvailPort(t) go func() { @@ -276,7 +278,7 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook := test.NewNullLogger() - setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + setupServer(t, conf, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning for _, entry := range hook.Entries { require.NotContains(t, entry.Message, "more than one backend node") @@ -303,7 +305,7 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook = test.NewNullLogger() - setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + setupServer(t, conf, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning var found bool for _, entry := range hook.Entries { @@ -349,7 +351,7 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook = test.NewNullLogger() - setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + setupServer(t, conf, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning for _, entry := range hook.Entries { require.NotContains(t, entry.Message, "more than one backend node") diff --git a/internal/praefect/service/info/dataloss.go b/internal/praefect/service/info/dataloss.go new file mode 100644 index 000000000..9e863d2b2 --- /dev/null +++ b/internal/praefect/service/info/dataloss.go @@ -0,0 +1,28 @@ +package info + +import ( + "context" + + "github.com/golang/protobuf/ptypes" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +func (s *Server) DatalossCheck(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { + from, err := ptypes.Timestamp(req.GetFrom()) + if err != nil { + return nil, helper.ErrInvalidArgumentf("invalid 'from': %v", err) + } + + to, err := ptypes.Timestamp(req.GetTo()) + if err != nil { + return nil, helper.ErrInvalidArgumentf("invalid 'to': %v", err) + } + + dead, err := s.queue.CountDeadReplicationJobs(ctx, from, to) + if err != nil { + return nil, err + } + + return &gitalypb.DatalossCheckResponse{ByRelativePath: dead}, nil +} diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index 46702d80a..bbd728152 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -2,6 +2,7 @@ package info import ( "context" + "time" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -12,6 +13,7 @@ import ( // Queue is a subset of the datastore.ReplicationEventQueue functionality needed by this service type Queue interface { Enqueue(ctx context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error) + CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) } // compile time assertion that Queue is satisfied by |