diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-05-20 23:23:12 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-05-20 23:23:12 +0300 |
commit | 100d1926b1daeb12459ffd692f833c839cc19da9 (patch) | |
tree | 6e388c73273605f995e5ea010bf9e91cbdfce4fd | |
parent | 57f4779249b0be37659b9bd67f410a6b478ef3b7 (diff) | |
parent | 31dd06b15400b984dd35d623197271d5d2e61775 (diff) |
Merge branch 'smh-reconcile-read-only' into 'master'
Run replication jobs against read-only primaries
Closes #2772
See merge request gitlab-org/gitaly!2190
-rw-r--r-- | changelogs/unreleased/smh-reconcile-read-only.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 14 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 21 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 82 |
5 files changed, 120 insertions, 5 deletions
diff --git a/changelogs/unreleased/smh-reconcile-read-only.yml b/changelogs/unreleased/smh-reconcile-read-only.yml new file mode 100644 index 000000000..7f61a43c3 --- /dev/null +++ b/changelogs/unreleased/smh-reconcile-read-only.yml @@ -0,0 +1,5 @@ +--- +title: Run replication jobs against read-only primaries +merge_request: 2190 +author: +type: changed diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 00863e84c..b100b2ba4 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -56,11 +56,12 @@ func (m *mockNodeManager) GetSyncedNode(context.Context, string, string) (nodes. type mockNode struct { nodes.Node storageName string + conn *grpc.ClientConn } func (m *mockNode) GetStorage() string { return m.storageName } -func (m *mockNode) GetConnection() *grpc.ClientConn { return nil } +func (m *mockNode) GetConnection() *grpc.ClientConn { return m.conn } func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { for _, tc := range []struct { diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index ba31dd24c..d9d79ec2f 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -34,6 +34,20 @@ type Shard struct { Secondaries []Node } +func (s Shard) GetNode(storage string) (Node, error) { + if storage == s.Primary.GetStorage() { + return s.Primary, nil + } + + for _, node := range s.Secondaries { + if storage == node.GetStorage() { + return node, nil + } + } + + return nil, fmt.Errorf("node with storage %q does not exist", storage) +} + // Manager is responsible for returning shards for virtual storages type Manager interface { GetShard(virtualStorageName string) (Shard, error) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 0eab57b29..fe6a438ff 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -422,10 +422,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora var totalEvents int shard, err := r.nodeManager.GetShard(virtualStorage) if err == nil { - for _, secondary := range shard.Secondaries { - events, err := r.datastore.Dequeue(ctx, virtualStorage, secondary.GetStorage(), 10) + targetNodes := shard.Secondaries + if shard.IsReadOnly { + targetNodes = append(targetNodes, shard.Primary) + } + + for _, target := range targetNodes { + events, err := r.datastore.Dequeue(ctx, virtualStorage, target.GetStorage(), 10) if err != nil { - logger.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events") + logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") continue } @@ -439,7 +444,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) continue } - if err := r.processReplJob(ctx, job, shard.Primary.GetConnection(), secondary.GetConnection()); err != nil { + + source, err := shard.GetNode(job.SourceNode.Storage) + if err != nil { + logger.WithField("event", event).WithError(err).Error("failed to get source node for replication job") + eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) + continue + } + + if err := r.processReplJob(ctx, job, source.GetConnection(), target.GetConnection()); err != nil { logger.WithFields(logrus.Fields{ logWithReplJobID: job.ID, logWithReplVirtual: job.VirtualStorage, diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 6aefbf14c..378d98628 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -676,6 +676,88 @@ func TestProcessBacklog_Success(t *testing.T) { require.True(t, helper.IsGitDirectory(fullNewPath2), "repository must exist at new last RenameRepository location") } +type mockReplicator struct { + Replicator + ReplicateFunc func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error +} + +func (m mockReplicator) Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error { + return m.ReplicateFunc(ctx, job, source, target) +} + +func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + const virtualStorage = "virtal-storage" + const primaryStorage = "storage-1" + const secondaryStorage = "storage-2" + + primaryConn := &grpc.ClientConn{} + secondaryConn := &grpc.ClientConn{} + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorage, + Nodes: []*models.Node{ + {Storage: primaryStorage}, + {Storage: secondaryStorage}, + }, + }, + }, + } + + queue := datastore.NewMemoryReplicationEventQueue(conf) + _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + Change: datastore.UpdateRepo, + RelativePath: "ignored", + TargetNodeStorage: primaryStorage, + SourceNodeStorage: secondaryStorage, + VirtualStorage: virtualStorage, + }, + }) + require.NoError(t, err) + + replMgr := NewReplMgr( + testhelper.DiscardTestEntry(t), + datastore.Datastore{datastore.NewInMemory(conf), queue}, + &mockNodeManager{ + GetShardFunc: func(vs string) (nodes.Shard, error) { + require.Equal(t, virtualStorage, vs) + return nodes.Shard{ + IsReadOnly: true, + Primary: &mockNode{ + storageName: primaryStorage, + conn: primaryConn, + }, + Secondaries: []nodes.Node{&mockNode{ + storageName: secondaryStorage, + conn: secondaryConn, + }}, + }, nil + }, + }, + ) + + processed := make(chan struct{}) + replMgr.replicator = mockReplicator{ + ReplicateFunc: func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error { + require.True(t, primaryConn == target) + require.True(t, secondaryConn == source) + close(processed) + return nil + }, + } + replMgr.ProcessBacklog(ctx, noopBackoffFunc) + select { + case <-processed: + case <-time.After(5 * time.Second): + t.Fatalf("replication job targeting read-only primary was not processed before timeout") + } +} + func TestBackoff(t *testing.T) { start := 1 * time.Microsecond max := 6 * time.Microsecond |