gitlab.com/gitlab-org/gitaly.git
author     Sami Hiltunen <shiltunen@gitlab.com>  2020-07-07 17:42:22 +0300
committer  Sami Hiltunen <shiltunen@gitlab.com>  2020-07-13 19:19:01 +0300
commit     f6fe5f5ec213c8110ac18fc9a5d926f58dd04dd9 (patch)
tree       ec4ae874c9398439beabc1ea8c53702fdbb6c8e1
parent     a863286e59c23b7ec374afca92fa25a454b40f29 (diff)
repository generations (smh-dataloss-generations)
-rw-r--r--  cmd/praefect/main.go  16
-rw-r--r--  cmd/praefect/subcmd_dataloss.go  33
-rw-r--r--  cmd/praefect/subcmd_dataloss_test.go  137
-rw-r--r--  cmd/praefect/subcmd_test.go  2
-rw-r--r--  internal/praefect/auth_test.go  6
-rw-r--r--  internal/praefect/config/config.go  15
-rw-r--r--  internal/praefect/config/config_test.go  12
-rw-r--r--  internal/praefect/coordinator.go  18
-rw-r--r--  internal/praefect/coordinator_test.go  40
-rw-r--r--  internal/praefect/datastore/generation_local.go  163
-rw-r--r--  internal/praefect/datastore/generation_local_test.go  174
-rw-r--r--  internal/praefect/datastore/generation_postgres.go  196
-rw-r--r--  internal/praefect/datastore/generation_postgres_test.go  13
-rw-r--r--  internal/praefect/datastore/glsql/testing.go  2
-rw-r--r--  internal/praefect/datastore/migrations/20200707101830_repositories_table.go  28
-rw-r--r--  internal/praefect/helper_test.go  6
-rw-r--r--  internal/praefect/replicator.go  32
-rw-r--r--  internal/praefect/replicator_test.go  32
-rw-r--r--  internal/praefect/server.go  7
-rw-r--r--  internal/praefect/server_factory.go  4
-rw-r--r--  internal/praefect/server_factory_test.go  13
-rw-r--r--  internal/praefect/server_test.go  9
-rw-r--r--  internal/praefect/service/info/dataloss.go  39
-rw-r--r--  internal/praefect/service/info/dataloss_test.go  76
-rw-r--r--  internal/praefect/service/info/server.go  12
-rw-r--r--  internal/service/remote/fetch_internal_remote.go  6
-rw-r--r--  proto/go/gitalypb/praefect.pb.go  225
-rw-r--r--  proto/praefect.proto  26
-rw-r--r--  ruby/proto/gitaly/praefect_pb.rb  18
29 files changed, 1009 insertions(+), 351 deletions(-)
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index d52a4c308..9bd2909f3 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -230,10 +230,13 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
var queue datastore.ReplicationEventQueue
+ var gs datastore.GenerationStore
if conf.MemoryQueueEnabled {
queue = datastore.NewMemoryReplicationEventQueue(conf)
+ gs = datastore.NewLocalGenerationStore(conf.StorageNames())
} else {
queue = datastore.NewPostgresReplicationEventQueue(db)
+ gs = datastore.NewPostgresGenerationStore(db, conf.StorageNames())
}
nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
@@ -259,11 +262,19 @@ func run(cfgs []starter.Config, conf config.Config) error {
var (
// top level server dependencies
- coordinator = praefect.NewCoordinator(queue, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
- repl = praefect.NewReplMgr(
+ coordinator = praefect.NewCoordinator(
+ queue,
+ gs,
+ nodeManager,
+ transactionManager,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
+ repl = praefect.NewReplMgr(
logger,
conf.VirtualStorageNames(),
queue,
+ gs,
nodeManager,
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
@@ -276,6 +287,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
nodeManager,
transactionManager,
queue,
+ gs,
protoregistry.GitalyProtoPreregistered,
)
)
diff --git a/cmd/praefect/subcmd_dataloss.go b/cmd/praefect/subcmd_dataloss.go
index f9c7f2d22..a8c57c465 100644
--- a/cmd/praefect/subcmd_dataloss.go
+++ b/cmd/praefect/subcmd_dataloss.go
@@ -74,27 +74,24 @@ func (cmd *datalossSubcommand) Exec(flags *flag.FlagSet, cfg config.Config) erro
return fmt.Errorf("error checking: %v", err)
}
- mode := "write-enabled"
- if resp.IsReadOnly {
- mode = "read-only"
- }
-
cmd.println(0, "Virtual storage: %s", vs)
- cmd.println(1, "Current %s primary: %s", mode, resp.CurrentPrimary)
- if resp.PreviousWritablePrimary == "" {
- fmt.Fprintln(cmd.output, " No data loss as the virtual storage has not encountered a failover")
- continue
- }
-
- cmd.println(1, "Previous write-enabled primary: %s", resp.PreviousWritablePrimary)
- if len(resp.OutdatedNodes) == 0 {
- cmd.println(2, "No data loss from failing over from %s", resp.PreviousWritablePrimary)
- continue
+ cmd.println(1, "Primary: %s", resp.Primary)
+ if len(resp.Repositories) == 0 {
+ cmd.println(1, "All repositories are consistent!")
+ return nil
}
- cmd.println(2, "Nodes with data loss from failing over from %s:", resp.PreviousWritablePrimary)
- for _, odn := range resp.OutdatedNodes {
- cmd.println(3, "%s: %s", odn.RelativePath, strings.Join(odn.Nodes, ", "))
+ cmd.println(1, "Outdated repositories:")
+ for _, r := range resp.Repositories {
+ cmd.println(2, "%s:", r.RelativePath)
+ for _, s := range r.Storages {
+ plural := ""
+ if s.BehindBy > 1 {
+ plural = "s"
+ }
+
+ cmd.println(3, "%s is behind by %d generation%s or less", s.Name, s.BehindBy, plural)
+ }
}
}
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index 569cf7caa..46e201c43 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -5,9 +5,12 @@ import (
"context"
"testing"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -26,107 +29,81 @@ func (m mockPraefectInfoService) EnableWrites(ctx context.Context, r *gitalypb.E
}
func TestDatalossSubcommand(t *testing.T) {
- mockSvc := &mockPraefectInfoService{}
- ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(mockSvc)})
+ mgr := &nodes.MockManager{
+ GetShardFunc: func(vs string) (nodes.Shard, error) {
+ var primary string
+ switch vs {
+ case "virtual-storage-1":
+ primary = "gitaly-1"
+ case "virtual-storage-2":
+ primary = "gitaly-4"
+ default:
+ t.Error("unexpected virtual storage")
+ }
+
+ return nodes.Shard{Primary: &nodes.MockNode{StorageName: primary}}, nil
+ },
+ }
+
+ gs := datastore.NewLocalGenerationStore(map[string][]string{
+ "virtual-storage-1": {"gitaly-1", "gitaly-2", "gitaly-3"},
+ "virtual-storage-2": {"gitaly-4"},
+ })
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "gitaly-1", "repository-1", 1))
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "gitaly-2", "repository-1", 0))
+
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "gitaly-2", "repository-2", 0))
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "gitaly-3", "repository-2", 0))
+
+ ln, clean := listenAndServe(t, []svcRegistrar{
+ registerPraefectInfoServer(info.NewServer(mgr, config.Config{}, nil, gs))})
defer clean()
for _, tc := range []struct {
desc string
args []string
virtualStorages []*config.VirtualStorage
- datalossCheck func(context.Context, *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error)
output string
error error
}{
{
desc: "positional arguments",
- args: []string{"-virtual-storage=test-virtual-storage", "positional-arg"},
+ args: []string{"-virtual-storage=virtual-storage-1", "positional-arg"},
error: UnexpectedPositionalArgsError{Command: "dataloss"},
},
{
- desc: "no failover",
- args: []string{"-virtual-storage=test-virtual-storage"},
- datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
- assert.Equal(t, "test-virtual-storage", req.GetVirtualStorage())
- return &gitalypb.DatalossCheckResponse{
- CurrentPrimary: "test-current-primary",
- }, nil
- },
- output: `Virtual storage: test-virtual-storage
- Current write-enabled primary: test-current-primary
- No data loss as the virtual storage has not encountered a failover
-`,
- },
- {
- desc: "no data loss",
- args: []string{"-virtual-storage=test-virtual-storage"},
- datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
- assert.Equal(t, "test-virtual-storage", req.GetVirtualStorage())
- return &gitalypb.DatalossCheckResponse{
- PreviousWritablePrimary: "test-previous-primary",
- IsReadOnly: false,
- CurrentPrimary: "test-current-primary",
- }, nil
- },
- output: `Virtual storage: test-virtual-storage
- Current write-enabled primary: test-current-primary
- Previous write-enabled primary: test-previous-primary
- No data loss from failing over from test-previous-primary
-`,
- },
- {
desc: "data loss",
- args: []string{"-virtual-storage=test-virtual-storage"},
- datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
- assert.Equal(t, "test-virtual-storage", req.GetVirtualStorage())
- return &gitalypb.DatalossCheckResponse{
- PreviousWritablePrimary: "test-previous-primary",
- IsReadOnly: true,
- CurrentPrimary: "test-current-primary",
- OutdatedNodes: []*gitalypb.DatalossCheckResponse_Nodes{
- {RelativePath: "repository-1", Nodes: []string{"gitaly-2", "gitaly-3"}},
- {RelativePath: "repository-2", Nodes: []string{"gitaly-1"}},
- },
- }, nil
- },
- output: `Virtual storage: test-virtual-storage
- Current read-only primary: test-current-primary
- Previous write-enabled primary: test-previous-primary
- Nodes with data loss from failing over from test-previous-primary:
- repository-1: gitaly-2, gitaly-3
- repository-2: gitaly-1
+ args: []string{"-virtual-storage=virtual-storage-1"},
+ output: `Virtual storage: virtual-storage-1
+ Primary: gitaly-1
+ Outdated repositories:
+ repository-1:
+ gitaly-2 is behind by 1 generation or less
+ gitaly-3 is behind by 2 generations or less
+ repository-2:
+ gitaly-1 is behind by 1 generation or less
`,
},
{
desc: "multiple virtual storages",
- virtualStorages: []*config.VirtualStorage{{Name: "test-virtual-storage-2"}, {Name: "test-virtual-storage-1"}},
- datalossCheck: func(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
- return &gitalypb.DatalossCheckResponse{
- PreviousWritablePrimary: "test-previous-primary",
- IsReadOnly: true,
- CurrentPrimary: "test-current-primary",
- OutdatedNodes: []*gitalypb.DatalossCheckResponse_Nodes{
- {RelativePath: "repository-1", Nodes: []string{"gitaly-2", "gitaly-3"}},
- {RelativePath: "repository-2", Nodes: []string{"gitaly-1"}},
- },
- }, nil
- },
- output: `Virtual storage: test-virtual-storage-1
- Current read-only primary: test-current-primary
- Previous write-enabled primary: test-previous-primary
- Nodes with data loss from failing over from test-previous-primary:
- repository-1: gitaly-2, gitaly-3
- repository-2: gitaly-1
-Virtual storage: test-virtual-storage-2
- Current read-only primary: test-current-primary
- Previous write-enabled primary: test-previous-primary
- Nodes with data loss from failing over from test-previous-primary:
- repository-1: gitaly-2, gitaly-3
- repository-2: gitaly-1
+ virtualStorages: []*config.VirtualStorage{{Name: "virtual-storage-2"}, {Name: "virtual-storage-1"}},
+ output: `Virtual storage: virtual-storage-1
+ Primary: gitaly-1
+ Outdated repositories:
+ repository-1:
+ gitaly-2 is behind by 1 generation or less
+ gitaly-3 is behind by 2 generations or less
+ repository-2:
+ gitaly-1 is behind by 1 generation or less
+Virtual storage: virtual-storage-2
+ Primary: gitaly-4
+ All repositories are consistent!
`,
},
} {
t.Run(tc.desc, func(t *testing.T) {
- mockSvc.DatalossCheckFunc = tc.datalossCheck
cmd := newDatalossSubcommand()
output := &bytes.Buffer{}
cmd.output = output
diff --git a/cmd/praefect/subcmd_test.go b/cmd/praefect/subcmd_test.go
index dc3e40758..38a0a199e 100644
--- a/cmd/praefect/subcmd_test.go
+++ b/cmd/praefect/subcmd_test.go
@@ -41,7 +41,7 @@ func listenAndServe(t testing.TB, svcs []svcRegistrar) (net.Listener, testhelper
tmp, clean := testhelper.TempDir(t)
- ln, err := net.Listen("unix", filepath.Join(tmp, "gitaly.sock"))
+ ln, err := net.Listen("unix", filepath.Join(tmp, "lollero"))
require.NoError(t, err)
srv := grpc.NewServer()
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index b112a716f..730987869 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -178,9 +178,11 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
registry, err := protoregistry.New(fd)
require.NoError(t, err)
- coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry)
+ gs := datastore.NewLocalGenerationStore(conf.StorageNames())
- srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue)
+ coordinator := NewCoordinator(queue, gs, nodeMgr, txMgr, conf, registry)
+
+ srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 31b1b4d25..4cd10154f 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -172,6 +172,21 @@ func (c *Config) VirtualStorageNames() []string {
return names
}
+// StorageNames returns storage names by virtual storage.
+func (c *Config) StorageNames() map[string][]string {
+ storages := make(map[string][]string, len(c.VirtualStorages))
+ for _, vs := range c.VirtualStorages {
+ nodes := make([]string, len(vs.Nodes))
+ for i, n := range vs.Nodes {
+ nodes[i] = n.Storage
+ }
+
+ storages[vs.Name] = nodes
+ }
+
+ return storages
+}
+
// DB holds Postgres client configuration data.
type DB struct {
Host string `toml:"host"`
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 1ffde42ee..51c2d254c 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -336,6 +336,18 @@ func TestVirtualStorageNames(t *testing.T) {
require.Equal(t, []string{"praefect-1", "praefect-2"}, conf.VirtualStorageNames())
}
+func TestStorageNames(t *testing.T) {
+ conf := Config{
+ VirtualStorages: []*VirtualStorage{
+ {Name: "virtual-storage-1", Nodes: []*Node{{Storage: "gitaly-1"}, {Storage: "gitaly-2"}}},
+ {Name: "virtual-storage-2", Nodes: []*Node{{Storage: "gitaly-3"}, {Storage: "gitaly-4"}}},
+ }}
+ require.Equal(t, map[string][]string{
+ "virtual-storage-1": {"gitaly-1", "gitaly-2"},
+ "virtual-storage-2": {"gitaly-3", "gitaly-4"},
+ }, conf.StorageNames())
+}
+
func TestToPQString(t *testing.T) {
testCases := []struct {
desc string
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 47a48d4af..cf3b31a2b 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -60,9 +60,8 @@ func getReplicationDetails(methodName string, m proto.Message) (datastore.Change
return "", nil, fmt.Errorf("protocol changed: for method %q expected message type '%T', got '%T'", methodName, req, m)
}
return datastore.RepackIncremental, nil, nil
-
default:
- return datastore.UpdateRepo, nil, nil
+ return datastore.UpdateRepo, datastore.Params{}, nil
}
}
@@ -81,6 +80,7 @@ type Coordinator struct {
nodeMgr nodes.Manager
txMgr *transactions.Manager
queue datastore.ReplicationEventQueue
+ gs datastore.GenerationStore
registry *protoregistry.Registry
conf config.Config
}
@@ -88,6 +88,7 @@ type Coordinator struct {
// NewCoordinator returns a new Coordinator that utilizes the provided logger
func NewCoordinator(
queue datastore.ReplicationEventQueue,
+ gs datastore.GenerationStore,
nodeMgr nodes.Manager,
txMgr *transactions.Manager,
conf config.Config,
@@ -95,6 +96,7 @@ func NewCoordinator(
) *Coordinator {
return &Coordinator{
queue: queue,
+ gs: gs,
registry: r,
nodeMgr: nodeMgr,
txMgr: txMgr,
@@ -237,7 +239,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
})
}
} else {
- finalizers = append(finalizers, c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, shard.Secondaries, change, params))
+ finalizers = append(finalizers, c.newRequestFinalizer(ctx, virtualStorage, call.targetRepo, shard.Primary, shard.Secondaries, change, params))
}
reqFinalizer := func() error {
@@ -363,7 +365,7 @@ func protoMessage(mi protoregistry.MethodInfo, frame []byte) (proto.Message, err
return m, nil
}
-func (c *Coordinator) createReplicaJobs(
+func (c *Coordinator) newRequestFinalizer(
ctx context.Context,
virtualStorage string,
targetRepo *gitalypb.Repository,
@@ -373,6 +375,14 @@ func (c *Coordinator) createReplicaJobs(
params datastore.Params,
) func() error {
return func() error {
+ if change == datastore.UpdateRepo {
+ var err error
+ params["generation"], err = c.gs.IncrementGeneration(ctx, virtualStorage, primary.GetStorage(), targetRepo.GetRelativePath())
+ if err != nil {
+ return fmt.Errorf("failed incrementing primary's generation: %w", err)
+ }
+ }
+
correlationID := c.ensureCorrelationID(ctx, targetRepo)
g, ctx := errgroup.WithContext(ctx)
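To make the flow concrete, here is a minimal sketch of the first half of newRequestFinalizer above. The storage names, relative path, and the generationParams helper are made up for illustration; building and enqueueing the per-secondary replication jobs is elided.

package main

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// generationParams mirrors the start of newRequestFinalizer: bump the primary's
// generation for an UpdateRepo change and hand the new value to the replication
// jobs via their Params.
func generationParams(ctx context.Context, gs datastore.GenerationStore) (datastore.Params, error) {
	gen, err := gs.IncrementGeneration(ctx, "praefect", "gitaly-1", "@hashed/ab/cd/repo.git")
	if err != nil {
		return nil, fmt.Errorf("failed incrementing primary's generation: %w", err)
	}
	// The generation travels with each job so the replicator can refuse
	// downgrades (EnsureUpgrade) and record the replica's new generation once
	// the copy succeeds (SetGeneration).
	return datastore.Params{"generation": gen}, nil
}

func main() {
	gs := datastore.NewLocalGenerationStore(map[string][]string{"praefect": {"gitaly-1", "gitaly-2"}})
	params, _ := generationParams(context.Background(), gs)
	fmt.Println(params) // map[generation:0]
}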
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 7b13be6c6..4e841f716 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -87,6 +87,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
const storageName = "test-storage"
coordinator := NewCoordinator(
datastore.NewMemoryReplicationEventQueue(conf),
+ nil,
&nodes.MockManager{GetShardFunc: func(storage string) (nodes.Shard, error) {
return nodes.Shard{
IsReadOnly: tc.readOnly,
@@ -160,7 +161,14 @@ func TestStreamDirectorMutator(t *testing.T) {
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(
+ queueInterceptor,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
+ nodeMgr,
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
Origin: &targetRepo,
@@ -212,6 +220,9 @@ func TestStreamDirectorMutator(t *testing.T) {
RelativePath: targetRepo.RelativePath,
TargetNodeStorage: secondaryNode.Storage,
SourceNodeStorage: primaryNode.Storage,
+ Params: datastore.Params{
+ "generation": 0,
+ },
},
Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
}
@@ -257,7 +268,14 @@ func TestStreamDirectorAccessor(t *testing.T) {
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(
+ queue,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
+ nodeMgr,
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
require.NoError(t, err)
@@ -335,7 +353,14 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(
+ queue,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
+ nodeMgr,
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
t.Run("forwards accessor operations", func(t *testing.T) {
var primaryChosen int
@@ -526,7 +551,14 @@ func TestAbsentCorrelationID(t *testing.T) {
require.NoError(t, err)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(
+ queueInterceptor,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
+ nodeMgr,
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
Origin: &targetRepo,
diff --git a/internal/praefect/datastore/generation_local.go b/internal/praefect/datastore/generation_local.go
new file mode 100644
index 000000000..c9985d14a
--- /dev/null
+++ b/internal/praefect/datastore/generation_local.go
@@ -0,0 +1,163 @@
+package datastore
+
+import (
+ "context"
+ "sync"
+)
+
+// LocalGenerationStore is an in-memory implementation of GenerationStore.
+// Refer to the interface for method documentation.
+type LocalGenerationStore struct {
+ m sync.Mutex
+
+ storages map[string][]string
+ generations map[string]map[string]map[string]int
+ repositories map[string]map[string]int
+}
+
+// NewLocalGenerationStore returns an in-memory implementation of GenerationStore.
+func NewLocalGenerationStore(storages map[string][]string) *LocalGenerationStore {
+ return &LocalGenerationStore{
+ storages: storages,
+ generations: make(map[string]map[string]map[string]int),
+ repositories: make(map[string]map[string]int),
+ }
+}
+
+func (l *LocalGenerationStore) IncrementGeneration(ctx context.Context, virtualStorage, storage, relativePath string) (int, error) {
+ l.m.Lock()
+ defer l.m.Unlock()
+
+ nextGen := l.latestGeneration(virtualStorage, relativePath) + 1
+ l.setGeneration(virtualStorage, relativePath, storage, nextGen)
+
+ return nextGen, nil
+}
+
+func (l *LocalGenerationStore) SetGeneration(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error {
+ l.m.Lock()
+ defer l.m.Unlock()
+
+ l.setGeneration(virtualStorage, relativePath, storage, generation)
+
+ return nil
+}
+
+func (l *LocalGenerationStore) DeleteRecord(ctx context.Context, virtualStorage, storage, relativePath string) error {
+ l.m.Lock()
+ defer l.m.Unlock()
+
+ vs, ok := l.generations[virtualStorage]
+ if !ok {
+ return nil
+ }
+
+ rel, ok := vs[relativePath]
+ if !ok {
+ return nil
+ }
+
+ delete(rel, storage)
+
+ return nil
+}
+
+func (l *LocalGenerationStore) EnsureUpgrade(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error {
+ l.m.Lock()
+ defer l.m.Unlock()
+
+ if current := l.getGeneration(virtualStorage, relativePath, storage); current != GenerationUnknown && current >= generation {
+ return errDowngradeAttempted
+ }
+
+ return nil
+}
+
+func (l *LocalGenerationStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) {
+ storages, ok := l.storages[virtualStorage]
+ if !ok {
+ return nil, errUnknownVirtualStorage
+ }
+
+ outdatedRepos := make(map[string]map[string]int)
+ repositories, ok := l.repositories[virtualStorage]
+ if !ok {
+ return outdatedRepos, nil
+ }
+
+ for relativePath, latestGeneration := range repositories {
+ for _, storage := range storages {
+ if gen := l.getGeneration(virtualStorage, relativePath, storage); gen < latestGeneration {
+ if outdatedRepos[relativePath] == nil {
+ outdatedRepos[relativePath] = make(map[string]int)
+ }
+
+ outdatedRepos[relativePath][storage] = latestGeneration - gen
+ }
+ }
+ }
+
+ return outdatedRepos, nil
+}
+
+func (l *LocalGenerationStore) latestGeneration(virtualStorage, relativePath string) int {
+ vs := l.repositories[virtualStorage]
+ if vs == nil {
+ return GenerationUnknown
+ }
+
+ if latest, ok := vs[relativePath]; ok {
+ return latest
+ }
+
+ return GenerationUnknown
+}
+
+func (l *LocalGenerationStore) getGeneration(virtualStorage, relativePath, storage string) int {
+ vs := l.generations[virtualStorage]
+ if vs == nil {
+ return GenerationUnknown
+ }
+
+ rel := vs[relativePath]
+ if rel == nil {
+ return GenerationUnknown
+ }
+
+ if gen, ok := rel[storage]; ok {
+ return gen
+ }
+
+ return GenerationUnknown
+}
+
+func (l *LocalGenerationStore) setGeneration(virtualStorage, relativePath, storage string, generation int) {
+ if generation > l.latestGeneration(virtualStorage, relativePath) {
+ if l.repositories[virtualStorage] == nil {
+ l.repositories[virtualStorage] = make(map[string]int)
+ }
+ l.repositories[virtualStorage][relativePath] = generation
+ }
+
+ vs := l.generations[virtualStorage]
+ if vs == nil {
+ l.generations[virtualStorage] = map[string]map[string]int{
+ relativePath: {
+ storage: generation,
+ },
+ }
+
+ return
+ }
+
+ rel := vs[relativePath]
+ if rel == nil {
+ vs[relativePath] = map[string]int{
+ storage: generation,
+ }
+
+ return
+ }
+
+ rel[storage] = generation
+}
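A minimal, self-contained sketch of how the store above is meant to be used, with made-up storage names and error handling elided: the mutator path bumps the primary's generation, a completed replication job records the replica's, and a storage without any record is treated as generation -1 (GenerationUnknown).

package main

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

func main() {
	ctx := context.Background()

	// One virtual storage backed by three Gitaly storages.
	gs := datastore.NewLocalGenerationStore(map[string][]string{
		"praefect": {"gitaly-1", "gitaly-2", "gitaly-3"},
	})

	// The primary bumps the repository generation on every mutator.
	gen, _ := gs.IncrementGeneration(ctx, "praefect", "gitaly-1", "repo.git") // generation 0
	gen, _ = gs.IncrementGeneration(ctx, "praefect", "gitaly-1", "repo.git")  // generation 1

	// A successful replication job records the replica's new generation.
	_ = gs.SetGeneration(ctx, "praefect", "gitaly-2", "repo.git", gen)

	// gitaly-3 has no record at all, so it counts as generation -1 and shows up
	// as behind by latest+1 generations.
	outdated, _ := gs.GetOutdatedRepositories(ctx, "praefect")
	fmt.Println(outdated) // map[repo.git:map[gitaly-3:2]]
}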
diff --git a/internal/praefect/datastore/generation_local_test.go b/internal/praefect/datastore/generation_local_test.go
new file mode 100644
index 000000000..1e5043efb
--- /dev/null
+++ b/internal/praefect/datastore/generation_local_test.go
@@ -0,0 +1,174 @@
+package datastore
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestGenerationStore_Local(t *testing.T) {
+ testGenerationStore(t, func(t *testing.T, storages map[string][]string) GenerationStore {
+ return NewLocalGenerationStore(storages)
+ })
+}
+
+func testGenerationStore(t *testing.T, newStore func(t *testing.T, storages map[string][]string) GenerationStore) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ t.Run("IncrementGeneration", func(t *testing.T) {
+ gs := newStore(t, nil)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ t.Run("creates a new record", func(t *testing.T) {
+ generation, err := gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1")
+ require.NoError(t, err)
+ require.Equal(t, 0, generation)
+ })
+
+ t.Run("increments existing record", func(t *testing.T) {
+ generation, err := gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1")
+ require.NoError(t, err)
+ require.Equal(t, 1, generation)
+ })
+ })
+
+ t.Run("SetGeneration", func(t *testing.T) {
+ gs := newStore(t, nil)
+
+ t.Run("creates a record", func(t *testing.T) {
+ err := gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 1)
+ require.NoError(t, err)
+ })
+
+ t.Run("updates existing record", func(t *testing.T) {
+ err := gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 0)
+ require.NoError(t, err)
+ })
+
+ t.Run("increments stays monotonic", func(t *testing.T) {
+ generation, err := gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1")
+ require.NoError(t, err)
+ require.Equal(t, 2, generation)
+ })
+ })
+
+ t.Run("EnsureUpgrade", func(t *testing.T) {
+ t.Run("no previous record allowed", func(t *testing.T) {
+ gs := newStore(t, nil)
+ require.NoError(t, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", GenerationUnknown))
+ require.NoError(t, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", 0))
+ })
+
+ t.Run("upgrade allowed", func(t *testing.T) {
+ gs := newStore(t, nil)
+
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 0))
+ require.NoError(t, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", 1))
+ require.Equal(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", GenerationUnknown))
+ })
+
+ t.Run("downgrade prevented", func(t *testing.T) {
+ gs := newStore(t, nil)
+
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 1))
+ require.Equal(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", 0))
+ require.Equal(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", GenerationUnknown))
+ })
+
+ t.Run("same version prevented", func(t *testing.T) {
+ gs := newStore(t, nil)
+
+ require.NoError(t, gs.SetGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1", 1))
+ require.Equal(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", 1))
+ require.Equal(t, errDowngradeAttempted, gs.EnsureUpgrade(ctx, "virtual-storage-1", "storage-1", "repository-1", GenerationUnknown))
+ })
+ })
+
+ t.Run("DeleteRecord", func(t *testing.T) {
+ gs := newStore(t, nil)
+
+ t.Run("delete non-existing", func(t *testing.T) {
+ err := gs.DeleteRecord(ctx, "virtual-storage-1", "storage-1", "repository-1")
+ require.NoError(t, err)
+ })
+
+ t.Run("delete existing", func(t *testing.T) {
+ generation, err := gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1")
+ require.NoError(t, err)
+ require.Equal(t, 0, generation)
+
+ err = gs.DeleteRecord(ctx, "virtual-storage-1", "storage-1", "repository-1")
+ require.NoError(t, err)
+
+ generation, err = gs.IncrementGeneration(ctx, "virtual-storage-1", "storage-1", "repository-1")
+ require.NoError(t, err)
+ require.Equal(t, 1, generation)
+ })
+ })
+
+ t.Run("GetOutdatedRepositories", func(t *testing.T) {
+ type state map[string]map[string]map[string]struct {
+ generation int
+ }
+
+ type expected map[string]map[string]int
+
+ for _, tc := range []struct {
+ desc string
+ state state
+ expected map[string]map[string]int
+ }{
+ {
+ "no records in virtual storage",
+ state{"virtual-storage-2": {"storage-1": {"repo-1": {generation: 0}}}},
+ expected{},
+ },
+ {
+ "storages missing records",
+ state{"virtual-storage-1": {"storage-1": {"repo-1": {generation: 0}}}},
+ expected{"repo-1": {"storage-2": 1, "storage-3": 1}},
+ },
+ {
+ "outdated storages",
+ state{"virtual-storage-1": {
+ "storage-1": {"repo-1": {generation: 2}},
+ "storage-2": {"repo-1": {generation: 1}},
+ "storage-3": {"repo-1": {generation: 0}},
+ }},
+ expected{"repo-1": {"storage-2": 1, "storage-3": 2}},
+ },
+ {
+ "all up to date",
+ state{"virtual-storage-1": {
+ "storage-1": {"repo-1": {generation: 3}},
+ "storage-2": {"repo-1": {generation: 3}},
+ "storage-3": {"repo-1": {generation: 3}},
+ }},
+ expected{},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ gs := newStore(t, map[string][]string{"virtual-storage-1": {"storage-1", "storage-2", "storage-3"}})
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for vs, storages := range tc.state {
+ for storage, repos := range storages {
+ for repo, state := range repos {
+ require.NoError(t, gs.SetGeneration(ctx, vs, storage, repo, state.generation))
+ }
+ }
+ }
+
+ outdated, err := gs.GetOutdatedRepositories(ctx, "virtual-storage-1")
+ require.NoError(t, err)
+ require.Equal(t, tc.expected, outdated)
+ })
+ }
+ })
+}
diff --git a/internal/praefect/datastore/generation_postgres.go b/internal/praefect/datastore/generation_postgres.go
new file mode 100644
index 000000000..2f8d28839
--- /dev/null
+++ b/internal/praefect/datastore/generation_postgres.go
@@ -0,0 +1,196 @@
+package datastore
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+
+ "github.com/lib/pq"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+// GenerationUnknown is used to indicate lack of generation number in
+// a replication job. Older instances can produce replication jobs
+// without a generation number.
+const GenerationUnknown = -1
+
+var (
+ errDowngradeAttempted = errors.New("downgrade attempted")
+ errUnknownVirtualStorage = errors.New("unknown virtual storage")
+)
+
+// GenerationStore provides access to repository generation metadata.
+type GenerationStore interface {
+ // IncrementGeneration increments the repository's generation and sets the storage's generation
+ // to match the value.
+ IncrementGeneration(ctx context.Context, virtualStorage, storage, relativePath string) (int, error)
+ // SetGeneration sets the repository's generation on the given storage.
+ SetGeneration(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error
+ // EnsureUpgrade returns an error if the given generation would downgrade the repository on the storage.
+ EnsureUpgrade(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error
+ // DeleteRecord deletes a storage's entry for the repository.
+ DeleteRecord(ctx context.Context, virtualStorage, storage, relativePath string) error
+ // GetOutdatedRepositories gets all storages which have outdated repositories and lists how many
+ // generations they are behind.
+ GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error)
+}
+
+// PostgresGenerationStore is a Postgres implementation of GenerationStore.
+// Refer to the interface for method documentation.
+type PostgresGenerationStore struct {
+ db glsql.Querier
+ storages map[string][]string
+}
+
+// NewPostgresGenerationStore returns a Postgres implementation of GenerationStore.
+func NewPostgresGenerationStore(db glsql.Querier, storages map[string][]string) *PostgresGenerationStore {
+ return &PostgresGenerationStore{db: db, storages: storages}
+}
+
+func (rs *PostgresGenerationStore) IncrementGeneration(ctx context.Context, virtualStorage, storage, relativePath string) (int, error) {
+ const q = `
+WITH next AS (
+ INSERT INTO repository_generations (
+ virtual_storage,
+ relative_path,
+ generation
+ ) VALUES ($1, $2, 0)
+ ON CONFLICT (virtual_storage, relative_path) DO
+ UPDATE SET generation = repository_generations.generation + 1
+ RETURNING virtual_storage, relative_path, generation
+)
+
+INSERT INTO storage_generations (
+ virtual_storage,
+ relative_path,
+ storage,
+ generation
+)
+SELECT virtual_storage, relative_path, $3, generation
+FROM next
+ON CONFLICT (virtual_storage, relative_path, storage) DO
+ UPDATE SET generation = EXCLUDED.generation
+RETURNING generation
+`
+
+ var generation int
+ if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, storage).Scan(&generation); err != nil {
+ return 0, err
+ }
+
+ return generation, nil
+}
+
+func (rs *PostgresGenerationStore) SetGeneration(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error {
+ const q = `
+WITH repository AS (
+ INSERT INTO repository_generations (
+ virtual_storage,
+ relative_path,
+ generation
+ ) VALUES ($1, $2, $4)
+ ON CONFLICT (virtual_storage, relative_path) DO
+ UPDATE SET generation = EXCLUDED.generation
+ WHERE repository_generations.generation < EXCLUDED.generation
+)
+
+INSERT INTO storage_generations (
+ virtual_storage,
+ relative_path,
+ storage,
+ generation
+)
+VALUES ($1, $2, $3, $4)
+ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET
+ generation = EXCLUDED.generation
+`
+
+ _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, generation)
+ return err
+}
+
+func (rs *PostgresGenerationStore) EnsureUpgrade(ctx context.Context, virtualStorage, storage, relativePath string, generation int) error {
+ const q = `
+SELECT generation
+FROM storage_generations
+WHERE virtual_storage = $1
+AND relative_path = $2
+AND storage = $3
+`
+ var current int
+ if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath, storage).Scan(&current); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil
+ }
+
+ return err
+ }
+
+ if current >= generation {
+ return errDowngradeAttempted
+ }
+
+ return nil
+}
+
+func (rs *PostgresGenerationStore) DeleteRecord(ctx context.Context, virtualStorage, storage, relativePath string) error {
+ const q = `
+DELETE FROM storage_generations
+WHERE virtual_storage = $1
+AND storage = $2
+AND relative_path = $3
+`
+
+ _, err := rs.db.ExecContext(ctx, q, virtualStorage, storage, relativePath)
+ return err
+}
+
+func (rs *PostgresGenerationStore) GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error) {
+ // As some storages might be missing records from the table, we do a cross join between the repositories
+ // and the configured storages. If a storage is missing an entry, it is considered fully outdated.
+ const q = `
+WITH repositories AS (
+ SELECT virtual_storage, relative_path, generation
+ FROM repository_generations
+ WHERE virtual_storage = $1
+)
+
+SELECT storages.name, repository.relative_path, repository.generation - COALESCE(storage.generation, -1) AS behind_by
+FROM (SELECT unnest($2::text[]) AS name) AS storages
+CROSS JOIN repositories AS repository
+LEFT JOIN storage_generations AS storage
+ ON storage.virtual_storage = repository.virtual_storage
+ AND storage.relative_path = repository.relative_path
+ AND storage.storage = storages.name
+WHERE COALESCE(storage.generation, -1) < repository.generation
+`
+ storages, ok := rs.storages[virtualStorage]
+ if !ok {
+ return nil, errUnknownVirtualStorage
+ }
+
+ rows, err := rs.db.QueryContext(ctx, q, virtualStorage, pq.StringArray(storages))
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+
+ outdated := make(map[string]map[string]int)
+ for rows.Next() {
+ var storage, relativePath string
+ var difference int
+ if err := rows.Scan(&storage, &relativePath, &difference); err != nil {
+ return nil, err
+ }
+
+ storages := outdated[relativePath]
+ if storages == nil {
+ storages = make(map[string]int)
+ }
+
+ storages[storage] = difference
+ outdated[relativePath] = storages
+ }
+
+ return outdated, rows.Err()
+}
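One subtlety in the query above: COALESCE(storage.generation, -1) makes a storage with no storage_generations row count as generation -1, so it is reported as one generation further behind than any recorded copy; LocalGenerationStore applies the same rule via GenerationUnknown. A small worked example of the arithmetic, no database required (the helper name is illustrative only):

package main

import "fmt"

// behindBy reproduces the arithmetic of the GetOutdatedRepositories query:
// repository.generation - COALESCE(storage.generation, -1).
func behindBy(repoGeneration int, storageGeneration *int) int {
	current := -1 // missing storage_generations row
	if storageGeneration != nil {
		current = *storageGeneration
	}
	return repoGeneration - current
}

func main() {
	gen0, gen1 := 0, 1
	fmt.Println(behindBy(2, &gen1)) // 1: one generation behind
	fmt.Println(behindBy(2, &gen0)) // 2: two generations behind
	fmt.Println(behindBy(2, nil))   // 3: no record at all
}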
diff --git a/internal/praefect/datastore/generation_postgres_test.go b/internal/praefect/datastore/generation_postgres_test.go
new file mode 100644
index 000000000..9dcba5e27
--- /dev/null
+++ b/internal/praefect/datastore/generation_postgres_test.go
@@ -0,0 +1,13 @@
+// +build postgres
+
+package datastore
+
+import (
+ "testing"
+)
+
+func TestGenerationStore_Postgres(t *testing.T) {
+ testGenerationStore(t, func(t *testing.T, storages map[string][]string) GenerationStore {
+ return NewPostgresGenerationStore(getDB(t), storages)
+ })
+}
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 9de73d7b0..968e6ff56 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -58,6 +58,8 @@ func (db DB) TruncateAll(t testing.TB) {
"replication_queue_lock",
"node_status",
"shard_primaries",
+ "repository_generations",
+ "storage_generations",
)
}
diff --git a/internal/praefect/datastore/migrations/20200707101830_repositories_table.go b/internal/praefect/datastore/migrations/20200707101830_repositories_table.go
new file mode 100644
index 000000000..bbe044fd6
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200707101830_repositories_table.go
@@ -0,0 +1,28 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20200707101830_repositories_table",
+ Up: []string{`
+CREATE TABLE repository_generations (
+ virtual_storage TEXT,
+ relative_path TEXT,
+ generation BIGINT NOT NULL,
+ PRIMARY KEY (virtual_storage, relative_path)
+);
+
+CREATE TABLE storage_generations (
+ virtual_storage TEXT,
+ relative_path TEXT,
+ storage TEXT,
+ generation BIGINT NOT NULL,
+ PRIMARY KEY (virtual_storage, relative_path, storage),
+ FOREIGN KEY (virtual_storage, relative_path) REFERENCES repository_generations
+);`},
+ Down: []string{"DROP TABLE repository_generations; DROP TABLE storage_generations;"},
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index d4a41499b..91852492e 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -235,8 +235,11 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withQueue)
}
+ gs := datastore.NewLocalGenerationStore(conf.StorageNames())
+
coordinator := NewCoordinator(
opt.withQueue,
+ gs,
opt.withNodeMgr,
opt.withTxMgr,
conf,
@@ -248,10 +251,11 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withLogger,
conf.VirtualStorageNames(),
opt.withQueue,
+ gs,
opt.withNodeMgr,
)
- prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue)
+ prf := NewGRPCServer(conf, opt.withLogger, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, opt.withNodeMgr, opt.withTxMgr, opt.withQueue, gs)
listener, port := listenAvailPort(t)
t.Logf("proxy listening on port %d", port)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index cad593bb6..0dadf5738 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -38,6 +38,7 @@ type Replicator interface {
type defaultReplicator struct {
log *logrus.Entry
+ gs datastore.GenerationStore
}
func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error {
@@ -51,6 +52,17 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
RelativePath: event.Job.RelativePath,
}
+ // Old instances might produce replication jobs which do not have a generation number. Such replication jobs
+ // are processed if the target repository does not have a known generation yet.
+ generation := datastore.GenerationUnknown
+ if evtGen, ok := event.Job.Params["generation"]; ok {
+ generation = int(evtGen.(float64))
+ }
+
+ if err := dr.gs.EnsureUpgrade(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, generation); err != nil {
+ return fmt.Errorf("ensure upgrade: %w", err)
+ }
+
targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
@@ -101,6 +113,15 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
}).Error("checksums do not match")
}
+ if generation >= 0 {
+ return dr.gs.SetGeneration(ctx,
+ event.Job.VirtualStorage,
+ event.Job.TargetNodeStorage,
+ event.Job.RelativePath,
+ generation,
+ )
+ }
+
return nil
}
@@ -112,6 +133,13 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
+ // This might succeed and the repository removal fail. The repository removal will be reattempted as long
+ // as the replication job is still alive. We'll need an additional field later to separate this case from
+ // a missing repository record and to reattempt the removal even after the original replication job is dead.
+ if err := dr.gs.DeleteRecord(ctx, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath); err != nil {
+ return err
+ }
+
_, err := repoSvcClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
Repository: targetRepo,
})
@@ -294,12 +322,12 @@ func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, gs datastore.GenerationStore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log.WithField("component", "replication_manager"),
queue: queue,
whitelist: map[string]struct{}{},
- replicator: defaultReplicator{log},
+ replicator: defaultReplicator{log, gs},
virtualStorages: virtualStorages,
nodeManager: nodeMgr,
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
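The int(evtGen.(float64)) cast above matches how numeric Params look after a JSON round trip: encoding/json decodes JSON numbers into interface{} values as float64. (That the Postgres-backed queue serializes job parameters as JSON is my assumption here, inferred from the cast; jobs without the key fall back to GenerationUnknown.) A minimal sketch of that round trip:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// The coordinator writes an int...
	raw, _ := json.Marshal(map[string]interface{}{"generation": 3})

	// ...but after unmarshalling into interface{} values it comes back as float64.
	var params map[string]interface{}
	_ = json.Unmarshal(raw, &params)

	generation := -1 // datastore.GenerationUnknown: job predates generation tracking
	if v, ok := params["generation"]; ok {
		generation = int(v.(float64)) // mirrors the cast in defaultReplicator.Replicate
	}
	fmt.Println(generation) // 3
}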
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 93954bd3a..8668de81d 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -145,8 +145,10 @@ func TestProcessReplicationJob(t *testing.T) {
Message: "a commit",
})
- var replicator defaultReplicator
- replicator.log = entry
+ replicator := defaultReplicator{
+ log: entry,
+ gs: datastore.NewLocalGenerationStore(conf.StorageNames()),
+ }
mockReplicationGauge := promtest.NewMockStorageGauge()
@@ -157,6 +159,7 @@ func TestProcessReplicationJob(t *testing.T) {
testhelper.DiscardTestEntry(t),
conf.VirtualStorageNames(),
queue,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
nodeMgr,
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
WithDelayMetric(&mockReplicationDelayHistogramVec),
@@ -217,11 +220,13 @@ func TestPropagateReplicationJob(t *testing.T) {
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ gs := datastore.NewLocalGenerationStore(conf.StorageNames())
- replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, nodeMgr)
+ coordinator := NewCoordinator(queue, gs, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
- prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue)
+ replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, gs, nodeMgr)
+
+ prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs)
listener, port := listenAvailPort(t)
ctx, cancel := testhelper.Context()
@@ -499,7 +504,13 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
+ replMgr := NewReplMgr(
+ logEntry,
+ conf.VirtualStorageNames(),
+ queueInterceptor,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
+ nodeMgr,
+ )
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
select {
@@ -639,7 +650,13 @@ func TestProcessBacklog_Success(t *testing.T) {
nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
+ replMgr := NewReplMgr(
+ logEntry,
+ conf.VirtualStorageNames(),
+ queueInterceptor,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
+ nodeMgr,
+ )
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
select {
@@ -703,6 +720,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
testhelper.DiscardTestEntry(t),
conf.VirtualStorageNames(),
queue,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
&nodes.MockManager{
GetShardFunc: func(vs string) (nodes.Shard, error) {
require.Equal(t, virtualStorage, vs)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 08f5aac4f..84a9ebfb7 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -42,6 +42,7 @@ func NewGRPCServer(
nodeMgr nodes.Manager,
txMgr *transactions.Manager,
queue datastore.ReplicationEventQueue,
+ gs datastore.GenerationStore,
grpcOpts ...grpc.ServerOption,
) *grpc.Server {
ctxTagOpts := []grpc_ctxtags.Option{
@@ -85,7 +86,7 @@ func NewGRPCServer(
warnDupeAddrs(logger, conf)
srv := grpc.NewServer(grpcOpts...)
- registerServices(srv, nodeMgr, txMgr, conf, queue)
+ registerServices(srv, nodeMgr, txMgr, conf, queue, gs)
return srv
}
@@ -97,10 +98,10 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
}
// registerServices registers services praefect needs to handle RPCs on its own.
-func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) {
+func registerServices(srv *grpc.Server, nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue, gs datastore.GenerationStore) {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, nm))
- gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue))
+ gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(nm, conf, queue, gs))
gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm))
healthpb.RegisterHealthServer(srv, health.NewServer())
diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go
index bf2172808..d9d800f70 100644
--- a/internal/praefect/server_factory.go
+++ b/internal/praefect/server_factory.go
@@ -25,6 +25,7 @@ func NewServerFactory(
nodeMgr nodes.Manager,
txMgr *transactions.Manager,
queue datastore.ReplicationEventQueue,
+ gs datastore.GenerationStore,
registry *protoregistry.Registry,
) *ServerFactory {
return &ServerFactory{
@@ -34,6 +35,7 @@ func NewServerFactory(
nodeMgr: nodeMgr,
txMgr: txMgr,
queue: queue,
+ gs: gs,
registry: registry,
}
}
@@ -47,6 +49,7 @@ type ServerFactory struct {
nodeMgr nodes.Manager
txMgr *transactions.Manager
queue datastore.ReplicationEventQueue
+ gs datastore.GenerationStore
registry *protoregistry.Registry
secure, insecure []*grpc.Server
}
@@ -112,6 +115,7 @@ func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server {
s.nodeMgr,
s.txMgr,
s.queue,
+ s.gs,
grpcOpts...,
)
}
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 29f1cf753..4e02120e6 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -82,7 +82,8 @@ func TestServerFactory(t *testing.T) {
require.NoError(t, err)
txMgr := transactions.NewManager()
registry := protoregistry.GitalyProtoPreregistered
- coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry)
+ gs := datastore.NewLocalGenerationStore(conf.StorageNames())
+ coordinator := NewCoordinator(queue, gs, nodeMgr, txMgr, conf, registry)
checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient {
t.Helper()
@@ -104,7 +105,7 @@ func TestServerFactory(t *testing.T) {
}
t.Run("insecure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, ":0")
@@ -133,7 +134,7 @@ func TestServerFactory(t *testing.T) {
})
t.Run("secure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, ":0")
@@ -172,7 +173,7 @@ func TestServerFactory(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry)
defer praefectServerFactory.Stop()
// start with tcp address
@@ -240,7 +241,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls key path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.KeyPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
@@ -249,7 +250,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls cert path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.CertPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, gs, registry)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index f1f4b717b..93b1f4861 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -781,7 +781,14 @@ func TestProxyWrites(t *testing.T) {
require.NoError(t, err)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(
+ queue,
+ datastore.NewLocalGenerationStore(conf.StorageNames()),
+ nodeMgr,
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
server := grpc.NewServer(
grpc.CustomCodec(proxy.NewCodec()),
diff --git a/internal/praefect/service/info/dataloss.go b/internal/praefect/service/info/dataloss.go
index 9c6a898ba..988e1d1cb 100644
--- a/internal/praefect/service/info/dataloss.go
+++ b/internal/praefect/service/info/dataloss.go
@@ -8,37 +8,38 @@ import (
)
func (s *Server) DatalossCheck(ctx context.Context, req *gitalypb.DatalossCheckRequest) (*gitalypb.DatalossCheckResponse, error) {
- shard, err := s.nodeMgr.GetShard(req.VirtualStorage)
+ shard, err := s.nodeMgr.GetShard(req.GetVirtualStorage())
if err != nil {
return nil, err
}
- if shard.PreviousWritablePrimary == nil {
- return &gitalypb.DatalossCheckResponse{
- CurrentPrimary: shard.Primary.GetStorage(),
- IsReadOnly: shard.IsReadOnly,
- }, nil
- }
-
- repos, err := s.queue.GetOutdatedRepositories(ctx, req.GetVirtualStorage(), shard.PreviousWritablePrimary.GetStorage())
+ outdatedRepos, err := s.gs.GetOutdatedRepositories(ctx, req.GetVirtualStorage())
if err != nil {
return nil, err
}
- outdatedNodes := make([]*gitalypb.DatalossCheckResponse_Nodes, 0, len(repos))
- for repo, nodes := range repos {
- outdatedNodes = append(outdatedNodes, &gitalypb.DatalossCheckResponse_Nodes{
- RelativePath: repo,
- Nodes: nodes,
+ pbRepos := make([]*gitalypb.DatalossCheckResponse_Repository, 0, len(outdatedRepos))
+ for relativePath, storages := range outdatedRepos {
+ pbStorages := make([]*gitalypb.DatalossCheckResponse_Repository_Storage, 0, len(storages))
+ for name, behindBy := range storages {
+ pbStorages = append(pbStorages, &gitalypb.DatalossCheckResponse_Repository_Storage{
+ Name: name,
+ BehindBy: int64(behindBy),
+ })
+ }
+
+ sort.Slice(pbStorages, func(i, j int) bool { return pbStorages[i].Name < pbStorages[j].Name })
+
+ pbRepos = append(pbRepos, &gitalypb.DatalossCheckResponse_Repository{
+ RelativePath: relativePath,
+ Storages: pbStorages,
})
}
- sort.Slice(outdatedNodes, func(i, j int) bool { return outdatedNodes[i].RelativePath < outdatedNodes[j].RelativePath })
+ sort.Slice(pbRepos, func(i, j int) bool { return pbRepos[i].RelativePath < pbRepos[j].RelativePath })
return &gitalypb.DatalossCheckResponse{
- PreviousWritablePrimary: shard.PreviousWritablePrimary.GetStorage(),
- CurrentPrimary: shard.Primary.GetStorage(),
- IsReadOnly: shard.IsReadOnly,
- OutdatedNodes: outdatedNodes,
+ Primary: shard.Primary.GetStorage(),
+ Repositories: pbRepos,
}, nil
}
diff --git a/internal/praefect/service/info/dataloss_test.go b/internal/praefect/service/info/dataloss_test.go
deleted file mode 100644
index 34a51dc45..000000000
--- a/internal/praefect/service/info/dataloss_test.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package info
-
-import (
- "context"
- "testing"
-
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
- "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
-)
-
-func TestDatalossCheck(t *testing.T) {
- for _, tc := range []struct {
- desc string
- shard nodes.Shard
- outdated map[string][]string
- response *gitalypb.DatalossCheckResponse
- error error
- }{
- {
- desc: "no previous writable primary",
- shard: nodes.Shard{
- Primary: &nodes.MockNode{StorageName: "primary-storage"},
- },
- response: &gitalypb.DatalossCheckResponse{
- CurrentPrimary: "primary-storage",
- },
- },
- {
- desc: "multiple out of date",
- shard: nodes.Shard{
- PreviousWritablePrimary: &nodes.MockNode{StorageName: "previous-primary"},
- Primary: &nodes.MockNode{StorageName: "primary-storage"},
- IsReadOnly: true,
- },
- outdated: map[string][]string{
- "repo-2": {"node-3"},
- "repo-1": {"node-1", "node-2"},
- },
- response: &gitalypb.DatalossCheckResponse{
- PreviousWritablePrimary: "previous-primary",
- CurrentPrimary: "primary-storage",
- IsReadOnly: true,
- OutdatedNodes: []*gitalypb.DatalossCheckResponse_Nodes{
- {RelativePath: "repo-1", Nodes: []string{"node-1", "node-2"}},
- {RelativePath: "repo-2", Nodes: []string{"node-3"}},
- },
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- const virtualStorage = "test-virtual-storage"
- mgr := &nodes.MockManager{
- GetShardFunc: func(vs string) (nodes.Shard, error) {
- require.Equal(t, virtualStorage, vs)
- return tc.shard, nil
- },
- }
-
- rq := &datastore.MockReplicationEventQueue{
- GetOutdatedRepositoriesFunc: func(ctx context.Context, virtualStorage string, referenceStorage string) (map[string][]string, error) {
- return tc.outdated, nil
- },
- }
-
- srv := NewServer(mgr, config.Config{}, rq)
- resp, err := srv.DatalossCheck(context.Background(), &gitalypb.DatalossCheckRequest{
- VirtualStorage: virtualStorage,
- })
- require.Equal(t, tc.error, err)
- require.Equal(t, tc.response, resp)
- })
- }
-}
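
The removed test exercised the previous-writable-primary response and is not replaced in this commit. A hypothetical test for the generation-based response, assuming the nodes.MockManager/nodes.MockNode helpers used by the removed test and the new NewServer signature from this diff; the stub type and test name are made up for this sketch, and the replication queue is passed as nil because the rewritten DatalossCheck no longer consults it:

package info

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)

// stubGenerationStore satisfies the GenerationStore interface declared in server.go.
type stubGenerationStore struct {
	outdated map[string]map[string]int
}

func (s stubGenerationStore) GetOutdatedRepositories(context.Context, string) (map[string]map[string]int, error) {
	return s.outdated, nil
}

func TestDatalossCheckGenerations(t *testing.T) {
	const virtualStorage = "test-virtual-storage"

	mgr := &nodes.MockManager{
		GetShardFunc: func(vs string) (nodes.Shard, error) {
			require.Equal(t, virtualStorage, vs)
			return nodes.Shard{Primary: &nodes.MockNode{StorageName: "primary-storage"}}, nil
		},
	}

	gs := stubGenerationStore{outdated: map[string]map[string]int{
		"repo-1": {"node-1": 2},
	}}

	srv := NewServer(mgr, config.Config{}, nil, gs)
	resp, err := srv.DatalossCheck(context.Background(), &gitalypb.DatalossCheckRequest{
		VirtualStorage: virtualStorage,
	})
	require.NoError(t, err)
	require.Equal(t, &gitalypb.DatalossCheckResponse{
		Primary: "primary-storage",
		Repositories: []*gitalypb.DatalossCheckResponse_Repository{
			{
				RelativePath: "repo-1",
				Storages: []*gitalypb.DatalossCheckResponse_Repository_Storage{
					{Name: "node-1", BehindBy: 2},
				},
			},
		},
	}, resp)
}
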
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index f46c2fd03..55c7b7bc4 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -11,23 +11,27 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
+type GenerationStore interface {
+ GetOutdatedRepositories(ctx context.Context, virtualStorage string) (map[string]map[string]int, error)
+}
+
 // Server is an InfoService server
type Server struct {
gitalypb.UnimplementedPraefectInfoServiceServer
nodeMgr nodes.Manager
conf config.Config
+ gs GenerationStore
queue datastore.ReplicationEventQueue
}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue) gitalypb.PraefectInfoServiceServer {
- s := &Server{
+func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue, gs GenerationStore) gitalypb.PraefectInfoServiceServer {
+ return &Server{
nodeMgr: nodeMgr,
conf: conf,
+ gs: gs,
queue: queue,
}
-
- return s
}
func (s *Server) EnableWrites(ctx context.Context, req *gitalypb.EnableWritesRequest) (*gitalypb.EnableWritesResponse, error) {
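
Callers of this package now have to supply a GenerationStore as the fourth constructor argument. A hedged call-site sketch (the variable names are placeholders, not identifiers from this commit):

	srv := info.NewServer(nodeManager, conf, replicationQueue, generationStore)

Any value implementing GetOutdatedRepositories, such as the stub sketched after the removed test above, satisfies the interface; the queue field remains on the struct for the RPCs that still use it.
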
diff --git a/internal/service/remote/fetch_internal_remote.go b/internal/service/remote/fetch_internal_remote.go
index e57ff6d66..229680add 100644
--- a/internal/service/remote/fetch_internal_remote.go
+++ b/internal/service/remote/fetch_internal_remote.go
@@ -1,7 +1,6 @@
package remote
import (
- "bytes"
"context"
"fmt"
@@ -9,7 +8,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/gitalyssh"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/service/ref"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -53,7 +51,7 @@ func (s *server) FetchInternalRemote(ctx context.Context, req *gitalypb.FetchInt
return &gitalypb.FetchInternalRemoteResponse{Result: false}, nil
}
- remoteDefaultBranch, err := ref.DefaultBranchName(ctx, req.RemoteRepository)
+ /*remoteDefaultBranch, err := ref.DefaultBranchName(ctx, req.RemoteRepository)
if err != nil {
return nil, status.Errorf(codes.Internal, "FetchInternalRemote: remote default branch: %v", err)
}
@@ -67,7 +65,7 @@ func (s *server) FetchInternalRemote(ctx context.Context, req *gitalypb.FetchInt
if err := ref.SetDefaultBranchRef(ctx, req.Repository, string(remoteDefaultBranch)); err != nil {
return nil, status.Errorf(codes.Internal, "FetchInternalRemote: set default branch: %v", err)
}
- }
+ }*/
return &gitalypb.FetchInternalRemoteResponse{Result: true}, nil
}
diff --git a/proto/go/gitalypb/praefect.pb.go b/proto/go/gitalypb/praefect.pb.go
index 3a16715d1..45d4cd8a4 100644
--- a/proto/go/gitalypb/praefect.pb.go
+++ b/proto/go/gitalypb/praefect.pb.go
@@ -136,15 +136,13 @@ func (m *DatalossCheckRequest) GetVirtualStorage() string {
}
type DatalossCheckResponse struct {
- VirtualStorage string `protobuf:"bytes,1,opt,name=virtual_storage,json=virtualStorage,proto3" json:"virtual_storage,omitempty"`
- PreviousWritablePrimary string `protobuf:"bytes,2,opt,name=previous_writable_primary,json=previousWritablePrimary,proto3" json:"previous_writable_primary,omitempty"`
- CurrentPrimary string `protobuf:"bytes,3,opt,name=current_primary,json=currentPrimary,proto3" json:"current_primary,omitempty"`
- // whether the virtual storage is currently in read-only mode
- IsReadOnly bool `protobuf:"varint,4,opt,name=is_read_only,json=isReadOnly,proto3" json:"is_read_only,omitempty"`
- OutdatedNodes []*DatalossCheckResponse_Nodes `protobuf:"bytes,5,rep,name=outdated_nodes,json=outdatedNodes,proto3" json:"outdated_nodes,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
+ // current primary storage
+ Primary string `protobuf:"bytes,1,opt,name=primary,proto3" json:"primary,omitempty"`
+ // repositories with data loss
+ Repositories []*DatalossCheckResponse_Repository `protobuf:"bytes,2,rep,name=repositories,proto3" json:"repositories,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
}
func (m *DatalossCheckResponse) Reset() { *m = DatalossCheckResponse{} }
@@ -172,88 +170,118 @@ func (m *DatalossCheckResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_DatalossCheckResponse proto.InternalMessageInfo
-func (m *DatalossCheckResponse) GetVirtualStorage() string {
+func (m *DatalossCheckResponse) GetPrimary() string {
if m != nil {
- return m.VirtualStorage
+ return m.Primary
}
return ""
}
-func (m *DatalossCheckResponse) GetPreviousWritablePrimary() string {
+func (m *DatalossCheckResponse) GetRepositories() []*DatalossCheckResponse_Repository {
if m != nil {
- return m.PreviousWritablePrimary
+ return m.Repositories
}
- return ""
+ return nil
}
-func (m *DatalossCheckResponse) GetCurrentPrimary() string {
- if m != nil {
- return m.CurrentPrimary
- }
- return ""
+type DatalossCheckResponse_Repository struct {
+ // relative path of the repository with outdated replicas
+ RelativePath string `protobuf:"bytes,1,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"`
+ // storages on which the repository is outdated
+ Storages []*DatalossCheckResponse_Repository_Storage `protobuf:"bytes,2,rep,name=storages,proto3" json:"storages,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *DatalossCheckResponse_Repository) Reset() { *m = DatalossCheckResponse_Repository{} }
+func (m *DatalossCheckResponse_Repository) String() string { return proto.CompactTextString(m) }
+func (*DatalossCheckResponse_Repository) ProtoMessage() {}
+func (*DatalossCheckResponse_Repository) Descriptor() ([]byte, []int) {
+ return fileDescriptor_d32bf44842ead735, []int{3, 0}
}
-func (m *DatalossCheckResponse) GetIsReadOnly() bool {
+func (m *DatalossCheckResponse_Repository) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_DatalossCheckResponse_Repository.Unmarshal(m, b)
+}
+func (m *DatalossCheckResponse_Repository) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_DatalossCheckResponse_Repository.Marshal(b, m, deterministic)
+}
+func (m *DatalossCheckResponse_Repository) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_DatalossCheckResponse_Repository.Merge(m, src)
+}
+func (m *DatalossCheckResponse_Repository) XXX_Size() int {
+ return xxx_messageInfo_DatalossCheckResponse_Repository.Size(m)
+}
+func (m *DatalossCheckResponse_Repository) XXX_DiscardUnknown() {
+ xxx_messageInfo_DatalossCheckResponse_Repository.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_DatalossCheckResponse_Repository proto.InternalMessageInfo
+
+func (m *DatalossCheckResponse_Repository) GetRelativePath() string {
if m != nil {
- return m.IsReadOnly
+ return m.RelativePath
}
- return false
+ return ""
}
-func (m *DatalossCheckResponse) GetOutdatedNodes() []*DatalossCheckResponse_Nodes {
+func (m *DatalossCheckResponse_Repository) GetStorages() []*DatalossCheckResponse_Repository_Storage {
if m != nil {
- return m.OutdatedNodes
+ return m.Storages
}
return nil
}
-type DatalossCheckResponse_Nodes struct {
- // relative path of the repository with outdated nodes
- RelativePath string `protobuf:"bytes,1,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"`
- // nodes whose copy of the repository is not up to date
- Nodes []string `protobuf:"bytes,2,rep,name=nodes,proto3" json:"nodes,omitempty"`
+type DatalossCheckResponse_Repository_Storage struct {
+ // name of the storage
+ Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ // behind_by indicates how many generations this storage is behind.
+ BehindBy int64 `protobuf:"varint,2,opt,name=behind_by,json=behindBy,proto3" json:"behind_by,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
-func (m *DatalossCheckResponse_Nodes) Reset() { *m = DatalossCheckResponse_Nodes{} }
-func (m *DatalossCheckResponse_Nodes) String() string { return proto.CompactTextString(m) }
-func (*DatalossCheckResponse_Nodes) ProtoMessage() {}
-func (*DatalossCheckResponse_Nodes) Descriptor() ([]byte, []int) {
- return fileDescriptor_d32bf44842ead735, []int{3, 0}
+func (m *DatalossCheckResponse_Repository_Storage) Reset() {
+ *m = DatalossCheckResponse_Repository_Storage{}
+}
+func (m *DatalossCheckResponse_Repository_Storage) String() string { return proto.CompactTextString(m) }
+func (*DatalossCheckResponse_Repository_Storage) ProtoMessage() {}
+func (*DatalossCheckResponse_Repository_Storage) Descriptor() ([]byte, []int) {
+ return fileDescriptor_d32bf44842ead735, []int{3, 0, 0}
}
-func (m *DatalossCheckResponse_Nodes) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_DatalossCheckResponse_Nodes.Unmarshal(m, b)
+func (m *DatalossCheckResponse_Repository_Storage) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_DatalossCheckResponse_Repository_Storage.Unmarshal(m, b)
}
-func (m *DatalossCheckResponse_Nodes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_DatalossCheckResponse_Nodes.Marshal(b, m, deterministic)
+func (m *DatalossCheckResponse_Repository_Storage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_DatalossCheckResponse_Repository_Storage.Marshal(b, m, deterministic)
}
-func (m *DatalossCheckResponse_Nodes) XXX_Merge(src proto.Message) {
- xxx_messageInfo_DatalossCheckResponse_Nodes.Merge(m, src)
+func (m *DatalossCheckResponse_Repository_Storage) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_DatalossCheckResponse_Repository_Storage.Merge(m, src)
}
-func (m *DatalossCheckResponse_Nodes) XXX_Size() int {
- return xxx_messageInfo_DatalossCheckResponse_Nodes.Size(m)
+func (m *DatalossCheckResponse_Repository_Storage) XXX_Size() int {
+ return xxx_messageInfo_DatalossCheckResponse_Repository_Storage.Size(m)
}
-func (m *DatalossCheckResponse_Nodes) XXX_DiscardUnknown() {
- xxx_messageInfo_DatalossCheckResponse_Nodes.DiscardUnknown(m)
+func (m *DatalossCheckResponse_Repository_Storage) XXX_DiscardUnknown() {
+ xxx_messageInfo_DatalossCheckResponse_Repository_Storage.DiscardUnknown(m)
}
-var xxx_messageInfo_DatalossCheckResponse_Nodes proto.InternalMessageInfo
+var xxx_messageInfo_DatalossCheckResponse_Repository_Storage proto.InternalMessageInfo
-func (m *DatalossCheckResponse_Nodes) GetRelativePath() string {
+func (m *DatalossCheckResponse_Repository_Storage) GetName() string {
if m != nil {
- return m.RelativePath
+ return m.Name
}
return ""
}
-func (m *DatalossCheckResponse_Nodes) GetNodes() []string {
+func (m *DatalossCheckResponse_Repository_Storage) GetBehindBy() int64 {
if m != nil {
- return m.Nodes
+ return m.BehindBy
}
- return nil
+ return 0
}
type RepositoryReplicasRequest struct {
@@ -542,7 +570,8 @@ func init() {
proto.RegisterType((*EnableWritesResponse)(nil), "gitaly.EnableWritesResponse")
proto.RegisterType((*DatalossCheckRequest)(nil), "gitaly.DatalossCheckRequest")
proto.RegisterType((*DatalossCheckResponse)(nil), "gitaly.DatalossCheckResponse")
- proto.RegisterType((*DatalossCheckResponse_Nodes)(nil), "gitaly.DatalossCheckResponse.Nodes")
+ proto.RegisterType((*DatalossCheckResponse_Repository)(nil), "gitaly.DatalossCheckResponse.Repository")
+ proto.RegisterType((*DatalossCheckResponse_Repository_Storage)(nil), "gitaly.DatalossCheckResponse.Repository.Storage")
proto.RegisterType((*RepositoryReplicasRequest)(nil), "gitaly.RepositoryReplicasRequest")
proto.RegisterType((*RepositoryReplicasResponse)(nil), "gitaly.RepositoryReplicasResponse")
proto.RegisterType((*RepositoryReplicasResponse_RepositoryDetails)(nil), "gitaly.RepositoryReplicasResponse.RepositoryDetails")
@@ -553,54 +582,52 @@ func init() {
func init() { proto.RegisterFile("praefect.proto", fileDescriptor_d32bf44842ead735) }
var fileDescriptor_d32bf44842ead735 = []byte{
- // 741 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xeb, 0x44,
- 0x14, 0x96, 0xf3, 0x73, 0x49, 0x4f, 0x93, 0xdc, 0xdc, 0xb9, 0xa5, 0x4d, 0x4d, 0xa1, 0xc1, 0x15,
- 0x6a, 0x24, 0xda, 0xa4, 0x0a, 0x48, 0x48, 0x2c, 0xfb, 0xb3, 0x68, 0x17, 0x6d, 0xe4, 0x4a, 0x54,
- 0x62, 0x63, 0x4d, 0xec, 0x93, 0x64, 0xc0, 0xf1, 0x98, 0x99, 0x49, 0x50, 0xde, 0x80, 0x05, 0x7b,
- 0x1e, 0x80, 0x77, 0xe1, 0x0d, 0x10, 0x4f, 0xc1, 0x0b, 0xb0, 0x42, 0xf6, 0x8c, 0x4d, 0x92, 0x3a,
- 0x45, 0xea, 0xdd, 0xf9, 0x9c, 0xef, 0x3b, 0xdf, 0x99, 0xf3, 0xe3, 0x19, 0x68, 0xc6, 0x82, 0xe2,
- 0x18, 0x7d, 0xd5, 0x8b, 0x05, 0x57, 0x9c, 0xbc, 0x99, 0x30, 0x45, 0xc3, 0xa5, 0x0d, 0x21, 0x8b,
- 0x8c, 0xcf, 0xae, 0xcb, 0x29, 0x15, 0x18, 0x18, 0xeb, 0x78, 0xc2, 0xf9, 0x24, 0xc4, 0x7e, 0x6a,
- 0x8d, 0xe6, 0xe3, 0xbe, 0x62, 0x33, 0x94, 0x8a, 0xce, 0x62, 0x4d, 0x70, 0xae, 0xe1, 0xfd, 0x4d,
- 0x44, 0x47, 0x21, 0x3e, 0x09, 0xa6, 0x50, 0xba, 0xf8, 0xd3, 0x1c, 0xa5, 0x22, 0xe7, 0xf0, 0x76,
- 0xc1, 0x84, 0x9a, 0xd3, 0xd0, 0x93, 0x8a, 0x0b, 0x3a, 0xc1, 0xb6, 0xd5, 0xb1, 0xba, 0x3b, 0x97,
- 0x95, 0x5f, 0xfe, 0x38, 0xb3, 0xdc, 0xa6, 0x01, 0x1f, 0x35, 0xe6, 0xec, 0xc3, 0xde, 0xba, 0x8a,
- 0x8c, 0x79, 0x24, 0xd1, 0xb9, 0x81, 0xbd, 0x6b, 0xaa, 0x68, 0xc8, 0xa5, 0xbc, 0x9a, 0xa2, 0xff,
- 0xe3, 0x2b, 0xe5, 0xff, 0x2a, 0xc1, 0xc7, 0x1b, 0x3a, 0x3a, 0x01, 0x39, 0xdd, 0x22, 0xb4, 0x29,
- 0x41, 0xbe, 0x85, 0xc3, 0x58, 0xe0, 0x82, 0xf1, 0xb9, 0xf4, 0x7e, 0x16, 0x4c, 0x25, 0x87, 0xf5,
- 0x62, 0xc1, 0x66, 0x54, 0x2c, 0xdb, 0xa5, 0x34, 0xe4, 0x20, 0x23, 0x3c, 0x19, 0x7c, 0xa8, 0xe1,
- 0x24, 0x89, 0x3f, 0x17, 0x02, 0x23, 0x95, 0x47, 0x94, 0x75, 0x12, 0xe3, 0xce, 0x88, 0x1d, 0xa8,
- 0x33, 0xe9, 0x09, 0xa4, 0x81, 0xc7, 0xa3, 0x70, 0xd9, 0xae, 0x74, 0xac, 0x6e, 0xcd, 0x05, 0x26,
- 0x5d, 0xa4, 0xc1, 0x43, 0x14, 0x2e, 0xc9, 0x1d, 0x34, 0xf9, 0x5c, 0x05, 0x54, 0x61, 0xe0, 0x45,
- 0x3c, 0x40, 0xd9, 0xae, 0x76, 0xca, 0xdd, 0xdd, 0xc1, 0x49, 0x4f, 0x8f, 0xb2, 0x57, 0x58, 0x66,
- 0xef, 0x3e, 0xa1, 0xba, 0x8d, 0x2c, 0x34, 0x35, 0xed, 0x4b, 0xa8, 0xa6, 0x1f, 0xe4, 0x04, 0x1a,
- 0x02, 0x43, 0xaa, 0xd8, 0x02, 0xbd, 0x98, 0xaa, 0xa9, 0x69, 0x41, 0x3d, 0x73, 0x0e, 0xa9, 0x9a,
- 0x92, 0x3d, 0xa8, 0xea, 0x84, 0xa5, 0x4e, 0xb9, 0xbb, 0xe3, 0x6a, 0xc3, 0x79, 0x80, 0x43, 0x17,
- 0x63, 0x2e, 0x99, 0xe2, 0x62, 0xe9, 0x62, 0x1c, 0x32, 0x9f, 0xe6, 0x4b, 0x30, 0x00, 0x10, 0x39,
- 0x98, 0x8a, 0xee, 0x0e, 0x48, 0x76, 0xd0, 0x95, 0xb0, 0x15, 0x96, 0xf3, 0x7b, 0x09, 0xec, 0x22,
- 0x45, 0x33, 0xaf, 0x7b, 0xf8, 0x28, 0x6b, 0xa1, 0xd6, 0xfb, 0xba, 0x40, 0x6f, 0x23, 0x68, 0x05,
- 0xba, 0x46, 0x45, 0x59, 0x28, 0xdd, 0x4c, 0x84, 0x0c, 0xa1, 0x26, 0x0c, 0x3d, 0x2d, 0xec, 0xb5,
- 0x82, 0xb9, 0x8a, 0xed, 0xc3, 0xbb, 0x67, 0xf0, 0x6b, 0x3a, 0x41, 0x6c, 0xa8, 0xf9, 0xc9, 0x10,
- 0xe5, 0x7c, 0x66, 0x16, 0x2c, 0xb7, 0x9d, 0x3f, 0x2d, 0x38, 0xb8, 0xe2, 0x91, 0x64, 0x52, 0x61,
- 0xe4, 0x2f, 0x3f, 0xe0, 0xdf, 0x20, 0x5f, 0x40, 0x53, 0x51, 0x31, 0x41, 0x95, 0xb3, 0x75, 0xb2,
- 0x86, 0xf6, 0x66, 0xb4, 0x2f, 0xe1, 0x9d, 0xc0, 0x31, 0x0a, 0x8c, 0x7c, 0xcc, 0x99, 0x7a, 0x8b,
- 0x5b, 0x39, 0x90, 0x91, 0xbf, 0x81, 0x83, 0x80, 0xc9, 0xf4, 0x17, 0x11, 0xe8, 0xf3, 0xc8, 0x67,
- 0x61, 0xc8, 0xa8, 0x62, 0x3c, 0x32, 0x2b, 0xbd, 0x6f, 0x60, 0x77, 0x1d, 0x75, 0xfe, 0xb6, 0xa0,
- 0xfd, 0xbc, 0x2e, 0x33, 0xfb, 0x33, 0x20, 0x49, 0x7b, 0xbc, 0xa2, 0x5d, 0x6d, 0x25, 0x88, 0xbb,
- 0xba, 0xaf, 0xa7, 0xf0, 0xd6, 0xd4, 0xb5, 0xd1, 0x45, 0x53, 0xee, 0x95, 0xf1, 0x92, 0xf3, 0x44,
- 0x36, 0xab, 0x2c, 0xe7, 0xea, 0xd2, 0xfe, 0xab, 0x39, 0xa7, 0x7f, 0x06, 0xbb, 0xc9, 0xac, 0xbd,
- 0x1f, 0xf8, 0xc8, 0x63, 0x41, 0x5a, 0x4f, 0xc5, 0xdd, 0x49, 0x5c, 0x77, 0x7c, 0x74, 0x1b, 0x14,
- 0x37, 0xaa, 0x5a, 0xdc, 0xa8, 0xc1, 0xaf, 0x65, 0x78, 0x3f, 0x34, 0x77, 0xf2, 0x6d, 0x34, 0xe6,
- 0x8f, 0x28, 0x16, 0xcc, 0x47, 0x82, 0x40, 0x9e, 0xaf, 0x1f, 0xf9, 0xfc, 0xa5, 0xd5, 0x4c, 0x87,
- 0x6f, 0x3b, 0xff, 0xbf, 0xbd, 0x4e, 0xed, 0x9f, 0xdf, 0xba, 0x95, 0x5a, 0xa9, 0x65, 0x11, 0x0a,
- 0xad, 0xcd, 0x6e, 0x93, 0xe3, 0x4c, 0x61, 0xcb, 0x7e, 0xd9, 0x9d, 0xed, 0x84, 0x8d, 0x04, 0xa5,
- 0x0b, 0x8b, 0x7c, 0x07, 0x8d, 0xb5, 0x2b, 0x89, 0x1c, 0x6d, 0xb9, 0xa9, 0xb4, 0xf8, 0xa7, 0x2f,
- 0xde, 0x63, 0x2b, 0x47, 0x7f, 0x84, 0xfa, 0xea, 0x8b, 0x41, 0x3e, 0xc9, 0x02, 0x0b, 0x5e, 0x23,
- 0xfb, 0xa8, 0x18, 0x5c, 0x13, 0xb5, 0x5a, 0xa5, 0xcb, 0x8b, 0xef, 0x13, 0x62, 0x48, 0x47, 0x3d,
- 0x9f, 0xcf, 0xfa, 0xfa, 0xf3, 0x9c, 0x8b, 0x49, 0x5f, 0x87, 0xeb, 0x57, 0xb0, 0x3f, 0xe1, 0xc6,
- 0x8e, 0x47, 0xa3, 0x37, 0xa9, 0xeb, 0xab, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x90, 0xeb, 0x26,
- 0x4c, 0x5a, 0x07, 0x00, 0x00,
+ // 711 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xd3, 0x4c,
+ 0x14, 0x95, 0x93, 0x7c, 0x6d, 0x72, 0x93, 0xb6, 0xe9, 0xb4, 0x5f, 0x1b, 0xdc, 0x42, 0x83, 0x11,
+ 0x22, 0x12, 0x6d, 0x52, 0x05, 0x24, 0x24, 0x96, 0xfd, 0x59, 0x14, 0x55, 0x50, 0xb9, 0x12, 0x48,
+ 0x6c, 0xa2, 0xb1, 0x73, 0x93, 0x0c, 0x38, 0x1e, 0x33, 0x33, 0xa9, 0x94, 0x37, 0x60, 0xc1, 0x9e,
+ 0x07, 0xe0, 0x31, 0x10, 0x5b, 0xde, 0x80, 0xc7, 0xe0, 0x05, 0x58, 0x21, 0x67, 0xc6, 0x6e, 0x92,
+ 0xba, 0x05, 0x95, 0x9d, 0xe7, 0xde, 0x73, 0xcf, 0xf5, 0x39, 0x77, 0x7e, 0x60, 0x39, 0x12, 0x14,
+ 0x7b, 0xe8, 0xab, 0x66, 0x24, 0xb8, 0xe2, 0x64, 0xa1, 0xcf, 0x14, 0x0d, 0xc6, 0x36, 0x04, 0x2c,
+ 0x34, 0x31, 0xbb, 0x22, 0x07, 0x54, 0x60, 0xd7, 0xac, 0x76, 0xfa, 0x9c, 0xf7, 0x03, 0x6c, 0x4d,
+ 0x56, 0xde, 0xa8, 0xd7, 0x52, 0x6c, 0x88, 0x52, 0xd1, 0x61, 0xa4, 0x01, 0xce, 0x11, 0xac, 0x1d,
+ 0x87, 0xd4, 0x0b, 0xf0, 0x8d, 0x60, 0x0a, 0xa5, 0x8b, 0x1f, 0x46, 0x28, 0x15, 0xd9, 0x83, 0x95,
+ 0x0b, 0x26, 0xd4, 0x88, 0x06, 0x1d, 0xa9, 0xb8, 0xa0, 0x7d, 0xac, 0x59, 0x75, 0xab, 0x51, 0x3a,
+ 0x28, 0x7c, 0xfc, 0xbe, 0x6b, 0xb9, 0xcb, 0x26, 0x79, 0xae, 0x73, 0xce, 0x06, 0xac, 0xcf, 0xb2,
+ 0xc8, 0x88, 0x87, 0x12, 0x9d, 0x63, 0x58, 0x3f, 0xa2, 0x8a, 0x06, 0x5c, 0xca, 0xc3, 0x01, 0xfa,
+ 0xef, 0x6f, 0x49, 0xff, 0x2d, 0x07, 0xff, 0xcf, 0xf1, 0xe8, 0x06, 0xa4, 0x06, 0x8b, 0x91, 0x60,
+ 0x43, 0x2a, 0xc6, 0x9a, 0xc0, 0x4d, 0x96, 0xe4, 0x14, 0x2a, 0x02, 0x23, 0x2e, 0x99, 0xe2, 0x82,
+ 0xa1, 0xac, 0xe5, 0xea, 0xf9, 0x46, 0xb9, 0xdd, 0x68, 0x6a, 0xcb, 0x9a, 0x99, 0x74, 0x4d, 0x37,
+ 0xa9, 0x18, 0xbb, 0x33, 0xd5, 0xf6, 0x57, 0x0b, 0xe0, 0x32, 0x49, 0x1e, 0xc0, 0x92, 0xc0, 0x80,
+ 0x2a, 0x76, 0x81, 0x9d, 0x88, 0xaa, 0x81, 0x69, 0x5e, 0x49, 0x82, 0x67, 0x54, 0x0d, 0xc8, 0x29,
+ 0x14, 0x8d, 0xb8, 0xa4, 0xfb, 0xfe, 0xdf, 0x76, 0x6f, 0x1a, 0xe5, 0x6e, 0xca, 0x60, 0x3f, 0x87,
+ 0x45, 0x13, 0x24, 0x04, 0x0a, 0x21, 0x1d, 0x1a, 0xcb, 0xdc, 0xc9, 0x37, 0xd9, 0x82, 0x92, 0x87,
+ 0x03, 0x16, 0x76, 0x3b, 0xde, 0xb8, 0x96, 0xab, 0x5b, 0x8d, 0xbc, 0x5b, 0xd4, 0x81, 0x83, 0xb1,
+ 0xf3, 0x0a, 0xee, 0x4c, 0x29, 0xc3, 0x28, 0x60, 0x3e, 0x4d, 0x47, 0xdd, 0x06, 0x48, 0xa5, 0x6a,
+ 0x17, 0xcb, 0x6d, 0x92, 0xfc, 0xe8, 0x54, 0xd9, 0x14, 0xca, 0xf9, 0x92, 0x03, 0x3b, 0x8b, 0xd1,
+ 0x4c, 0xe5, 0xe5, 0xec, 0x54, 0xca, 0xed, 0xa7, 0x19, 0x7c, 0x73, 0x45, 0x53, 0xa9, 0x23, 0x54,
+ 0x94, 0x05, 0xf2, 0x72, 0x96, 0x67, 0x50, 0x14, 0x06, 0x6e, 0x9c, 0xbc, 0x1d, 0x61, 0xca, 0x62,
+ 0xfb, 0xb0, 0x7a, 0x25, 0x7d, 0x1b, 0x27, 0x88, 0x0d, 0x45, 0x3f, 0x1e, 0xa2, 0x1c, 0x0d, 0x27,
+ 0xb6, 0x97, 0xdc, 0x74, 0xed, 0xfc, 0xb0, 0x60, 0xf3, 0x90, 0x87, 0x92, 0x49, 0x85, 0xa1, 0x3f,
+ 0xfe, 0x87, 0x13, 0x40, 0x1e, 0xc2, 0xb2, 0xa2, 0xa2, 0x8f, 0x2a, 0x45, 0xeb, 0x66, 0x4b, 0x3a,
+ 0x9a, 0xc0, 0x1e, 0xc3, 0xaa, 0xc0, 0x1e, 0x0a, 0x0c, 0x7d, 0x4c, 0x91, 0xf9, 0x09, 0xb2, 0x9a,
+ 0x26, 0x12, 0xf0, 0x33, 0xd8, 0xec, 0x32, 0x19, 0x9f, 0xda, 0x8e, 0x40, 0x9f, 0x87, 0x3e, 0x0b,
+ 0x02, 0x46, 0x15, 0xe3, 0x61, 0xad, 0x50, 0xb7, 0x1a, 0x45, 0x77, 0xc3, 0xa4, 0xdd, 0xd9, 0xac,
+ 0xf3, 0xd3, 0x82, 0xda, 0x55, 0x5d, 0x66, 0xf6, 0xbb, 0x40, 0x62, 0x7b, 0x3a, 0x59, 0xe7, 0xa3,
+ 0x1a, 0x67, 0xdc, 0xe9, 0x33, 0xf2, 0x08, 0x56, 0x8c, 0xae, 0x39, 0x17, 0x8d, 0xdc, 0x43, 0x13,
+ 0x25, 0x7b, 0x31, 0x6d, 0xa2, 0x2c, 0xc5, 0x6a, 0x69, 0x97, 0x9a, 0x53, 0xf8, 0x3d, 0x28, 0xc7,
+ 0xb3, 0xee, 0xbc, 0xe3, 0x5e, 0x87, 0x75, 0x27, 0x7a, 0x0a, 0x6e, 0x29, 0x0e, 0xbd, 0xe0, 0xde,
+ 0x49, 0x37, 0xdb, 0xa8, 0xff, 0xb2, 0x8d, 0x6a, 0x7f, 0xca, 0xc3, 0xda, 0x99, 0xb9, 0x79, 0x4f,
+ 0xc2, 0x1e, 0x3f, 0x47, 0x71, 0xc1, 0x7c, 0x24, 0x08, 0xe4, 0xea, 0xf6, 0x23, 0xf7, 0x6f, 0xda,
+ 0x9a, 0x93, 0xe1, 0xdb, 0xce, 0x9f, 0x77, 0xaf, 0x53, 0xfc, 0xf5, 0xb9, 0x51, 0x28, 0xe6, 0xaa,
+ 0x16, 0xa1, 0x50, 0x9d, 0x77, 0x9b, 0xec, 0x24, 0x0c, 0xd7, 0xec, 0x2f, 0xbb, 0x7e, 0x3d, 0x60,
+ 0xae, 0x41, 0x6e, 0xdf, 0x22, 0xaf, 0x61, 0x69, 0xe6, 0x4a, 0x22, 0xdb, 0xd7, 0xdc, 0x54, 0x9a,
+ 0xfc, 0xee, 0x8d, 0xf7, 0xd8, 0xd4, 0xaf, 0x9f, 0x43, 0x65, 0xfa, 0x5d, 0x20, 0x5b, 0x49, 0x61,
+ 0xc6, 0x9b, 0x63, 0x6f, 0x67, 0x27, 0x67, 0x48, 0xad, 0x6a, 0xee, 0x60, 0xff, 0x6d, 0x0c, 0x0c,
+ 0xa8, 0xd7, 0xf4, 0xf9, 0xb0, 0xa5, 0x3f, 0xf7, 0xb8, 0xe8, 0xb7, 0x74, 0xb9, 0x7e, 0xeb, 0x5a,
+ 0x7d, 0x6e, 0xd6, 0x91, 0xe7, 0x2d, 0x4c, 0x42, 0x4f, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, 0x11,
+ 0x72, 0xaa, 0x48, 0x40, 0x07, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
diff --git a/proto/praefect.proto b/proto/praefect.proto
index 468642c53..ec5407fe0 100644
--- a/proto/praefect.proto
+++ b/proto/praefect.proto
@@ -57,19 +57,25 @@ message DatalossCheckRequest {
}
message DatalossCheckResponse {
- message Nodes {
- // relative path of the repository with outdated nodes
+ message Repository {
+ message Storage {
+ // name of the storage
+ string name = 1;
+ // behind_by indicates how many generations this storage is behind.
+ int64 behind_by = 2;
+ }
+
+ // relative path of the repository with outdated replicas
string relative_path = 1;
- // nodes whose copy of the repository is not up to date
- repeated string nodes = 2;
+ // storages on which the repository is outdated
+ repeated Storage storages = 2;
}
- string virtual_storage = 1;
- string previous_writable_primary = 2;
- string current_primary = 3;
- // whether the virtual storage is currently in read-only mode
- bool is_read_only = 4;
- repeated Nodes outdated_nodes = 5;
+ // current primary storage
+ string primary = 1;
+
+ // repositories with data loss
+ repeated Repository repositories = 2;
}
message RepositoryReplicasRequest{
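
With the reshaped message, per-storage lag is nested under each outdated repository instead of the flat outdated_nodes list. A client-side sketch that walks the new response using the getters from the regenerated Go code above; the sample values are hypothetical:

package main

import (
	"fmt"

	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)

// printDataloss reports, per outdated repository, how many generations each
// storage's copy is behind.
func printDataloss(resp *gitalypb.DatalossCheckResponse) {
	fmt.Printf("current primary: %s\n", resp.GetPrimary())
	for _, repo := range resp.GetRepositories() {
		fmt.Printf("repository %s:\n", repo.GetRelativePath())
		for _, storage := range repo.GetStorages() {
			fmt.Printf("  %s is %d generation(s) behind\n", storage.GetName(), storage.GetBehindBy())
		}
	}
}

func main() {
	printDataloss(&gitalypb.DatalossCheckResponse{
		Primary: "gitaly-1",
		Repositories: []*gitalypb.DatalossCheckResponse_Repository{
			{
				RelativePath: "@hashed/aa/bb/repo.git",
				Storages: []*gitalypb.DatalossCheckResponse_Repository_Storage{
					{Name: "gitaly-2", BehindBy: 3},
				},
			},
		},
	})
}
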
diff --git a/ruby/proto/gitaly/praefect_pb.rb b/ruby/proto/gitaly/praefect_pb.rb
index 77412571c..559fb4677 100644
--- a/ruby/proto/gitaly/praefect_pb.rb
+++ b/ruby/proto/gitaly/praefect_pb.rb
@@ -16,15 +16,16 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
optional :virtual_storage, :string, 1
end
add_message "gitaly.DatalossCheckResponse" do
- optional :virtual_storage, :string, 1
- optional :previous_writable_primary, :string, 2
- optional :current_primary, :string, 3
- optional :is_read_only, :bool, 4
- repeated :outdated_nodes, :message, 5, "gitaly.DatalossCheckResponse.Nodes"
+ optional :primary, :string, 1
+ repeated :repositories, :message, 2, "gitaly.DatalossCheckResponse.Repository"
end
- add_message "gitaly.DatalossCheckResponse.Nodes" do
+ add_message "gitaly.DatalossCheckResponse.Repository" do
optional :relative_path, :string, 1
- repeated :nodes, :string, 2
+ repeated :storages, :message, 2, "gitaly.DatalossCheckResponse.Repository.Storage"
+ end
+ add_message "gitaly.DatalossCheckResponse.Repository.Storage" do
+ optional :name, :string, 1
+ optional :behind_by, :int64, 2
end
add_message "gitaly.RepositoryReplicasRequest" do
optional :repository, :message, 1, "gitaly.Repository"
@@ -57,7 +58,8 @@ module Gitaly
EnableWritesResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.EnableWritesResponse").msgclass
DatalossCheckRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckRequest").msgclass
DatalossCheckResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckResponse").msgclass
- DatalossCheckResponse::Nodes = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckResponse.Nodes").msgclass
+ DatalossCheckResponse::Repository = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckResponse.Repository").msgclass
+ DatalossCheckResponse::Repository::Storage = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.DatalossCheckResponse.Repository.Storage").msgclass
RepositoryReplicasRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasRequest").msgclass
RepositoryReplicasResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasResponse").msgclass
RepositoryReplicasResponse::RepositoryDetails = Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RepositoryReplicasResponse.RepositoryDetails").msgclass