gitlab.com/gitlab-org/gitaly.git
author    Sami Hiltunen <shiltunen@gitlab.com>    2020-05-19 19:43:16 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>    2020-05-20 18:28:00 +0300
commit    31dd06b15400b984dd35d623197271d5d2e61775
tree      d598b4a10f4026e961c3264dde38a7800dc1677d
parent    6d6b8db3b76f57d5a459cc64fd3ac583e6d96bf7
run replication jobs against read-only primary
Replication jobs are currently not run against the primary node. This makes data recovery efforts harder, as it is not possible to run replication jobs from the previous primary towards the newly elected one. This commit runs replication jobs against read-only primaries as well, which makes it possible to run `praefect reconcile` against a newly elected read-only primary.
-rw-r--r--  changelogs/unreleased/smh-reconcile-read-only.yml |  5
-rw-r--r--  internal/praefect/coordinator_test.go             |  3
-rw-r--r--  internal/praefect/nodes/manager.go                 | 14
-rw-r--r--  internal/praefect/replicator.go                    | 21
-rw-r--r--  internal/praefect/replicator_test.go               | 82
5 files changed, 120 insertions(+), 5 deletions(-)
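
Before the full diff, here is a minimal, self-contained Go sketch of the idea, as an illustration only: the Node and Shard types below are simplified stand-ins, not Gitaly's actual praefect types. When a shard is read-only, its primary joins the set of replication targets, and a job's source storage is resolved to a node whether it is the primary or a secondary. In the real change these roles are played by nodes.Shard, Shard.GetNode, and ReplMgr.processBacklog, shown in the diff below.

// Illustrative sketch only; these types are simplified stand-ins for the
// praefect types touched by the diff below.
package main

import "fmt"

// Node stands in for a storage node within a virtual storage's shard.
type Node struct{ Storage string }

// Shard stands in for a shard with a primary, secondaries, and the
// read-only flag Praefect sets after a failover.
type Shard struct {
    IsReadOnly  bool
    Primary     Node
    Secondaries []Node
}

// replicationTargets mirrors the processBacklog change: secondaries are
// always replication targets, and a read-only primary is now a target too.
func replicationTargets(s Shard) []Node {
    targets := s.Secondaries
    if s.IsReadOnly {
        targets = append(targets, s.Primary)
    }
    return targets
}

// getNode mirrors Shard.GetNode from the diff: resolve a replication job's
// source storage to a node, whether that is the primary or a secondary.
func getNode(s Shard, storage string) (Node, error) {
    if storage == s.Primary.Storage {
        return s.Primary, nil
    }
    for _, node := range s.Secondaries {
        if storage == node.Storage {
            return node, nil
        }
    }
    return Node{}, fmt.Errorf("node with storage %q does not exist", storage)
}

func main() {
    shard := Shard{
        IsReadOnly:  true,
        Primary:     Node{Storage: "storage-1"},
        Secondaries: []Node{{Storage: "storage-2"}},
    }

    // With a read-only shard the primary shows up as a target, so jobs
    // scheduled against it (e.g. by `praefect reconcile`) can be dequeued.
    for _, target := range replicationTargets(shard) {
        fmt.Println("dequeue replication events for", target.Storage)
    }

    // Source lookup works for the primary and secondaries alike.
    if source, err := getNode(shard, "storage-2"); err == nil {
        fmt.Println("replicate from", source.Storage)
    }
}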
diff --git a/changelogs/unreleased/smh-reconcile-read-only.yml b/changelogs/unreleased/smh-reconcile-read-only.yml
new file mode 100644
index 000000000..7f61a43c3
--- /dev/null
+++ b/changelogs/unreleased/smh-reconcile-read-only.yml
@@ -0,0 +1,5 @@
+---
+title: Run replication jobs against read-only primaries
+merge_request: 2190
+author:
+type: changed
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 399a8d9ed..845625624 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -55,11 +55,12 @@ func (m *mockNodeManager) GetSyncedNode(context.Context, string, string) (nodes.
type mockNode struct {
    nodes.Node
    storageName string
+    conn *grpc.ClientConn
}
func (m *mockNode) GetStorage() string { return m.storageName }
-func (m *mockNode) GetConnection() *grpc.ClientConn { return nil }
+func (m *mockNode) GetConnection() *grpc.ClientConn { return m.conn }
func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
for _, tc := range []struct {
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index f7e94f28f..5c5971b64 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -33,6 +33,20 @@ type Shard struct {
    Secondaries []Node
}
+func (s Shard) GetNode(storage string) (Node, error) {
+    if storage == s.Primary.GetStorage() {
+        return s.Primary, nil
+    }
+
+    for _, node := range s.Secondaries {
+        if storage == node.GetStorage() {
+            return node, nil
+        }
+    }
+
+    return nil, fmt.Errorf("node with storage %q does not exist", storage)
+}
+
// Manager is responsible for returning shards for virtual storages
type Manager interface {
GetShard(virtualStorageName string) (Shard, error)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 0eab57b29..fe6a438ff 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -422,10 +422,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
    var totalEvents int
    shard, err := r.nodeManager.GetShard(virtualStorage)
    if err == nil {
-        for _, secondary := range shard.Secondaries {
-            events, err := r.datastore.Dequeue(ctx, virtualStorage, secondary.GetStorage(), 10)
+        targetNodes := shard.Secondaries
+        if shard.IsReadOnly {
+            targetNodes = append(targetNodes, shard.Primary)
+        }
+
+        for _, target := range targetNodes {
+            events, err := r.datastore.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
            if err != nil {
-                logger.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+                logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
                continue
            }
@@ -439,7 +444,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
                    eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
                    continue
                }
-                if err := r.processReplJob(ctx, job, shard.Primary.GetConnection(), secondary.GetConnection()); err != nil {
+
+                source, err := shard.GetNode(job.SourceNode.Storage)
+                if err != nil {
+                    logger.WithField("event", event).WithError(err).Error("failed to get source node for replication job")
+                    eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
+                    continue
+                }
+
+                if err := r.processReplJob(ctx, job, source.GetConnection(), target.GetConnection()); err != nil {
                    logger.WithFields(logrus.Fields{
                        logWithReplJobID: job.ID,
                        logWithReplVirtual: job.VirtualStorage,
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 6aefbf14c..378d98628 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -676,6 +676,88 @@ func TestProcessBacklog_Success(t *testing.T) {
    require.True(t, helper.IsGitDirectory(fullNewPath2), "repository must exist at new last RenameRepository location")
}
+type mockReplicator struct {
+    Replicator
+    ReplicateFunc func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
+}
+
+func (m mockReplicator) Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error {
+    return m.ReplicateFunc(ctx, job, source, target)
+}
+
+func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
+    ctx, cancel := testhelper.Context()
+    defer cancel()
+
+    const virtualStorage = "virtal-storage"
+    const primaryStorage = "storage-1"
+    const secondaryStorage = "storage-2"
+
+    primaryConn := &grpc.ClientConn{}
+    secondaryConn := &grpc.ClientConn{}
+
+    conf := config.Config{
+        VirtualStorages: []*config.VirtualStorage{
+            {
+                Name: virtualStorage,
+                Nodes: []*models.Node{
+                    {Storage: primaryStorage},
+                    {Storage: secondaryStorage},
+                },
+            },
+        },
+    }
+
+    queue := datastore.NewMemoryReplicationEventQueue(conf)
+    _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+        Job: datastore.ReplicationJob{
+            Change: datastore.UpdateRepo,
+            RelativePath: "ignored",
+            TargetNodeStorage: primaryStorage,
+            SourceNodeStorage: secondaryStorage,
+            VirtualStorage: virtualStorage,
+        },
+    })
+    require.NoError(t, err)
+
+    replMgr := NewReplMgr(
+        testhelper.DiscardTestEntry(t),
+        datastore.Datastore{datastore.NewInMemory(conf), queue},
+        &mockNodeManager{
+            GetShardFunc: func(vs string) (nodes.Shard, error) {
+                require.Equal(t, virtualStorage, vs)
+                return nodes.Shard{
+                    IsReadOnly: true,
+                    Primary: &mockNode{
+                        storageName: primaryStorage,
+                        conn: primaryConn,
+                    },
+                    Secondaries: []nodes.Node{&mockNode{
+                        storageName: secondaryStorage,
+                        conn: secondaryConn,
+                    }},
+                }, nil
+            },
+        },
+    )
+
+    processed := make(chan struct{})
+    replMgr.replicator = mockReplicator{
+        ReplicateFunc: func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error {
+            require.True(t, primaryConn == target)
+            require.True(t, secondaryConn == source)
+            close(processed)
+            return nil
+        },
+    }
+    replMgr.ProcessBacklog(ctx, noopBackoffFunc)
+    select {
+    case <-processed:
+    case <-time.After(5 * time.Second):
+        t.Fatalf("replication job targeting read-only primary was not processed before timeout")
+    }
+}
+
func TestBackoff(t *testing.T) {
    start := 1 * time.Microsecond
    max := 6 * time.Microsecond