Welcome to the mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-05-20 23:23:12 +0300
committerPaul Okstad <pokstad@gitlab.com>2020-05-20 23:23:12 +0300
commit100d1926b1daeb12459ffd692f833c839cc19da9 (patch)
tree6e388c73273605f995e5ea010bf9e91cbdfce4fd
parent57f4779249b0be37659b9bd67f410a6b478ef3b7 (diff)
parent31dd06b15400b984dd35d623197271d5d2e61775 (diff)
Merge branch 'smh-reconcile-read-only' into 'master'
Run replication jobs against read-only primaries Closes #2772 See merge request gitlab-org/gitaly!2190
-rw-r--r--changelogs/unreleased/smh-reconcile-read-only.yml5
-rw-r--r--internal/praefect/coordinator_test.go3
-rw-r--r--internal/praefect/nodes/manager.go14
-rw-r--r--internal/praefect/replicator.go21
-rw-r--r--internal/praefect/replicator_test.go82
5 files changed, 120 insertions, 5 deletions
diff --git a/changelogs/unreleased/smh-reconcile-read-only.yml b/changelogs/unreleased/smh-reconcile-read-only.yml
new file mode 100644
index 000000000..7f61a43c3
--- /dev/null
+++ b/changelogs/unreleased/smh-reconcile-read-only.yml
@@ -0,0 +1,5 @@
+---
+title: Run replication jobs against read-only primaries
+merge_request: 2190
+author:
+type: changed
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 00863e84c..b100b2ba4 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -56,11 +56,12 @@ func (m *mockNodeManager) GetSyncedNode(context.Context, string, string) (nodes.
type mockNode struct {
nodes.Node
storageName string
+ conn *grpc.ClientConn
}
func (m *mockNode) GetStorage() string { return m.storageName }
-func (m *mockNode) GetConnection() *grpc.ClientConn { return nil }
+func (m *mockNode) GetConnection() *grpc.ClientConn { return m.conn }
func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
for _, tc := range []struct {
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index ba31dd24c..d9d79ec2f 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -34,6 +34,20 @@ type Shard struct {
Secondaries []Node
}
+func (s Shard) GetNode(storage string) (Node, error) {
+ if storage == s.Primary.GetStorage() {
+ return s.Primary, nil
+ }
+
+ for _, node := range s.Secondaries {
+ if storage == node.GetStorage() {
+ return node, nil
+ }
+ }
+
+ return nil, fmt.Errorf("node with storage %q does not exist", storage)
+}
+
// Manager is responsible for returning shards for virtual storages
type Manager interface {
GetShard(virtualStorageName string) (Shard, error)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 0eab57b29..fe6a438ff 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -422,10 +422,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
var totalEvents int
shard, err := r.nodeManager.GetShard(virtualStorage)
if err == nil {
- for _, secondary := range shard.Secondaries {
- events, err := r.datastore.Dequeue(ctx, virtualStorage, secondary.GetStorage(), 10)
+ targetNodes := shard.Secondaries
+ if shard.IsReadOnly {
+ targetNodes = append(targetNodes, shard.Primary)
+ }
+
+ for _, target := range targetNodes {
+ events, err := r.datastore.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
if err != nil {
- logger.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+ logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
continue
}
@@ -439,7 +444,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
continue
}
- if err := r.processReplJob(ctx, job, shard.Primary.GetConnection(), secondary.GetConnection()); err != nil {
+
+ source, err := shard.GetNode(job.SourceNode.Storage)
+ if err != nil {
+ logger.WithField("event", event).WithError(err).Error("failed to get source node for replication job")
+ eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
+ continue
+ }
+
+ if err := r.processReplJob(ctx, job, source.GetConnection(), target.GetConnection()); err != nil {
logger.WithFields(logrus.Fields{
logWithReplJobID: job.ID,
logWithReplVirtual: job.VirtualStorage,
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 6aefbf14c..378d98628 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -676,6 +676,88 @@ func TestProcessBacklog_Success(t *testing.T) {
require.True(t, helper.IsGitDirectory(fullNewPath2), "repository must exist at new last RenameRepository location")
}
+type mockReplicator struct {
+ Replicator
+ ReplicateFunc func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
+}
+
+func (m mockReplicator) Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error {
+ return m.ReplicateFunc(ctx, job, source, target)
+}
+
+func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+	const virtualStorage = "virtual-storage"
+ const primaryStorage = "storage-1"
+ const secondaryStorage = "storage-2"
+
+ primaryConn := &grpc.ClientConn{}
+ secondaryConn := &grpc.ClientConn{}
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*models.Node{
+ {Storage: primaryStorage},
+ {Storage: secondaryStorage},
+ },
+ },
+ },
+ }
+
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
+ _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ RelativePath: "ignored",
+ TargetNodeStorage: primaryStorage,
+ SourceNodeStorage: secondaryStorage,
+ VirtualStorage: virtualStorage,
+ },
+ })
+ require.NoError(t, err)
+
+ replMgr := NewReplMgr(
+ testhelper.DiscardTestEntry(t),
+ datastore.Datastore{datastore.NewInMemory(conf), queue},
+ &mockNodeManager{
+ GetShardFunc: func(vs string) (nodes.Shard, error) {
+ require.Equal(t, virtualStorage, vs)
+ return nodes.Shard{
+ IsReadOnly: true,
+ Primary: &mockNode{
+ storageName: primaryStorage,
+ conn: primaryConn,
+ },
+ Secondaries: []nodes.Node{&mockNode{
+ storageName: secondaryStorage,
+ conn: secondaryConn,
+ }},
+ }, nil
+ },
+ },
+ )
+
+ processed := make(chan struct{})
+ replMgr.replicator = mockReplicator{
+ ReplicateFunc: func(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error {
+ require.True(t, primaryConn == target)
+ require.True(t, secondaryConn == source)
+ close(processed)
+ return nil
+ },
+ }
+ replMgr.ProcessBacklog(ctx, noopBackoffFunc)
+ select {
+ case <-processed:
+ case <-time.After(5 * time.Second):
+ t.Fatalf("replication job targeting read-only primary was not processed before timeout")
+ }
+}
+
func TestBackoff(t *testing.T) {
start := 1 * time.Microsecond
max := 6 * time.Microsecond