diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-07 17:42:22 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-13 19:19:01 +0300 |
commit | f6fe5f5ec213c8110ac18fc9a5d926f58dd04dd9 (patch) | |
tree | ec4ae874c9398439beabc1ea8c53702fdbb6c8e1 | |
parent | a863286e59c23b7ec374afca92fa25a454b40f29 (diff) |
repository generationssmh-dataloss-generations
29 files changed, 1009 insertions, 351 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index d52a4c308..9bd2909f3 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -230,10 +230,13 @@ func run(cfgs []starter.Config, conf config.Config) error { } var queue datastore.ReplicationEventQueue + var gs datastore.GenerationStore if conf.MemoryQueueEnabled { queue = datastore.NewMemoryReplicationEventQueue(conf) + gs = datastore.NewLocalGenerationStore(conf.StorageNames()) } else { queue = datastore.NewPostgresReplicationEventQueue(db) + gs = datastore.NewPostgresGenerationStore(db, conf.StorageNames()) } nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram) @@ -259,11 +262,19 @@ func run(cfgs []starter.Config, conf config.Config) error { var ( // top level server dependencies - coordinator = praefect.NewCoordinator(queue, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered) - repl = praefect.NewReplMgr( + coordinator = praefect.NewCoordinator( + queue, + gs, + nodeManager, + transactionManager, + conf, + protoregistry.GitalyProtoPreregistered, + ) + repl = praefect.NewReplMgr( logger, conf.VirtualStorageNames(), queue, + gs, nodeManager, praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), @@ -276,6 +287,7 @@ func run(cfgs []starter.Config, conf config.Config) error { nodeManager, transactionManager, queue, + gs, protoregistry.GitalyProtoPreregistered, ) ) diff --git a/cmd/praefect/subcmd_dataloss.go b/cmd/praefect/subcmd_dataloss.go index f9c7f2d22..a8c57c465 100644 --- a/cmd/praefect/subcmd_dataloss.go +++ b/cmd/praefect/subcmd_dataloss.go @@ -74,27 +74,24 @@ func (cmd *datalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) erro return fmt.Errorf("error checking: %v", err) } - mode := "write-enabled" - if resp.IsReadOnly { - mode = "read-only" - } - cmd.println(0, "Virtual storage: %s", vs) - cmd.println(1, "Current %s primary: %s", mode, resp.CurrentPrimary) - if resp.PreviousWritablePrimary == "" { - fmt.Fprintln(cmd.output, " No data loss as the virtual storage has not encountered a failover") - continue - } - - cmd.println(1, "Previous write-enabled primary: %s", resp.PreviousWritablePrimary) - if len(resp.OutdatedNodes) == 0 { - cmd.println(2, "No data loss from failing over from %s", resp.PreviousWritablePrimary) - continue + cmd.println(1, "Primary: %s", resp.Primary) + if len(resp.Repositories) == 0 { + cmd.println(1, "All repositories are consistent!") + return nil } - cmd.println(2, "Nodes with data loss from failing over from %s:", resp.PreviousWritablePrimary) - for _, odn := range resp.OutdatedNodes { - cmd.println(3, "%s: %s", odn.RelativePath, strings.Join(odn.Nodes, ", ")) + cmd.println(1, "Outdated repositories:") + for _, r := range resp.Repositories { + cmd.println(2, "%s:", r.RelativePath) + for _, s := range r.Storages { + plural := "" + if s.BehindBy > 1 { + plural = "s" + } + + cmd.println(3, "%s is behind by %d generation%s or less", s.Name, s.BehindBy, plural) + } } } diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index 569cf7caa..46e201c43 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -5,9 +5,12 @@ import ( "context" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -26,107 +29,81 @@ func (m mockPraefectInfoService) EnableWrites(ctx context.Context, r *gitalypb.E } func TestDatalossSubcommand(t *testing.T) { - mockSvc := &mockPraefectInfoService{} - ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(mockSvc)}) + mgr := &nodes.MockManager{ + GetShardFunc: func(vs string) (nodes.Shard, error) { + var primary string + switch vs { + case "virtual-storage-1": + primary = "gitaly-1" + case "virtual-storage-2": + primary = "gitaly-4" + default: + t.Error("unexpected virtual storage") + } + + return nodes.Shard{Primary: &nodes.MockNode{StorageName: primary}}, nil + }, + } + + gs := datastore.NewLocalGenerationStore(map[string][]string{ + "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"}, + "virtual-storage-2": {"gitaly-4"}, + }) + + ctx, cancel := testhelper.Context() + defer cancel() + + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "gitaly-1", "repository-1", 1)) + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "gitaly-2", "repository-1", 0)) + + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "gitaly-2", "repository-2", 0)) + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "gitaly-3", "repository-2", 0)) + + ln, clean := listenAndServe(t, []svcRegistrar{ + registerPraefectInfoServer(info.NewServer(mgr, config.Config{}, nil, gs))}) defer clean() for _, tc := range []struct { desc string args []string virtualStorages []*config.VirtualStorage - datalossCheck func(context.Context, *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) output string error error }{ { desc: "positional arguments", - args: []string{"-virtual-storage=test-virtual-storage", "positional-arg"}, + args: []string{"-virtual-storage=virtual-storage-1", "positional-arg"}, error: UnexpectedPositionalArgsError{Command: "dataloss"}, }, { - desc: "no failover", - args: []string{"-virtual-storage=test-virtual-storage"}, - datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { - assert.Equal(t, "test-virtual-storage", req.GetVirtualStorage()) - return &gitalypb.DatalossCheckResponse{ - CurrentPrimary: "test-current-primary", - }, nil - }, - output: `Virtual storage: test-virtual-storage - Current write-enabled primary: test-current-primary - No data loss as the virtual storage has not encountered a failover -`, - }, - { - desc: "no data loss", - args: []string{"-virtual-storage=test-virtual-storage"}, - datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { - assert.Equal(t, "test-virtual-storage", req.GetVirtualStorage()) - return &gitalypb.DatalossCheckResponse{ - PreviousWritablePrimary: "test-previous-primary", - IsReadOnly: false, - CurrentPrimary: "test-current-primary", - }, nil - }, - output: `Virtual storage: test-virtual-storage - Current write-enabled primary: test-current-primary - Previous write-enabled primary: test-previous-primary - No data loss from failing over from test-previous-primary -`, - }, - { desc: "data loss", - args: []string{"-virtual-storage=test-virtual-storage"}, - datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { - assert.Equal(t, "test-virtual-storage", req.GetVirtualStorage()) - return &gitalypb.DatalossCheckResponse{ - PreviousWritablePrimary: "test-previous-primary", - IsReadOnly: true, - CurrentPrimary: "test-current-primary", - OutdatedNodes: []*gitalypb.DatalossCheckResponse_Nodes{ - {RelativePath: "repository-1", Nodes: []string{"gitaly-2", "gitaly-3"}}, - {RelativePath: "repository-2", Nodes: []string{"gitaly-1"}}, - }, - }, nil - }, - output: `Virtual storage: test-virtual-storage - Current read-only primary: test-current-primary - Previous write-enabled primary: test-previous-primary - Nodes with data loss from failing over from test-previous-primary: - repository-1: gitaly-2, gitaly-3 - repository-2: gitaly-1 + args: []string{"-virtual-storage=virtual-storage-1"}, output: `Virtual storage: virtual-storage-1 + Primary: gitaly-1 + Outdated repositories: + repository-1: + gitaly-2 is behind by 1 generation or less + gitaly-3 is behind by 2 generations or less + repository-2: + gitaly-1 is behind by 1 generation or less `, }, { desc: "multiple virtual storages", - virtualStorages: []*config.VirtualStorage{{Name: "test-virtual-storage-2"}, {Name: "test-virtual-storage-1"}}, - datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { - return &gitalypb.DatalossCheckResponse{ - PreviousWritablePrimary: "test-previous-primary", - IsReadOnly: true, - CurrentPrimary: "test-current-primary", - OutdatedNodes: []*gitalypb.DatalossCheckResponse_Nodes{ - {RelativePath: "repository-1", Nodes: []string{"gitaly-2", "gitaly-3"}}, - {RelativePath: "repository-2", Nodes: []string{"gitaly-1"}}, - }, - }, nil - }, - output: `Virtual storage: test-virtual-storage-1 - Current read-only primary: test-current-primary - Previous write-enabled primary: test-previous-primary - Nodes with data loss from failing over from test-previous-primary: - repository-1: gitaly-2, gitaly-3 - repository-2: gitaly-1 -Virtual storage: test-virtual-storage-2 - Current read-only primary: test-current-primary - Previous write-enabled primary: test-previous-primary - Nodes with data loss from failing over from test-previous-primary: - repository-1: gitaly-2, gitaly-3 - repository-2: gitaly-1 + virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}}, + output: `Virtual storage: virtual-storage-1 + Primary: gitaly-1 + Outdated repositories: + repository-1: + gitaly-2 is behind by 1 generation or less + gitaly-3 is behind by 2 generations or less + repository-2: + gitaly-1 is behind by 1 generation or less +Virtual storage: virtual-storage-2 + Primary: gitaly-4 + All repositories are consistent! `, }, } { t.Run(tc.desc, func(t *testing.T) { - mockSvc.DatalossCheckFunc = tc.datalossCheck cmd := newDatalossSubcommand() output := &bytes.Buffer{} cmd.output = output diff --git a/cmd/praefect/subcmd_test.go b/cmd/praefect/subcmd_test.go index dc3e40758..38a0a199e 100644 --- a/cmd/praefect/subcmd_test.go +++ b/cmd/praefect/subcmd_test.go @@ -41,7 +41,7 @@ func listenAndServe(t testing.TB, svcs []svcRegistrar) (net.Listener, testhelper tmp, clean := testhelper.TempDir(t) - ln, err := net.Listen("unix", filepath.Join(tmp, "gitaly.sock")) + ln, err := net.Listen("unix", filepath.Join(tmp, "lollero")) require.NoError(t, err) srv := grpc.NewServer() diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index b112a716f..730987869 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -178,9 +178,11 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, registry, err := protoregistry.New(fd) require.NoError(t, err) - coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry) + gs := datastore.NewLocalGenerationStore(conf.StorageNames()) - srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue) + coordinator := NewCoordinator(queue, gs, nodeMgr, txMgr, conf, registry) + + srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 31b1b4d25..4cd10154f 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -172,6 +172,21 @@ func (c *Config) VirtualStorageNames() []string { return names } +// StorageNames returns storage names by virtual storage. +func (c *Config) StorageNames() map[string][]string { + storages := make(map[string][]string, len(c.VirtualStorages)) + for _, vs := range c.VirtualStorages { + nodes := make([]string, len(vs.Nodes)) + for i, n := range vs.Nodes { + nodes[i] = n.Storage + } + + storages[vs.Name] = nodes + } + + return storages +} + // DB holds Postgres client configuration data. type DB struct { Host string `toml:"host"` diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 1ffde42ee..51c2d254c 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -336,6 +336,18 @@ func TestVirtualStorageNames(t *testing.T) { require.Equal(t, []string{"praefect-1", "praefect-2"}, conf.VirtualStorageNames()) } +func TestStorageNames(t *testing.T) { + conf := Config{ + VirtualStorages: []*VirtualStorage{ + {Name: "virtual-storage-1", Nodes: []*Node{{Storage: "gitaly-1"}, {Storage: "gitaly-2"}}}, + {Name: "virtual-storage-2", Nodes: []*Node{{Storage: "gitaly-3"}, {Storage: "gitaly-4"}}}, + }} + require.Equal(t, map[string][]string{ + "virtual-storage-1": {"gitaly-1", "gitaly-2"}, + "virtual-storage-2": {"gitaly-3", "gitaly-4"}, + }, conf.StorageNames()) +} + func TestToPQString(t *testing.T) { testCases := []struct { desc string diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 47a48d4af..cf3b31a2b 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -60,9 +60,8 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m) } return datastore.RepackIncremental, nil, nil - default: - return datastore.UpdateRepo, nil, nil + return datastore.UpdateRepo, datastore.Params{}, nil } } @@ -81,6 +80,7 @@ type Coordinator struct { nodeMgr nodes.Manager txMgr *transactions.Manager queue datastore.ReplicationEventQueue + gs datastore.GenerationStore registry *protoregistry.Registry conf config.Config } @@ -88,6 +88,7 @@ type Coordinator struct { // NewCoordinator returns a new Coordinator that utilizes the provided logger func NewCoordinator( queue datastore.ReplicationEventQueue, + gs datastore.GenerationStore, nodeMgr nodes.Manager, txMgr *transactions.Manager, conf config.Config, @@ -95,6 +96,7 @@ func NewCoordinator( ) *Coordinator { return &Coordinator{ queue: queue, + gs: gs, registry: r, nodeMgr: nodeMgr, txMgr: txMgr, @@ -237,7 +239,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall }) } } else { - finalizers = append(finalizers, c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, shard.Secondaries, change, params)) + finalizers = append(finalizers, c.newRequestFinalizer(ctx, virtualStorage, call.targetRepo, shard.Primary, shard.Secondaries, change, params)) } reqFinalizer := func() error { @@ -363,7 +365,7 @@ func protoMessage(mi protoregistry.MethodInfo, frame []byte) (proto.Message, err return m, nil } -func (c *Coordinator) createReplicaJobs( +func (c *Coordinator) newRequestFinalizer( ctx context.Context, virtualStorage string, targetRepo *gitalypb.Repository, @@ -373,6 +375,14 @@ func (c *Coordinator) createReplicaJobs( params datastore.Params, ) func() error { return func() error { + if change == datastore.UpdateRepo { + var err error + params["generation"], err = c.gs.IncrementGeneration(ctx, virtualStorage, primary.GetStorage(), targetRepo.GetRelativePath()) + if err != nil { + return fmt.Errorf("failed incrementing primary's generation: %w", err) + } + } + correlationID := c.ensureCorrelationID(ctx, targetRepo) g, ctx := errgroup.WithContext(ctx) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 7b13be6c6..4e841f716 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -87,6 +87,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { const storageName = "test-storage" coordinator := NewCoordinator( datastore.NewMemoryReplicationEventQueue(conf), + nil, &nodes.MockManager{GetShardFunc: func(storage string) (nodes.Shard, error) { return nodes.Shard{ IsReadOnly: tc.readOnly, @@ -160,7 +161,14 @@ func TestStreamDirectorMutator(t *testing.T) { txMgr := transactions.NewManager() - coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) + coordinator := NewCoordinator( + queueInterceptor, + datastore.NewLocalGenerationStore(conf.StorageNames()), + nodeMgr, + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ Origin: &targetRepo, @@ -212,6 +220,9 @@ func TestStreamDirectorMutator(t *testing.T) { RelativePath: targetRepo.RelativePath, TargetNodeStorage: secondaryNode.Storage, SourceNodeStorage: primaryNode.Storage, + Params: datastore.Params{ + "generation": 0, + }, }, Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, } @@ -257,7 +268,14 @@ func TestStreamDirectorAccessor(t *testing.T) { txMgr := transactions.NewManager() - coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) + coordinator := NewCoordinator( + queue, + datastore.NewLocalGenerationStore(conf.StorageNames()), + nodeMgr, + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) require.NoError(t, err) @@ -335,7 +353,14 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { txMgr := transactions.NewManager() - coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) + coordinator := NewCoordinator( + queue, + datastore.NewLocalGenerationStore(conf.StorageNames()), + nodeMgr, + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) t.Run("forwards accessor operations", func(t *testing.T) { var primaryChosen int @@ -526,7 +551,14 @@ func TestAbsentCorrelationID(t *testing.T) { require.NoError(t, err) txMgr := transactions.NewManager() - coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) + coordinator := NewCoordinator( + queueInterceptor, + datastore.NewLocalGenerationStore(conf.StorageNames()), + nodeMgr, + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ Origin: &targetRepo, diff --git a/internal/praefect/datastore/generation_local.go b/internal/praefect/datastore/generation_local.go new file mode 100644 index 000000000..c9985d14a --- /dev/null +++ b/internal/praefect/datastore/generation_local.go @@ -0,0 +1,163 @@ +package datastore + +import ( + "context" + "sync" +) + +// LocalGenerationStore is an in-memory implementation of GenerationStore. +// Refer to the interface for method documentation. +type LocalGenerationStore struct { + m sync.Mutex + + storages map[string][]string + generations map[string]map[string]map[string]int + repositories map[string]map[string]int +} + +// NewLocalGenerationStore returns an in-memory implementation of GenerationStore. +func NewLocalGenerationStore(storages map[string][]string) *LocalGenerationStore { + return &LocalGenerationStore{ + storages: storages, + generations: make(map[string]map[string]map[string]int), + repositories: make(map[string]map[string]int), + } +} + +func (l *LocalGenerationStore) IncrementGeneration(ctx context.Context, virtualStorage, storage, relativePath string) (int, error) { + l.m.Lock() + defer l.m.Unlock() + + nextGen := l.latestGeneration(virtualStorage, relativePath) + 1 + l.setGeneration(virtualStorage, relativePath, storage, nextGen) + + return nextGen, nil +} + +func (l *LocalGenerationStore) SetGeneration(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error { + l.m.Lock() + defer l.m.Unlock() + + l.setGeneration(virtualStorage, relativePath, storage, generation) + + return nil +} + +func (l *LocalGenerationStore) DeleteRecord(ctx context.Context, virtualStorage, storage, relativePath string) error { + l.m.Lock() + defer l.m.Unlock() + + vs, ok := l.generations[virtualStorage] + if !ok { + return nil + } + + rel, ok := vs[relativePath] + if !ok { + return nil + } + + delete(rel, storage) + + return nil +} + +func (l *LocalGenerationStore) EnsureUpgrade(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error { + l.m.Lock() + defer l.m.Unlock() + + if current := l.getGeneration(virtualStorage, relativePath, storage); current != GenerationUnknown && current >= generation { + return errDowngradeAttempted + } + + return nil +} + +func (l *LocalGenerationStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) { + storages, ok := l.storages[virtualStorage] + if !ok { + return nil, errUnknownVirtualStorage + } + + outdatedRepos := make(map[string]map[string]int) + repositories, ok := l.repositories[virtualStorage] + if !ok { + return outdatedRepos, nil + } + + for relativePath, latestGeneration := range repositories { + for _, storage := range storages { + if gen := l.getGeneration(virtualStorage, relativePath, storage); gen < latestGeneration { + if outdatedRepos[relativePath] == nil { + outdatedRepos[relativePath] = make(map[string]int) + } + + outdatedRepos[relativePath][storage] = latestGeneration - gen + } + } + } + + return outdatedRepos, nil +} + +func (l *LocalGenerationStore) latestGeneration(virtualStorage, relativePath string) int { + vs := l.repositories[virtualStorage] + if vs == nil { + return GenerationUnknown + } + + if latest, ok := vs[relativePath]; ok { + return latest + } + + return GenerationUnknown +} + +func (l *LocalGenerationStore) getGeneration(virtualStorage, relativePath, storage string) int { + vs := l.generations[virtualStorage] + if vs == nil { + return GenerationUnknown + } + + rel := vs[relativePath] + if rel == nil { + return GenerationUnknown + } + + if gen, ok := rel[storage]; ok { + return gen + } + + return GenerationUnknown +} + +func (l *LocalGenerationStore) setGeneration(virtualStorage, relativePath, storage string, generation int) { + if generation > l.latestGeneration(virtualStorage, relativePath) { + if l.repositories[virtualStorage] == nil { + l.repositories[virtualStorage] = make(map[string]int) + } + l.repositories[virtualStorage][relativePath] = generation + } + + vs := l.generations[virtualStorage] + if vs == nil { + l.generations[virtualStorage] = map[string]map[string]int{ + relativePath: { + storage: generation, + }, + } + + return + } + + rel := vs[relativePath] + if rel == nil { + vs[relativePath] = map[string]int{ + storage: generation, + } + + return + } + + rel[storage] = generation +} diff --git a/internal/praefect/datastore/generation_local_test.go b/internal/praefect/datastore/generation_local_test.go new file mode 100644 index 000000000..1e5043efb --- /dev/null +++ b/internal/praefect/datastore/generation_local_test.go @@ -0,0 +1,174 @@ +package datastore + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +func TestGenerationStore_Local(t *testing.T) { + testGenerationStore(t, func(t *testing.T, storages map[string][]string) GenerationStore { + return NewLocalGenerationStore(storages) + }) +} + +func testGenerationStore(t *testing.T, newStore func(t *testing.T, storages map[string][]string) GenerationStore) { + ctx, cancel := testhelper.Context() + defer cancel() + + t.Run("IncrementGeneration", func(t *testing.T) { + gs := newStore(t, nil) + + ctx, cancel := testhelper.Context() + defer cancel() + + t.Run("creates a new record", func(t *testing.T) { + generation, err := gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1") + require.NoError(t, err) + require.Equal(t, 0, generation) + }) + + t.Run("increments existing record", func(t *testing.T) { + generation, err := gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1") + require.NoError(t, err) + require.Equal(t, 1, generation) + }) + }) + + t.Run("SetGeneration", func(t *testing.T) { + gs := newStore(t, nil) + + t.Run("creates a record", func(t *testing.T) { + err := gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 1) + require.NoError(t, err) + }) + + t.Run("updates existing record", func(t *testing.T) { + err := gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 0) + require.NoError(t, err) + }) + + t.Run("increments stays monotonic", func(t *testing.T) { + generation, err := gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1") + require.NoError(t, err) + require.Equal(t, 2, generation) + }) + }) + + t.Run("EnsureUpgrade", func(t *testing.T) { + t.Run("no previous record allowed", func(t *testing.T) { + gs := newStore(t, nil) + require.NoError(t, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", GenerationUnknown)) + require.NoError(t, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", 0)) + }) + + t.Run("upgrade allowed", func(t *testing.T) { + gs := newStore(t, nil) + + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 0)) + require.NoError(t, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", 1)) + require.Error(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", GenerationUnknown)) + }) + + t.Run("downgrade prevented", func(t *testing.T) { + gs := newStore(t, nil) + + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 1)) + require.Equal(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", 0)) + require.Error(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", GenerationUnknown)) + }) + + t.Run("same version prevented", func(t *testing.T) { + gs := newStore(t, nil) + + require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 1)) + require.Equal(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", 1)) + require.Error(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", GenerationUnknown)) + }) + }) + + t.Run("DeleteRecord", func(t *testing.T) { + gs := newStore(t, nil) + + t.Run("delete non-existing", func(t *testing.T) { + err := gs.DeleteRecord(ctx, "virtual-storage-1", "storage-1", "repository-1") + require.NoError(t, err) + }) + + t.Run("delete existing", func(t *testing.T) { + generation, err := gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1") + require.NoError(t, err) + require.Equal(t, 0, generation) + + err = gs.DeleteRecord(ctx, "virtual-storage-1", "storage-1", "repository-1") + require.NoError(t, err) + + generation, err = gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1") + require.NoError(t, err) + require.Equal(t, 1, generation) + }) + }) + + t.Run("GetOutdatedRepositories", func(t *testing.T) { + type state map[string]map[string]map[string]struct { + generation int + } + + type expected map[string]map[string]int + + for _, tc := range []struct { + desc string + state state + expected map[string]map[string]int + }{ + { + "no records in virtual storage", + state{"virtual-storage-2": {"storage-1": {"repo-1": {generation: 0}}}}, + expected{}, + }, + { + "storages missing records", + state{"virtual-storage-1": {"storage-1": {"repo-1": {generation: 0}}}}, + expected{"repo-1": {"storage-2": 1, "storage-3": 1}}, + }, + { + "outdated storages", + state{"virtual-storage-1": { + "storage-1": {"repo-1": {generation: 2}}, + "storage-2": {"repo-1": {generation: 1}}, + "storage-3": {"repo-1": {generation: 0}}, + }}, + expected{"repo-1": {"storage-2": 1, "storage-3": 2}}, + }, + { + "all up to date", + state{"virtual-storage-1": { + "storage-1": {"repo-1": {generation: 3}}, + "storage-2": {"repo-1": {generation: 3}}, + "storage-3": {"repo-1": {generation: 3}}, + }}, + expected{}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gs := newStore(t, map[string][]string{"virtual-storage-1": {"storage-1", "storage-2", "storage-3"}}) + + ctx, cancel := testhelper.Context() + defer cancel() + + for vs, storages := range tc.state { + for storage, repos := range storages { + for repo, state := range repos { + require.NoError(t, gs.SetGeneration(ctx, vs, storage, repo, state.generation)) + } + } + } + + outdated, err := gs.GetOutdatedRepositories(ctx, "virtual-storage-1") + require.NoError(t, err) + require.Equal(t, tc.expected, outdated) + }) + } + }) +} diff --git a/internal/praefect/datastore/generation_postgres.go b/internal/praefect/datastore/generation_postgres.go new file mode 100644 index 000000000..2f8d28839 --- /dev/null +++ b/internal/praefect/datastore/generation_postgres.go @@ -0,0 +1,196 @@ +package datastore + +import ( + "context" + "database/sql" + "errors" + + "github.com/lib/pq" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" +) + +// GenerationUnknown is used to indicate lack of generation number in +// a replication job. Older instances can produce replication jobs +// without a generation number. +const GenerationUnknown = -1 + +var ( + errDowngradeAttempted = errors.New("downgrade attempted") + errUnknownVirtualStorage = errors.New("unknown virtual storage") +) + +// GenerationStore provides access to repositoy generation metadata. +type GenerationStore interface { + // IncrementGeneration increments the repository's generation and sets the storage's generation + // to match the value. + IncrementGeneration(ctx context.Context, virtualStorage, storage, relativePath string) (int, error) + // SetGeneration sets the repository's generation on the given storage. + SetGeneration(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error + // EnsureUpgrade returns an error if the given generation would downgrade the repository on the storage. + EnsureUpgrade(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error + // DeleteRecord deletes a storage's entry for the repository. + DeleteRecord(ctx context.Context, virtualStorage, storage, relativePath string) error + // GetOutdatedRepositories gets all storage's which have outdated repositories and lists how many + // generations they are behind. + GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) +} + +// PostgresGenerationStore is an in-memory implementation of GenerationStore. +// Refer to the interface for method documentation. +type PostgresGenerationStore struct { + db glsql.Querier + storages map[string][]string +} + +// NewLocalGenerationStore returns a Postgres implementation of GenerationStore. +func NewPostgresGenerationStore(db glsql.Querier, storages map[string][]string) *PostgresGenerationStore { + return &PostgresGenerationStore{db: db, storages: storages} +} + +func (rs *PostgresGenerationStore) IncrementGeneration(ctx context.Context, virtualStorage, storage, relativePath string) (int, error) { + const q = ` +WITH next AS ( + INSERT INTO repository_generations ( + virtual_storage, + relative_path, + generation + ) VALUES ($1, $2, 0) + ON CONFLICT (virtual_storage, relative_path) DO + UPDATE SET generation = repository_generations.generation + 1 + RETURNING virtual_storage, relative_path, generation +) + +INSERT INTO storage_generations ( + virtual_storage, + relative_path, + storage, + generation +) +SELECT virtual_storage, relative_path, $3, generation +FROM next +ON CONFLICT (virtual_storage, relative_path, storage) DO + UPDATE SET generation = EXCLUDED.generation +RETURNING generation +` + + var generation int + if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, storage).Scan(&generation); err != nil { + return 0, err + } + + return generation, nil +} + +func (rs *PostgresGenerationStore) SetGeneration(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error { + const q = ` +WITH repository AS ( + INSERT INTO repository_generations ( + virtual_storage, + relative_path, + generation + ) VALUES ($1, $2, $4) + ON CONFLICT (virtual_storage, relative_path) DO + UPDATE SET generation = EXCLUDED.generation + WHERE repository_generations.generation < EXCLUDED.generation +) + +INSERT INTO storage_generations ( + virtual_storage, + relative_path, + storage, + generation +) +VALUES ($1, $2, $3, $4) +ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET + generation = EXCLUDED.generation +` + + _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, generation) + return err +} + +func (rs *PostgresGenerationStore) EnsureUpgrade(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error { + const q = ` +SELECT generation +FROM storage_generations +WHERE virtual_storage = $1 +AND relative_path = $2 +AND storage = $3 +` + var current int + if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, storage).Scan(¤t); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil + } + + return err + } + + if current >= generation { + return errDowngradeAttempted + } + + return nil +} + +func (rs *PostgresGenerationStore) DeleteRecord(ctx context.Context, virtualStorage, storage, relativePath string) error { + const q = ` +DELETE FROM storage_generations +WHERE virtual_storage = $1 +AND storage = $2 +AND relative_path = $3 +` + + _, err := rs.db.ExecContext(ctx, q, virtualStorage, storage, relativePath) + return err +} + +func (rs *PostgresGenerationStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) { + // As some storages might be missing records from the table, we do a cross join between the repositories + // configured storages. If a storage is missing an entry, it is considered fully outdated. + const q = ` +WITH repositories AS ( + SELECT virtual_storage, relative_path, generation + FROM repository_generations + WHERE virtual_storage = $1 +) + +SELECT storages.name, repository.relative_path, repository.generation - COALESCE(storage.generation, -1) AS behind_by +FROM (SELECT unnest($2::text[]) AS name) AS storages +CROSS JOIN repositories AS repository +LEFT JOIN storage_generations AS storage + ON storage.virtual_storage = repository.virtual_storage + AND storage.relative_path = repository.relative_path + AND storage.storage = storages.name +WHERE COALESCE(storage.generation, -1) < repository.generation +` + storages, ok := rs.storages[virtualStorage] + if !ok { + return nil, errUnknownVirtualStorage + } + + rows, err := rs.db.QueryContext(ctx, q, virtualStorage, pq.StringArray(storages)) + if err != nil { + return nil, err + } + defer rows.Close() + + outdated := make(map[string]map[string]int) + for rows.Next() { + var storage, relativePath string + var difference int + if err := rows.Scan(&storage, &relativePath, &difference); err != nil { + return nil, err + } + + storages := outdated[relativePath] + if storages == nil { + storages = make(map[string]int) + } + + storages[storage] = difference + outdated[relativePath] = storages + } + + return outdated, rows.Err() +} diff --git a/internal/praefect/datastore/generation_postgres_test.go b/internal/praefect/datastore/generation_postgres_test.go new file mode 100644 index 000000000..9dcba5e27 --- /dev/null +++ b/internal/praefect/datastore/generation_postgres_test.go @@ -0,0 +1,13 @@ +// +build postgres + +package datastore + +import ( + "testing" +) + +func TestGenerationStore_Postgres(t *testing.T) { + testGenerationStore(t, func(t *testing.T, storages map[string][]string) GenerationStore { + return NewPostgresGenerationStore(getDB(t), storages) + }) +} diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go index 9de73d7b0..968e6ff56 100644 --- a/internal/praefect/datastore/glsql/testing.go +++ b/internal/praefect/datastore/glsql/testing.go @@ -58,6 +58,8 @@ func (db DB) TruncateAll(t testing.TB) { "replication_queue_lock", "node_status", "shard_primaries", + "repository_generations", + "storage_generations", ) } diff --git a/internal/praefect/datastore/migrations/20200707101830_repositories_table.go b/internal/praefect/datastore/migrations/20200707101830_repositories_table.go new file mode 100644 index 000000000..bbe044fd6 --- /dev/null +++ b/internal/praefect/datastore/migrations/20200707101830_repositories_table.go @@ -0,0 +1,28 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20200707101830_repositories_table", + Up: []string{` +CREATE TABLE repository_generations ( + virtual_storage TEXT, + relative_path TEXT, + generation BIGINT NOT NULL, + PRIMARY KEY (virtual_storage, relative_path) +); + +CREATE TABLE storage_generations ( + virtual_storage TEXT, + relative_path TEXT, + storage TEXT, + generation BIGINT NOT NULL, + PRIMARY KEY (virtual_storage, relative_path, storage), + FOREIGN KEY (virtual_storage, relative_path) REFERENCES repository_generations +);`}, + Down: []string{"DROP TABLE repository_generations; DROP TABLE storage_generations;"}, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index d4a41499b..91852492e 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -235,8 +235,11 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withQueue) } + gs := datastore.NewLocalGenerationStore(conf.StorageNames()) + coordinator := NewCoordinator( opt.withQueue, + gs, opt.withNodeMgr, opt.withTxMgr, conf, @@ -248,10 +251,11 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp opt.withLogger, conf.VirtualStorageNames(), opt.withQueue, + gs, opt.withNodeMgr, ) - prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue) + prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, gs) listener, port := listenAvailPort(t) t.Logf("proxy listening on port %d", port) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index cad593bb6..0dadf5738 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -38,6 +38,7 @@ type Replicator interface { type defaultReplicator struct { log *logrus.Entry + gs datastore.GenerationStore } func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error { @@ -51,6 +52,17 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli RelativePath: event.Job.RelativePath, } + // Old instances might produce replication jobs which do not have a generation number. Such replication jobs + // are processed if the target repository does not have a known generation yet. + generation := datastore.GenerationUnknown + if evtGen, ok := event.Job.Params["generation"]; ok { + generation = int(evtGen.(float64)) + } + + if err := dr.gs.EnsureUpgrade(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, generation); err != nil { + return fmt.Errorf("ensure upgrade: %w", err) + } + targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC) if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ @@ -101,6 +113,15 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli }).Error("checksums do not match") } + if generation >= 0 { + return dr.gs.SetGeneration(ctx, + event.Job.VirtualStorage, + event.Job.TargetNodeStorage, + event.Job.RelativePath, + generation, + ) + } + return nil } @@ -112,6 +133,13 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC) + // This might succeed and the repository removal fail. The repository removal will be reattempted as long + // as the replication job is still alive. We'll need an additional field later to separate this case from + // a missing repository record and to reattempt the removal even after the original replication job is dead. + if err := dr.gs.DeleteRecord(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath); err != nil { + return err + } + _, err := repoSvcClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: targetRepo, }) @@ -294,12 +322,12 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) { // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, gs datastore.GenerationStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log.WithField("component", "replication_manager"), queue: queue, whitelist: map[string]struct{}{}, - replicator: defaultReplicator{log}, + replicator: defaultReplicator{log, gs}, virtualStorages: virtualStorages, nodeManager: nodeMgr, replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}), diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 93954bd3a..8668de81d 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -145,8 +145,10 @@ func TestProcessReplicationJob(t *testing.T) { Message: "a commit", }) - var replicator defaultReplicator - replicator.log = entry + replicator := defaultReplicator{ + log: entry, + gs: datastore.NewLocalGenerationStore(conf.StorageNames()), + } mockReplicationGauge := promtest.NewMockStorageGauge() @@ -157,6 +159,7 @@ func TestProcessReplicationJob(t *testing.T) { testhelper.DiscardTestEntry(t), conf.VirtualStorageNames(), queue, + datastore.NewLocalGenerationStore(conf.StorageNames()), nodeMgr, WithLatencyMetric(&mockReplicationLatencyHistogramVec), WithDelayMetric(&mockReplicationDelayHistogramVec), @@ -217,11 +220,13 @@ func TestPropagateReplicationJob(t *testing.T) { txMgr := transactions.NewManager() - coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) + gs := datastore.NewLocalGenerationStore(conf.StorageNames()) - replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, nodeMgr) + coordinator := NewCoordinator(queue, gs, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) - prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue) + replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, gs, nodeMgr) + + prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs) listener, port := listenAvailPort(t) ctx, cancel := testhelper.Context() @@ -499,7 +504,13 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec()) require.NoError(t, err) - replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr) + replMgr := NewReplMgr( + logEntry, + conf.VirtualStorageNames(), + queueInterceptor, + datastore.NewLocalGenerationStore(conf.StorageNames()), + nodeMgr, + ) replMgr.ProcessBacklog(ctx, noopBackoffFunc) select { @@ -639,7 +650,13 @@ func TestProcessBacklog_Success(t *testing.T) { nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec()) require.NoError(t, err) - replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr) + replMgr := NewReplMgr( + logEntry, + conf.VirtualStorageNames(), + queueInterceptor, + datastore.NewLocalGenerationStore(conf.StorageNames()), + nodeMgr, + ) replMgr.ProcessBacklog(ctx, noopBackoffFunc) select { @@ -703,6 +720,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { testhelper.DiscardTestEntry(t), conf.VirtualStorageNames(), queue, + datastore.NewLocalGenerationStore(conf.StorageNames()), &nodes.MockManager{ GetShardFunc: func(vs string) (nodes.Shard, error) { require.Equal(t, virtualStorage, vs) diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 08f5aac4f..84a9ebfb7 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -42,6 +42,7 @@ func NewGRPCServer( nodeMgr nodes.Manager, txMgr *transactions.Manager, queue datastore.ReplicationEventQueue, + gs datastore.GenerationStore, grpcOpts ...grpc.ServerOption, ) *grpc.Server { ctxTagOpts := []grpc_ctxtags.Option{ @@ -85,7 +86,7 @@ func NewGRPCServer( warnDupeAddrs(logger, conf) srv := grpc.NewServer(grpcOpts...) - registerServices(srv, nodeMgr, txMgr, conf, queue) + registerServices(srv, nodeMgr, txMgr, conf, queue, gs) return srv } @@ -97,10 +98,10 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { } // registerServices registers services praefect needs to handle RPCs on its own. -func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) { +func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, gs datastore.GenerationStore) { // ServerServiceServer is necessary for the ServerInfo RPC gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm)) - gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue)) + gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, gs)) gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm)) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go index bf2172808..d9d800f70 100644 --- a/internal/praefect/server_factory.go +++ b/internal/praefect/server_factory.go @@ -25,6 +25,7 @@ func NewServerFactory( nodeMgr nodes.Manager, txMgr *transactions.Manager, queue datastore.ReplicationEventQueue, + gs datastore.GenerationStore, registry *protoregistry.Registry, ) *ServerFactory { return &ServerFactory{ @@ -34,6 +35,7 @@ func NewServerFactory( nodeMgr: nodeMgr, txMgr: txMgr, queue: queue, + gs: gs, registry: registry, } } @@ -47,6 +49,7 @@ type ServerFactory struct { nodeMgr nodes.Manager txMgr *transactions.Manager queue datastore.ReplicationEventQueue + gs datastore.GenerationStore registry *protoregistry.Registry secure, insecure []*grpc.Server } @@ -112,6 +115,7 @@ func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server { s.nodeMgr, s.txMgr, s.queue, + s.gs, grpcOpts..., ) } diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 29f1cf753..4e02120e6 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -82,7 +82,8 @@ func TestServerFactory(t *testing.T) { require.NoError(t, err) txMgr := transactions.NewManager() registry := protoregistry.GitalyProtoPreregistered - coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry) + gs := datastore.NewLocalGenerationStore(conf.StorageNames()) + coordinator := NewCoordinator(queue, gs, nodeMgr, txMgr, conf, registry) checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient { t.Helper() @@ -104,7 +105,7 @@ func TestServerFactory(t *testing.T) { } t.Run("insecure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, ":0") @@ -133,7 +134,7 @@ func TestServerFactory(t *testing.T) { }) t.Run("secure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, ":0") @@ -172,7 +173,7 @@ func TestServerFactory(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry) defer praefectServerFactory.Stop() // start with tcp address @@ -240,7 +241,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls key path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.KeyPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") @@ -249,7 +250,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls cert path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.CertPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index f1f4b717b..93b1f4861 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -781,7 +781,14 @@ func TestProxyWrites(t *testing.T) { require.NoError(t, err) txMgr := transactions.NewManager() - coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) + coordinator := NewCoordinator( + queue, + datastore.NewLocalGenerationStore(conf.StorageNames()), + nodeMgr, + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) server := grpc.NewServer( grpc.CustomCodec(proxy.NewCodec()), diff --git a/internal/praefect/service/info/dataloss.go b/internal/praefect/service/info/dataloss.go index 9c6a898ba..988e1d1cb 100644 --- a/internal/praefect/service/info/dataloss.go +++ b/internal/praefect/service/info/dataloss.go @@ -8,37 +8,38 @@ import ( ) func (s *Server) DatalossCheck(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) { - shard, err := s.nodeMgr.GetShard(req.VirtualStorage) + shard, err := s.nodeMgr.GetShard(req.GetVirtualStorage()) if err != nil { return nil, err } - if shard.PreviousWritablePrimary == nil { - return &gitalypb.DatalossCheckResponse{ - CurrentPrimary: shard.Primary.GetStorage(), - IsReadOnly: shard.IsReadOnly, - }, nil - } - - repos, err := s.queue.GetOutdatedRepositories(ctx, req.GetVirtualStorage(), shard.PreviousWritablePrimary.GetStorage()) + outdatedRepos, err := s.gs.GetOutdatedRepositories(ctx, req.GetVirtualStorage()) if err != nil { return nil, err } - outdatedNodes := make([]*gitalypb.DatalossCheckResponse_Nodes, 0, len(repos)) - for repo, nodes := range repos { - outdatedNodes = append(outdatedNodes, &gitalypb.DatalossCheckResponse_Nodes{ - RelativePath: repo, - Nodes: nodes, + pbRepos := make([]*gitalypb.DatalossCheckResponse_Repository, 0, len(outdatedRepos)) + for relativePath, storages := range outdatedRepos { + pbStorages := make([]*gitalypb.DatalossCheckResponse_Repository_Storage, 0, len(storages)) + for name, behindBy := range storages { + pbStorages = append(pbStorages, &gitalypb.DatalossCheckResponse_Repository_Storage{ + Name: name, + BehindBy: int64(behindBy), + }) + } + + sort.Slice(pbStorages, func(i, j int) bool { return pbStorages[i].Name < pbStorages[j].Name }) + + pbRepos = append(pbRepos, &gitalypb.DatalossCheckResponse_Repository{ + RelativePath: relativePath, + Storages: pbStorages, }) } - sort.Slice(outdatedNodes, func(i, j int) bool { return outdatedNodes[i].RelativePath < outdatedNodes[j].RelativePath }) + sort.Slice(pbRepos, func(i, j int) bool { return pbRepos[i].RelativePath < pbRepos[j].RelativePath }) return &gitalypb.DatalossCheckResponse{ - PreviousWritablePrimary: shard.PreviousWritablePrimary.GetStorage(), - CurrentPrimary: shard.Primary.GetStorage(), - IsReadOnly: shard.IsReadOnly, - OutdatedNodes: outdatedNodes, + Primary: shard.Primary.GetStorage(), + Repositories: pbRepos, }, nil } diff --git a/internal/praefect/service/info/dataloss_test.go b/internal/praefect/service/info/dataloss_test.go deleted file mode 100644 index 34a51dc45..000000000 --- a/internal/praefect/service/info/dataloss_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package info - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" - "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" -) - -func TestDatalossCheck(t *testing.T) { - for _, tc := range []struct { - desc string - shard nodes.Shard - outdated map[string][]string - response *gitalypb.DatalossCheckResponse - error error - }{ - { - desc: "no previous writable primary", - shard: nodes.Shard{ - Primary: &nodes.MockNode{StorageName: "primary-storage"}, - }, - response: &gitalypb.DatalossCheckResponse{ - CurrentPrimary: "primary-storage", - }, - }, - { - desc: "multiple out of date", - shard: nodes.Shard{ - PreviousWritablePrimary: &nodes.MockNode{StorageName: "previous-primary"}, - Primary: &nodes.MockNode{StorageName: "primary-storage"}, - IsReadOnly: true, - }, - outdated: map[string][]string{ - "repo-2": {"node-3"}, - "repo-1": {"node-1", "node-2"}, - }, - response: &gitalypb.DatalossCheckResponse{ - PreviousWritablePrimary: "previous-primary", - CurrentPrimary: "primary-storage", - IsReadOnly: true, - OutdatedNodes: []*gitalypb.DatalossCheckResponse_Nodes{ - {RelativePath: "repo-1", Nodes: []string{"node-1", "node-2"}}, - {RelativePath: "repo-2", Nodes: []string{"node-3"}}, - }, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - const virtualStorage = "test-virtual-storage" - mgr := &nodes.MockManager{ - GetShardFunc: func(vs string) (nodes.Shard, error) { - require.Equal(t, virtualStorage, vs) - return tc.shard, nil - }, - } - - rq := &datastore.MockReplicationEventQueue{ - GetOutdatedRepositoriesFunc: func(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) { - return tc.outdated, nil - }, - } - - srv := NewServer(mgr, config.Config{}, rq) - resp, err := srv.DatalossCheck(context.Background(), &gitalypb.DatalossCheckRequest{ - VirtualStorage: virtualStorage, - }) - require.Equal(t, tc.error, err) - require.Equal(t, tc.response, resp) - }) - } -} diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index f46c2fd03..55c7b7bc4 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -11,23 +11,27 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) +type GenerationStore interface { + GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) +} + // Server is a InfoService server type Server struct { gitalypb.UnimplementedPraefectInfoServiceServer nodeMgr nodes.Manager conf config.Config + gs GenerationStore queue datastore.ReplicationEventQueue } // NewServer creates a new instance of a grpc InfoServiceServer -func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue) gitalypb.PraefectInfoServiceServer { - s := &Server{ +func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, gs GenerationStore) gitalypb.PraefectInfoServiceServer { + return &Server{ nodeMgr: nodeMgr, conf: conf, + gs: gs, queue: queue, } - - return s } func (s *Server) EnableWrites(ctx context.Context, req *gitalypb.EnableWritesRequest) (*gitalypb.EnableWritesResponse, error) { diff --git a/internal/service/remote/fetch_internal_remote.go b/internal/service/remote/fetch_internal_remote.go index e57ff6d66..229680add 100644 --- a/internal/service/remote/fetch_internal_remote.go +++ b/internal/service/remote/fetch_internal_remote.go @@ -1,7 +1,6 @@ package remote import ( - "bytes" "context" "fmt" @@ -9,7 +8,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/gitalyssh" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/service/ref" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -53,7 +51,7 @@ func (s *server) FetchInternalRemote(ctx context.Context, req *gitalypb.FetchInt return &gitalypb.FetchInternalRemoteResponse{Result: false}, nil } - remoteDefaultBranch, err := ref.DefaultBranchName(ctx, req.RemoteRepository) + /*remoteDefaultBranch, err := ref.DefaultBranchName(ctx, req.RemoteRepository) if err != nil { return nil, status.Errorf(codes.Internal, "FetchInternalRemote: remote default branch: %v", err) } @@ -67,7 +65,7 @@ func (s *server) FetchInternalRemote(ctx context.Context, req *gitalypb.FetchInt if err := ref.SetDefaultBranchRef(ctx, req.Repository, string(remoteDefaultBranch)); err != nil { return nil, status.Errorf(codes.Internal, "FetchInternalRemote: set default branch: %v", err) } - } + }*/ return &gitalypb.FetchInternalRemoteResponse{Result: true}, nil } diff --git a/proto/go/gitalypb/praefect.pb.go b/proto/go/gitalypb/praefect.pb.go index 3a16715d1..45d4cd8a4 100644 --- a/proto/go/gitalypb/praefect.pb.go +++ b/proto/go/gitalypb/praefect.pb.go @@ -136,15 +136,13 @@ func (m *DatalossCheckRequest) GetVirtualStorage() string { } type DatalossCheckResponse struct { - VirtualStorage string `protobuf:"bytes,1,opt,name=virtual_storage,json=virtualStorage,proto3" json:"virtual_storage,omitempty"` - PreviousWritablePrimary string `protobuf:"bytes,2,opt,name=previous_writable_primary,json=previousWritablePrimary,proto3" json:"previous_writable_primary,omitempty"` - CurrentPrimary string `protobuf:"bytes,3,opt,name=current_primary,json=currentPrimary,proto3" json:"current_primary,omitempty"` - // whether the virtual storage is currently in read-only mode - IsReadOnly bool `protobuf:"varint,4,opt,name=is_read_only,json=isReadOnly,proto3" json:"is_read_only,omitempty"` - OutdatedNodes []*DatalossCheckResponse_Nodes `protobuf:"bytes,5,rep,name=outdated_nodes,json=outdatedNodes,proto3" json:"outdated_nodes,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + // current primary storage + Primary string `protobuf:"bytes,1,opt,name=primary,proto3" json:"primary,omitempty"` + // repositories with data loss + Repositories []*DatalossCheckResponse_Repository `protobuf:"bytes,2,rep,name=repositories,proto3" json:"repositories,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *DatalossCheckResponse) Reset() { *m = DatalossCheckResponse{} } @@ -172,88 +170,118 @@ func (m *DatalossCheckResponse) XXX_DiscardUnknown() { var xxx_messageInfo_DatalossCheckResponse proto.InternalMessageInfo -func (m *DatalossCheckResponse) GetVirtualStorage() string { +func (m *DatalossCheckResponse) GetPrimary() string { if m != nil { - return m.VirtualStorage + return m.Primary } return "" } -func (m *DatalossCheckResponse) GetPreviousWritablePrimary() string { +func (m *DatalossCheckResponse) GetRepositories() []*DatalossCheckResponse_Repository { if m != nil { - return m.PreviousWritablePrimary + return m.Repositories } - return "" + return nil } -func (m *DatalossCheckResponse) GetCurrentPrimary() string { - if m != nil { - return m.CurrentPrimary - } - return "" +type DatalossCheckResponse_Repository struct { + // relative path of the repository with outdated replicas + RelativePath string `protobuf:"bytes,1,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"` + // storages on which the repository is outdated + Storages []*DatalossCheckResponse_Repository_Storage `protobuf:"bytes,2,rep,name=storages,proto3" json:"storages,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DatalossCheckResponse_Repository) Reset() { *m = DatalossCheckResponse_Repository{} } +func (m *DatalossCheckResponse_Repository) String() string { return proto.CompactTextString(m) } +func (*DatalossCheckResponse_Repository) ProtoMessage() {} +func (*DatalossCheckResponse_Repository) Descriptor() ([]byte, []int) { + return fileDescriptor_d32bf44842ead735, []int{3, 0} } -func (m *DatalossCheckResponse) GetIsReadOnly() bool { +func (m *DatalossCheckResponse_Repository) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DatalossCheckResponse_Repository.Unmarshal(m, b) +} +func (m *DatalossCheckResponse_Repository) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DatalossCheckResponse_Repository.Marshal(b, m, deterministic) +} +func (m *DatalossCheckResponse_Repository) XXX_Merge(src proto.Message) { + xxx_messageInfo_DatalossCheckResponse_Repository.Merge(m, src) +} +func (m *DatalossCheckResponse_Repository) XXX_Size() int { + return xxx_messageInfo_DatalossCheckResponse_Repository.Size(m) +} +func (m *DatalossCheckResponse_Repository) XXX_DiscardUnknown() { + xxx_messageInfo_DatalossCheckResponse_Repository.DiscardUnknown(m) +} + +var xxx_messageInfo_DatalossCheckResponse_Repository proto.InternalMessageInfo + +func (m *DatalossCheckResponse_Repository) GetRelativePath() string { if m != nil { - return m.IsReadOnly + return m.RelativePath } - return false + return "" } -func (m *DatalossCheckResponse) GetOutdatedNodes() []*DatalossCheckResponse_Nodes { +func (m *DatalossCheckResponse_Repository) GetStorages() []*DatalossCheckResponse_Repository_Storage { if m != nil { - return m.OutdatedNodes + return m.Storages } return nil } -type DatalossCheckResponse_Nodes struct { - // relative path of the repository with outdated nodes - RelativePath string `protobuf:"bytes,1,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"` - // nodes whose copy of the repository is not up to date - Nodes []string `protobuf:"bytes,2,rep,name=nodes,proto3" json:"nodes,omitempty"` +type DatalossCheckResponse_Repository_Storage struct { + // name of the storage + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // behind_by indicates how many generations this storage is behind. + BehindBy int64 `protobuf:"varint,2,opt,name=behind_by,json=behindBy,proto3" json:"behind_by,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *DatalossCheckResponse_Nodes) Reset() { *m = DatalossCheckResponse_Nodes{} } -func (m *DatalossCheckResponse_Nodes) String() string { return proto.CompactTextString(m) } -func (*DatalossCheckResponse_Nodes) ProtoMessage() {} -func (*DatalossCheckResponse_Nodes) Descriptor() ([]byte, []int) { - return fileDescriptor_d32bf44842ead735, []int{3, 0} +func (m *DatalossCheckResponse_Repository_Storage) Reset() { + *m = DatalossCheckResponse_Repository_Storage{} +} +func (m *DatalossCheckResponse_Repository_Storage) String() string { return proto.CompactTextString(m) } +func (*DatalossCheckResponse_Repository_Storage) ProtoMessage() {} +func (*DatalossCheckResponse_Repository_Storage) Descriptor() ([]byte, []int) { + return fileDescriptor_d32bf44842ead735, []int{3, 0, 0} } -func (m *DatalossCheckResponse_Nodes) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_DatalossCheckResponse_Nodes.Unmarshal(m, b) +func (m *DatalossCheckResponse_Repository_Storage) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DatalossCheckResponse_Repository_Storage.Unmarshal(m, b) } -func (m *DatalossCheckResponse_Nodes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_DatalossCheckResponse_Nodes.Marshal(b, m, deterministic) +func (m *DatalossCheckResponse_Repository_Storage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DatalossCheckResponse_Repository_Storage.Marshal(b, m, deterministic) } -func (m *DatalossCheckResponse_Nodes) XXX_Merge(src proto.Message) { - xxx_messageInfo_DatalossCheckResponse_Nodes.Merge(m, src) +func (m *DatalossCheckResponse_Repository_Storage) XXX_Merge(src proto.Message) { + xxx_messageInfo_DatalossCheckResponse_Repository_Storage.Merge(m, src) } -func (m *DatalossCheckResponse_Nodes) XXX_Size() int { - return xxx_messageInfo_DatalossCheckResponse_Nodes.Size(m) +func (m *DatalossCheckResponse_Repository_Storage) XXX_Size() int { + return xxx_messageInfo_DatalossCheckResponse_Repository_Storage.Size(m) } -func (m *DatalossCheckResponse_Nodes) XXX_DiscardUnknown() { - xxx_messageInfo_DatalossCheckResponse_Nodes.DiscardUnknown(m) +func (m *DatalossCheckResponse_Repository_Storage) XXX_DiscardUnknown() { + xxx_messageInfo_DatalossCheckResponse_Repository_Storage.DiscardUnknown(m) } -var xxx_messageInfo_DatalossCheckResponse_Nodes proto.InternalMessageInfo +var xxx_messageInfo_DatalossCheckResponse_Repository_Storage proto.InternalMessageInfo -func (m *DatalossCheckResponse_Nodes) GetRelativePath() string { +func (m *DatalossCheckResponse_Repository_Storage) GetName() string { if m != nil { - return m.RelativePath + return m.Name } return "" } -func (m *DatalossCheckResponse_Nodes) GetNodes() []string { +func (m *DatalossCheckResponse_Repository_Storage) GetBehindBy() int64 { if m != nil { - return m.Nodes + return m.BehindBy } - return nil + return 0 } type RepositoryReplicasRequest struct { @@ -542,7 +570,8 @@ func init() { proto.RegisterType((*EnableWritesResponse)(nil), "gitaly.EnableWritesResponse") proto.RegisterType((*DatalossCheckRequest)(nil), "gitaly.DatalossCheckRequest") proto.RegisterType((*DatalossCheckResponse)(nil), "gitaly.DatalossCheckResponse") - proto.RegisterType((*DatalossCheckResponse_Nodes)(nil), "gitaly.DatalossCheckResponse.Nodes") + proto.RegisterType((*DatalossCheckResponse_Repository)(nil), "gitaly.DatalossCheckResponse.Repository") + proto.RegisterType((*DatalossCheckResponse_Repository_Storage)(nil), "gitaly.DatalossCheckResponse.Repository.Storage") proto.RegisterType((*RepositoryReplicasRequest)(nil), "gitaly.RepositoryReplicasRequest") proto.RegisterType((*RepositoryReplicasResponse)(nil), "gitaly.RepositoryReplicasResponse") proto.RegisterType((*RepositoryReplicasResponse_RepositoryDetails)(nil), "gitaly.RepositoryReplicasResponse.RepositoryDetails") @@ -553,54 +582,52 @@ func init() { func init() { proto.RegisterFile("praefect.proto", fileDescriptor_d32bf44842ead735) } var fileDescriptor_d32bf44842ead735 = []byte{ - // 741 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xeb, 0x44, - 0x14, 0x96, 0xf3, 0x73, 0x49, 0x4f, 0x93, 0xdc, 0xdc, 0xb9, 0xa5, 0x4d, 0x4d, 0xa1, 0xc1, 0x15, - 0x6a, 0x24, 0xda, 0xa4, 0x0a, 0x48, 0x48, 0x2c, 0xfb, 0xb3, 0x68, 0x17, 0x6d, 0xe4, 0x4a, 0x54, - 0x62, 0x63, 0x4d, 0xec, 0x93, 0x64, 0xc0, 0xf1, 0x98, 0x99, 0x49, 0x50, 0xde, 0x80, 0x05, 0x7b, - 0x1e, 0x80, 0x77, 0xe1, 0x0d, 0x10, 0x4f, 0xc1, 0x0b, 0xb0, 0x42, 0xf6, 0x8c, 0x4d, 0x92, 0x3a, - 0x45, 0xea, 0xdd, 0xf9, 0x9c, 0xef, 0x3b, 0xdf, 0x99, 0xf3, 0xe3, 0x19, 0x68, 0xc6, 0x82, 0xe2, - 0x18, 0x7d, 0xd5, 0x8b, 0x05, 0x57, 0x9c, 0xbc, 0x99, 0x30, 0x45, 0xc3, 0xa5, 0x0d, 0x21, 0x8b, - 0x8c, 0xcf, 0xae, 0xcb, 0x29, 0x15, 0x18, 0x18, 0xeb, 0x78, 0xc2, 0xf9, 0x24, 0xc4, 0x7e, 0x6a, - 0x8d, 0xe6, 0xe3, 0xbe, 0x62, 0x33, 0x94, 0x8a, 0xce, 0x62, 0x4d, 0x70, 0xae, 0xe1, 0xfd, 0x4d, - 0x44, 0x47, 0x21, 0x3e, 0x09, 0xa6, 0x50, 0xba, 0xf8, 0xd3, 0x1c, 0xa5, 0x22, 0xe7, 0xf0, 0x76, - 0xc1, 0x84, 0x9a, 0xd3, 0xd0, 0x93, 0x8a, 0x0b, 0x3a, 0xc1, 0xb6, 0xd5, 0xb1, 0xba, 0x3b, 0x97, - 0x95, 0x5f, 0xfe, 0x38, 0xb3, 0xdc, 0xa6, 0x01, 0x1f, 0x35, 0xe6, 0xec, 0xc3, 0xde, 0xba, 0x8a, - 0x8c, 0x79, 0x24, 0xd1, 0xb9, 0x81, 0xbd, 0x6b, 0xaa, 0x68, 0xc8, 0xa5, 0xbc, 0x9a, 0xa2, 0xff, - 0xe3, 0x2b, 0xe5, 0xff, 0x2a, 0xc1, 0xc7, 0x1b, 0x3a, 0x3a, 0x01, 0x39, 0xdd, 0x22, 0xb4, 0x29, - 0x41, 0xbe, 0x85, 0xc3, 0x58, 0xe0, 0x82, 0xf1, 0xb9, 0xf4, 0x7e, 0x16, 0x4c, 0x25, 0x87, 0xf5, - 0x62, 0xc1, 0x66, 0x54, 0x2c, 0xdb, 0xa5, 0x34, 0xe4, 0x20, 0x23, 0x3c, 0x19, 0x7c, 0xa8, 0xe1, - 0x24, 0x89, 0x3f, 0x17, 0x02, 0x23, 0x95, 0x47, 0x94, 0x75, 0x12, 0xe3, 0xce, 0x88, 0x1d, 0xa8, - 0x33, 0xe9, 0x09, 0xa4, 0x81, 0xc7, 0xa3, 0x70, 0xd9, 0xae, 0x74, 0xac, 0x6e, 0xcd, 0x05, 0x26, - 0x5d, 0xa4, 0xc1, 0x43, 0x14, 0x2e, 0xc9, 0x1d, 0x34, 0xf9, 0x5c, 0x05, 0x54, 0x61, 0xe0, 0x45, - 0x3c, 0x40, 0xd9, 0xae, 0x76, 0xca, 0xdd, 0xdd, 0xc1, 0x49, 0x4f, 0x8f, 0xb2, 0x57, 0x58, 0x66, - 0xef, 0x3e, 0xa1, 0xba, 0x8d, 0x2c, 0x34, 0x35, 0xed, 0x4b, 0xa8, 0xa6, 0x1f, 0xe4, 0x04, 0x1a, - 0x02, 0x43, 0xaa, 0xd8, 0x02, 0xbd, 0x98, 0xaa, 0xa9, 0x69, 0x41, 0x3d, 0x73, 0x0e, 0xa9, 0x9a, - 0x92, 0x3d, 0xa8, 0xea, 0x84, 0xa5, 0x4e, 0xb9, 0xbb, 0xe3, 0x6a, 0xc3, 0x79, 0x80, 0x43, 0x17, - 0x63, 0x2e, 0x99, 0xe2, 0x62, 0xe9, 0x62, 0x1c, 0x32, 0x9f, 0xe6, 0x4b, 0x30, 0x00, 0x10, 0x39, - 0x98, 0x8a, 0xee, 0x0e, 0x48, 0x76, 0xd0, 0x95, 0xb0, 0x15, 0x96, 0xf3, 0x7b, 0x09, 0xec, 0x22, - 0x45, 0x33, 0xaf, 0x7b, 0xf8, 0x28, 0x6b, 0xa1, 0xd6, 0xfb, 0xba, 0x40, 0x6f, 0x23, 0x68, 0x05, - 0xba, 0x46, 0x45, 0x59, 0x28, 0xdd, 0x4c, 0x84, 0x0c, 0xa1, 0x26, 0x0c, 0x3d, 0x2d, 0xec, 0xb5, - 0x82, 0xb9, 0x8a, 0xed, 0xc3, 0xbb, 0x67, 0xf0, 0x6b, 0x3a, 0x41, 0x6c, 0xa8, 0xf9, 0xc9, 0x10, - 0xe5, 0x7c, 0x66, 0x16, 0x2c, 0xb7, 0x9d, 0x3f, 0x2d, 0x38, 0xb8, 0xe2, 0x91, 0x64, 0x52, 0x61, - 0xe4, 0x2f, 0x3f, 0xe0, 0xdf, 0x20, 0x5f, 0x40, 0x53, 0x51, 0x31, 0x41, 0x95, 0xb3, 0x75, 0xb2, - 0x86, 0xf6, 0x66, 0xb4, 0x2f, 0xe1, 0x9d, 0xc0, 0x31, 0x0a, 0x8c, 0x7c, 0xcc, 0x99, 0x7a, 0x8b, - 0x5b, 0x39, 0x90, 0x91, 0xbf, 0x81, 0x83, 0x80, 0xc9, 0xf4, 0x17, 0x11, 0xe8, 0xf3, 0xc8, 0x67, - 0x61, 0xc8, 0xa8, 0x62, 0x3c, 0x32, 0x2b, 0xbd, 0x6f, 0x60, 0x77, 0x1d, 0x75, 0xfe, 0xb6, 0xa0, - 0xfd, 0xbc, 0x2e, 0x33, 0xfb, 0x33, 0x20, 0x49, 0x7b, 0xbc, 0xa2, 0x5d, 0x6d, 0x25, 0x88, 0xbb, - 0xba, 0xaf, 0xa7, 0xf0, 0xd6, 0xd4, 0xb5, 0xd1, 0x45, 0x53, 0xee, 0x95, 0xf1, 0x92, 0xf3, 0x44, - 0x36, 0xab, 0x2c, 0xe7, 0xea, 0xd2, 0xfe, 0xab, 0x39, 0xa7, 0x7f, 0x06, 0xbb, 0xc9, 0xac, 0xbd, - 0x1f, 0xf8, 0xc8, 0x63, 0x41, 0x5a, 0x4f, 0xc5, 0xdd, 0x49, 0x5c, 0x77, 0x7c, 0x74, 0x1b, 0x14, - 0x37, 0xaa, 0x5a, 0xdc, 0xa8, 0xc1, 0xaf, 0x65, 0x78, 0x3f, 0x34, 0x77, 0xf2, 0x6d, 0x34, 0xe6, - 0x8f, 0x28, 0x16, 0xcc, 0x47, 0x82, 0x40, 0x9e, 0xaf, 0x1f, 0xf9, 0xfc, 0xa5, 0xd5, 0x4c, 0x87, - 0x6f, 0x3b, 0xff, 0xbf, 0xbd, 0x4e, 0xed, 0x9f, 0xdf, 0xba, 0x95, 0x5a, 0xa9, 0x65, 0x11, 0x0a, - 0xad, 0xcd, 0x6e, 0x93, 0xe3, 0x4c, 0x61, 0xcb, 0x7e, 0xd9, 0x9d, 0xed, 0x84, 0x8d, 0x04, 0xa5, - 0x0b, 0x8b, 0x7c, 0x07, 0x8d, 0xb5, 0x2b, 0x89, 0x1c, 0x6d, 0xb9, 0xa9, 0xb4, 0xf8, 0xa7, 0x2f, - 0xde, 0x63, 0x2b, 0x47, 0x7f, 0x84, 0xfa, 0xea, 0x8b, 0x41, 0x3e, 0xc9, 0x02, 0x0b, 0x5e, 0x23, - 0xfb, 0xa8, 0x18, 0x5c, 0x13, 0xb5, 0x5a, 0xa5, 0xcb, 0x8b, 0xef, 0x13, 0x62, 0x48, 0x47, 0x3d, - 0x9f, 0xcf, 0xfa, 0xfa, 0xf3, 0x9c, 0x8b, 0x49, 0x5f, 0x87, 0xeb, 0x57, 0xb0, 0x3f, 0xe1, 0xc6, - 0x8e, 0x47, 0xa3, 0x37, 0xa9, 0xeb, 0xab, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x90, 0xeb, 0x26, - 0x4c, 0x5a, 0x07, 0x00, 0x00, + // 711 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xd3, 0x4c, + 0x14, 0x95, 0x93, 0x7c, 0x6d, 0x72, 0x93, 0xb6, 0xe9, 0xb4, 0x5f, 0x1b, 0xdc, 0x42, 0x83, 0x11, + 0x22, 0x12, 0x6d, 0x52, 0x05, 0x24, 0x24, 0x96, 0xfd, 0x59, 0x14, 0x55, 0x50, 0xb9, 0x12, 0x48, + 0x6c, 0xa2, 0xb1, 0x73, 0x93, 0x0c, 0x38, 0x1e, 0x33, 0x33, 0xa9, 0x94, 0x37, 0x60, 0xc1, 0x9e, + 0x07, 0xe0, 0x31, 0x10, 0x5b, 0xde, 0x80, 0xc7, 0xe0, 0x05, 0x58, 0x21, 0x67, 0xc6, 0x6e, 0x92, + 0xba, 0x05, 0x95, 0x9d, 0xe7, 0xde, 0x73, 0xcf, 0xf5, 0x39, 0x77, 0x7e, 0x60, 0x39, 0x12, 0x14, + 0x7b, 0xe8, 0xab, 0x66, 0x24, 0xb8, 0xe2, 0x64, 0xa1, 0xcf, 0x14, 0x0d, 0xc6, 0x36, 0x04, 0x2c, + 0x34, 0x31, 0xbb, 0x22, 0x07, 0x54, 0x60, 0xd7, 0xac, 0x76, 0xfa, 0x9c, 0xf7, 0x03, 0x6c, 0x4d, + 0x56, 0xde, 0xa8, 0xd7, 0x52, 0x6c, 0x88, 0x52, 0xd1, 0x61, 0xa4, 0x01, 0xce, 0x11, 0xac, 0x1d, + 0x87, 0xd4, 0x0b, 0xf0, 0x8d, 0x60, 0x0a, 0xa5, 0x8b, 0x1f, 0x46, 0x28, 0x15, 0xd9, 0x83, 0x95, + 0x0b, 0x26, 0xd4, 0x88, 0x06, 0x1d, 0xa9, 0xb8, 0xa0, 0x7d, 0xac, 0x59, 0x75, 0xab, 0x51, 0x3a, + 0x28, 0x7c, 0xfc, 0xbe, 0x6b, 0xb9, 0xcb, 0x26, 0x79, 0xae, 0x73, 0xce, 0x06, 0xac, 0xcf, 0xb2, + 0xc8, 0x88, 0x87, 0x12, 0x9d, 0x63, 0x58, 0x3f, 0xa2, 0x8a, 0x06, 0x5c, 0xca, 0xc3, 0x01, 0xfa, + 0xef, 0x6f, 0x49, 0xff, 0x2d, 0x07, 0xff, 0xcf, 0xf1, 0xe8, 0x06, 0xa4, 0x06, 0x8b, 0x91, 0x60, + 0x43, 0x2a, 0xc6, 0x9a, 0xc0, 0x4d, 0x96, 0xe4, 0x14, 0x2a, 0x02, 0x23, 0x2e, 0x99, 0xe2, 0x82, + 0xa1, 0xac, 0xe5, 0xea, 0xf9, 0x46, 0xb9, 0xdd, 0x68, 0x6a, 0xcb, 0x9a, 0x99, 0x74, 0x4d, 0x37, + 0xa9, 0x18, 0xbb, 0x33, 0xd5, 0xf6, 0x57, 0x0b, 0xe0, 0x32, 0x49, 0x1e, 0xc0, 0x92, 0xc0, 0x80, + 0x2a, 0x76, 0x81, 0x9d, 0x88, 0xaa, 0x81, 0x69, 0x5e, 0x49, 0x82, 0x67, 0x54, 0x0d, 0xc8, 0x29, + 0x14, 0x8d, 0xb8, 0xa4, 0xfb, 0xfe, 0xdf, 0x76, 0x6f, 0x1a, 0xe5, 0x6e, 0xca, 0x60, 0x3f, 0x87, + 0x45, 0x13, 0x24, 0x04, 0x0a, 0x21, 0x1d, 0x1a, 0xcb, 0xdc, 0xc9, 0x37, 0xd9, 0x82, 0x92, 0x87, + 0x03, 0x16, 0x76, 0x3b, 0xde, 0xb8, 0x96, 0xab, 0x5b, 0x8d, 0xbc, 0x5b, 0xd4, 0x81, 0x83, 0xb1, + 0xf3, 0x0a, 0xee, 0x4c, 0x29, 0xc3, 0x28, 0x60, 0x3e, 0x4d, 0x47, 0xdd, 0x06, 0x48, 0xa5, 0x6a, + 0x17, 0xcb, 0x6d, 0x92, 0xfc, 0xe8, 0x54, 0xd9, 0x14, 0xca, 0xf9, 0x92, 0x03, 0x3b, 0x8b, 0xd1, + 0x4c, 0xe5, 0xe5, 0xec, 0x54, 0xca, 0xed, 0xa7, 0x19, 0x7c, 0x73, 0x45, 0x53, 0xa9, 0x23, 0x54, + 0x94, 0x05, 0xf2, 0x72, 0x96, 0x67, 0x50, 0x14, 0x06, 0x6e, 0x9c, 0xbc, 0x1d, 0x61, 0xca, 0x62, + 0xfb, 0xb0, 0x7a, 0x25, 0x7d, 0x1b, 0x27, 0x88, 0x0d, 0x45, 0x3f, 0x1e, 0xa2, 0x1c, 0x0d, 0x27, + 0xb6, 0x97, 0xdc, 0x74, 0xed, 0xfc, 0xb0, 0x60, 0xf3, 0x90, 0x87, 0x92, 0x49, 0x85, 0xa1, 0x3f, + 0xfe, 0x87, 0x13, 0x40, 0x1e, 0xc2, 0xb2, 0xa2, 0xa2, 0x8f, 0x2a, 0x45, 0xeb, 0x66, 0x4b, 0x3a, + 0x9a, 0xc0, 0x1e, 0xc3, 0xaa, 0xc0, 0x1e, 0x0a, 0x0c, 0x7d, 0x4c, 0x91, 0xf9, 0x09, 0xb2, 0x9a, + 0x26, 0x12, 0xf0, 0x33, 0xd8, 0xec, 0x32, 0x19, 0x9f, 0xda, 0x8e, 0x40, 0x9f, 0x87, 0x3e, 0x0b, + 0x02, 0x46, 0x15, 0xe3, 0x61, 0xad, 0x50, 0xb7, 0x1a, 0x45, 0x77, 0xc3, 0xa4, 0xdd, 0xd9, 0xac, + 0xf3, 0xd3, 0x82, 0xda, 0x55, 0x5d, 0x66, 0xf6, 0xbb, 0x40, 0x62, 0x7b, 0x3a, 0x59, 0xe7, 0xa3, + 0x1a, 0x67, 0xdc, 0xe9, 0x33, 0xf2, 0x08, 0x56, 0x8c, 0xae, 0x39, 0x17, 0x8d, 0xdc, 0x43, 0x13, + 0x25, 0x7b, 0x31, 0x6d, 0xa2, 0x2c, 0xc5, 0x6a, 0x69, 0x97, 0x9a, 0x53, 0xf8, 0x3d, 0x28, 0xc7, + 0xb3, 0xee, 0xbc, 0xe3, 0x5e, 0x87, 0x75, 0x27, 0x7a, 0x0a, 0x6e, 0x29, 0x0e, 0xbd, 0xe0, 0xde, + 0x49, 0x37, 0xdb, 0xa8, 0xff, 0xb2, 0x8d, 0x6a, 0x7f, 0xca, 0xc3, 0xda, 0x99, 0xb9, 0x79, 0x4f, + 0xc2, 0x1e, 0x3f, 0x47, 0x71, 0xc1, 0x7c, 0x24, 0x08, 0xe4, 0xea, 0xf6, 0x23, 0xf7, 0x6f, 0xda, + 0x9a, 0x93, 0xe1, 0xdb, 0xce, 0x9f, 0x77, 0xaf, 0x53, 0xfc, 0xf5, 0xb9, 0x51, 0x28, 0xe6, 0xaa, + 0x16, 0xa1, 0x50, 0x9d, 0x77, 0x9b, 0xec, 0x24, 0x0c, 0xd7, 0xec, 0x2f, 0xbb, 0x7e, 0x3d, 0x60, + 0xae, 0x41, 0x6e, 0xdf, 0x22, 0xaf, 0x61, 0x69, 0xe6, 0x4a, 0x22, 0xdb, 0xd7, 0xdc, 0x54, 0x9a, + 0xfc, 0xee, 0x8d, 0xf7, 0xd8, 0xd4, 0xaf, 0x9f, 0x43, 0x65, 0xfa, 0x5d, 0x20, 0x5b, 0x49, 0x61, + 0xc6, 0x9b, 0x63, 0x6f, 0x67, 0x27, 0x67, 0x48, 0xad, 0x6a, 0xee, 0x60, 0xff, 0x6d, 0x0c, 0x0c, + 0xa8, 0xd7, 0xf4, 0xf9, 0xb0, 0xa5, 0x3f, 0xf7, 0xb8, 0xe8, 0xb7, 0x74, 0xb9, 0x7e, 0xeb, 0x5a, + 0x7d, 0x6e, 0xd6, 0x91, 0xe7, 0x2d, 0x4c, 0x42, 0x4f, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, 0x11, + 0x72, 0xaa, 0x48, 0x40, 0x07, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/proto/praefect.proto b/proto/praefect.proto index 468642c53..ec5407fe0 100644 --- a/proto/praefect.proto +++ b/proto/praefect.proto @@ -57,19 +57,25 @@ message DatalossCheckRequest { } message DatalossCheckResponse { - message Nodes { - // relative path of the repository with outdated nodes + message Repository { + message Storage { + // name of the storage + string name = 1; + // behind_by indicates how many generations this storage is behind. + int64 behind_by = 2; + } + + // relative path of the repository with outdated replicas string relative_path = 1; - // nodes whose copy of the repository is not up to date - repeated string nodes = 2; + // storages on which the repository is outdated + repeated Storage storages = 2; } - string virtual_storage = 1; - string previous_writable_primary = 2; - string current_primary = 3; - // whether the virtual storage is currently in read-only mode - bool is_read_only = 4; - repeated Nodes outdated_nodes = 5; + // current primary storage + string primary = 1; + + // repositories with data loss + repeated Repository repositories = 2; } message RepositoryReplicasRequest{ diff --git a/ruby/proto/gitaly/praefect_pb.rb b/ruby/proto/gitaly/praefect_pb.rb index 77412571c..559fb4677 100644 --- a/ruby/proto/gitaly/praefect_pb.rb +++ b/ruby/proto/gitaly/praefect_pb.rb @@ -16,15 +16,16 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :virtual_storage, :string, 1 end add_message "gitaly.DatalossCheckResponse" do - optional :virtual_storage, :string, 1 - optional :previous_writable_primary, :string, 2 - optional :current_primary, :string, 3 - optional :is_read_only, :bool, 4 - repeated :outdated_nodes, :message, 5, "gitaly.DatalossCheckResponse.Nodes" + optional :primary, :string, 1 + repeated :repositories, :message, 2, "gitaly.DatalossCheckResponse.Repository" end - add_message "gitaly.DatalossCheckResponse.Nodes" do + add_message "gitaly.DatalossCheckResponse.Repository" do optional :relative_path, :string, 1 - repeated :nodes, :string, 2 + repeated :storages, :message, 2, "gitaly.DatalossCheckResponse.Repository.Storage" + end + add_message "gitaly.DatalossCheckResponse.Repository.Storage" do + optional :name, :string, 1 + optional :behind_by, :int64, 2 end add_message "gitaly.RepositoryReplicasRequest" do optional :repository, :message, 1, "gitaly.Repository" @@ -57,7 +58,8 @@ module Gitaly EnableWritesResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.EnableWritesResponse").msgclass DatalossCheckRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckRequest").msgclass DatalossCheckResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckResponse").msgclass - DatalossCheckResponse::Nodes = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckResponse.Nodes").msgclass + DatalossCheckResponse::Repository = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckResponse.Repository").msgclass + DatalossCheckResponse::Repository::Storage = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckResponse.Repository.Storage").msgclass RepositoryReplicasRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasRequest").msgclass RepositoryReplicasResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasResponse").msgclass RepositoryReplicasResponse::RepositoryDetails = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasResponse.RepositoryDetails").msgclass |