| author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-05-19 19:43:16 +0300 |
| --- | --- | --- |
| committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-05-20 18:28:00 +0300 |
| commit | 31dd06b15400b984dd35d623197271d5d2e61775 (patch) | |
| tree | d598b4a10f4026e961c3264dde38a7800dc1677d | |
| parent | 6d6b8db3b76f57d5a459cc64fd3ac583e6d96bf7 (diff) | |
run replication jobs against read-only primary
Replication jobs are currently not run against the primary node. This
makes data recovery efforts harder, as it is not possible to run
replication jobs from the previous primary towards the newly elected
one.

This commit runs replication jobs against read-only primaries as well,
making it possible to run `praefect reconcile` against a newly elected
read-only primary.
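
In outline, the commit widens the set of replication targets that the per-shard backlog loop dequeues jobs for. A minimal sketch of that selection logic, using simplified stand-in types rather than the real nodes.Shard and nodes.Node from internal/praefect/nodes (the field and function names below are illustrative):

```go
package main

import "fmt"

// Simplified stand-ins for praefect's nodes.Node and nodes.Shard;
// the real definitions live in internal/praefect/nodes/manager.go.
type Node struct{ Storage string }

type Shard struct {
	IsReadOnly  bool
	Primary     Node
	Secondaries []Node
}

// replicationTargets mirrors the loop change in processBacklog below:
// secondaries are always replication targets, and a read-only primary
// is appended so it too can be caught up after a failover.
func replicationTargets(s Shard) []Node {
	targets := s.Secondaries
	if s.IsReadOnly {
		targets = append(targets, s.Primary)
	}
	return targets
}

func main() {
	shard := Shard{
		IsReadOnly:  true,
		Primary:     Node{Storage: "storage-1"},
		Secondaries: []Node{{Storage: "storage-2"}},
	}
	for _, target := range replicationTargets(shard) {
		fmt.Println("dequeue replication events for", target.Storage)
	}
	// Output:
	// dequeue replication events for storage-2
	// dequeue replication events for storage-1
}
```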
-rw-r--r-- | changelogs/unreleased/smh-reconcile-read-only.yml | 5
-rw-r--r-- | internal/praefect/coordinator_test.go | 3
-rw-r--r-- | internal/praefect/nodes/manager.go | 14
-rw-r--r-- | internal/praefect/replicator.go | 21
-rw-r--r-- | internal/praefect/replicator_test.go | 82
5 files changed, 120 insertions, 5 deletions
diff --git a/changelogs/unreleased/smh-reconcile-read-only.yml b/changelogs/unreleased/smh-reconcile-read-only.yml
new file mode 100644
index 000000000..7f61a43c3
--- /dev/null
+++ b/changelogs/unreleased/smh-reconcile-read-only.yml
@@ -0,0 +1,5 @@
+---
+title: Run replication jobs against read-only primaries
+merge_request: 2190
+author:
+type: changed
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 399a8d9ed..845625624 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -55,11 +55,12 @@ func (m *mockNodeManager) GetSyncedNode(context.Context, string, string) (nodes.
 type mockNode struct {
 	nodes.Node
 	storageName string
+	conn        *grpc.ClientConn
 }
 
 func (m *mockNode) GetStorage() string { return m.storageName }
 
-func (m *mockNode) GetConnection() *grpc.ClientConn { return nil }
+func (m *mockNode) GetConnection() *grpc.ClientConn { return m.conn }
 
 func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
 	for _, tc := range []struct {
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index f7e94f28f..5c5971b64 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -33,6 +33,20 @@ type Shard struct {
 	Secondaries []Node
 }
 
+func (s Shard) GetNode(storage string) (Node, error) {
+	if storage == s.Primary.GetStorage() {
+		return s.Primary, nil
+	}
+
+	for _, node := range s.Secondaries {
+		if storage == node.GetStorage() {
+			return node, nil
+		}
+	}
+
+	return nil, fmt.Errorf("node with storage %q does not exist", storage)
+}
+
 // Manager is responsible for returning shards for virtual storages
 type Manager interface {
 	GetShard(virtualStorageName string) (Shard, error)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 0eab57b29..fe6a438ff 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -422,10 +422,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
 		var totalEvents int
 		shard, err := r.nodeManager.GetShard(virtualStorage)
 		if err == nil {
-			for _, secondary := range shard.Secondaries {
-				events, err := r.datastore.Dequeue(ctx, virtualStorage, secondary.GetStorage(), 10)
+			targetNodes := shard.Secondaries
+			if shard.IsReadOnly {
+				targetNodes = append(targetNodes, shard.Primary)
+			}
+
+			for _, target := range targetNodes {
+				events, err := r.datastore.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
 				if err != nil {
-					logger.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+					logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
 					continue
 				}
 
@@ -439,7 +444,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
 					eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
 					continue
 				}
-				if err := r.processReplJob(ctx, job, shard.Primary.GetConnection(), secondary.GetConnection()); err != nil {
+
+				source, err := shard.GetNode(job.SourceNode.Storage)
+				if err != nil {
+					logger.WithField("event", event).WithError(err).Error("failed to get source node for replication job")
+					eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
+					continue
+				}
+
+				if err := r.processReplJob(ctx, job, source.GetConnection(), target.GetConnection()); err != nil {
 					logger.WithFields(logrus.Fields{
 						logWithReplJobID:   job.ID,
 						logWithReplVirtual: job.VirtualStorage,
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 6aefbf14c..378d98628 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -676,6 +676,88 @@ func TestProcessBacklog_Success(t *testing.T) {
 	require.True(t, helper.IsGitDirectory(fullNewPath2), "repository must exist at new last RenameRepository location")
 }
 
+type mockReplicator struct {
+	Replicator
+	ReplicateFunc func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
+}
+
+func (m mockReplicator) Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error {
+	return m.ReplicateFunc(ctx, job, source, target)
+}
+
+func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
+	ctx, cancel := testhelper.Context()
+	defer cancel()
+
+	const virtualStorage = "virtal-storage"
+	const primaryStorage = "storage-1"
+	const secondaryStorage = "storage-2"
+
+	primaryConn := &grpc.ClientConn{}
+	secondaryConn := &grpc.ClientConn{}
+
+	conf := config.Config{
+		VirtualStorages: []*config.VirtualStorage{
+			{
+				Name: virtualStorage,
+				Nodes: []*models.Node{
+					{Storage: primaryStorage},
+					{Storage: secondaryStorage},
+				},
+			},
+		},
+	}
+
+	queue := datastore.NewMemoryReplicationEventQueue(conf)
+	_, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+		Job: datastore.ReplicationJob{
+			Change:            datastore.UpdateRepo,
+			RelativePath:      "ignored",
+			TargetNodeStorage: primaryStorage,
+			SourceNodeStorage: secondaryStorage,
+			VirtualStorage:    virtualStorage,
+		},
+	})
+	require.NoError(t, err)
+
+	replMgr := NewReplMgr(
+		testhelper.DiscardTestEntry(t),
+		datastore.Datastore{datastore.NewInMemory(conf), queue},
+		&mockNodeManager{
+			GetShardFunc: func(vs string) (nodes.Shard, error) {
+				require.Equal(t, virtualStorage, vs)
+				return nodes.Shard{
+					IsReadOnly: true,
+					Primary: &mockNode{
+						storageName: primaryStorage,
+						conn:        primaryConn,
+					},
+					Secondaries: []nodes.Node{&mockNode{
+						storageName: secondaryStorage,
+						conn:        secondaryConn,
+					}},
+				}, nil
+			},
+		},
+	)
+
+	processed := make(chan struct{})
+	replMgr.replicator = mockReplicator{
+		ReplicateFunc: func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error {
+			require.True(t, primaryConn == target)
+			require.True(t, secondaryConn == source)
+			close(processed)
+			return nil
+		},
+	}
+	replMgr.ProcessBacklog(ctx, noopBackoffFunc)
+	select {
+	case <-processed:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("replication job targeting read-only primary was not processed before timeout")
+	}
+}
+
 func TestBackoff(t *testing.T) {
 	start := 1 * time.Microsecond
 	max := 6 * time.Microsecond
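
The source side changes as well: instead of always reading from the shard primary, processBacklog now resolves the job's SourceNode.Storage through the new Shard.GetNode helper, so a job can name any shard member as its source. A self-contained sketch of that lookup and its failure path, again using simplified stand-in types rather than the real nodes.Shard (the lowercase getNode and the storage names here are illustrative):

```go
package main

import "fmt"

// Stand-ins for the praefect types touched above, simplified from
// internal/praefect/nodes/manager.go.
type Node struct{ Storage string }

type Shard struct {
	Primary     Node
	Secondaries []Node
}

// getNode mirrors the new Shard.GetNode helper: it resolves a storage name
// to a node on the shard, checking the primary first, then the secondaries.
func (s Shard) getNode(storage string) (Node, error) {
	if storage == s.Primary.Storage {
		return s.Primary, nil
	}
	for _, node := range s.Secondaries {
		if storage == node.Storage {
			return node, nil
		}
	}
	return Node{}, fmt.Errorf("node with storage %q does not exist", storage)
}

func main() {
	shard := Shard{
		Primary:     Node{Storage: "storage-1"},
		Secondaries: []Node{{Storage: "storage-2"}},
	}

	// A job whose source is a secondary (as in the test above) resolves fine.
	source, err := shard.getNode("storage-2")
	fmt.Println(source.Storage, err) // storage-2 <nil>

	// An unknown source storage fails the lookup; processBacklog logs the
	// error and marks the event as failed rather than aborting the loop.
	_, err = shard.getNode("storage-3")
	fmt.Println(err) // node with storage "storage-3" does not exist
}
```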