Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-05-19 19:43:16 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-05-20 18:28:00 +0300
commit31dd06b15400b984dd35d623197271d5d2e61775 (patch)
treed598b4a10f4026e961c3264dde38a7800dc1677d /internal/praefect/replicator.go
parent6d6b8db3b76f57d5a459cc64fd3ac583e6d96bf7 (diff)
run replication jobs against read-only primary
Replication jobs are not currently ran against the primary node. This makes data recovery efforts harder as it is not possible run replication jobs from the previous primary towards the newly elected one. This commit runs replication jobs against read-only primaries and thus makes it possible to run `praefect reconcile` against a newly elected read-only primary.
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r--internal/praefect/replicator.go21
1 files changed, 17 insertions, 4 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 0eab57b29..fe6a438ff 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -422,10 +422,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
var totalEvents int
shard, err := r.nodeManager.GetShard(virtualStorage)
if err == nil {
- for _, secondary := range shard.Secondaries {
- events, err := r.datastore.Dequeue(ctx, virtualStorage, secondary.GetStorage(), 10)
+ targetNodes := shard.Secondaries
+ if shard.IsReadOnly {
+ targetNodes = append(targetNodes, shard.Primary)
+ }
+
+ for _, target := range targetNodes {
+ events, err := r.datastore.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
if err != nil {
- logger.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+ logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
continue
}
@@ -439,7 +444,15 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
continue
}
- if err := r.processReplJob(ctx, job, shard.Primary.GetConnection(), secondary.GetConnection()); err != nil {
+
+ source, err := shard.GetNode(job.SourceNode.Storage)
+ if err != nil {
+ logger.WithField("event", event).WithError(err).Error("failed to get source node for replication job")
+ eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
+ continue
+ }
+
+ if err := r.processReplJob(ctx, job, source.GetConnection(), target.GetConnection()); err != nil {
logger.WithFields(logrus.Fields{
logWithReplJobID: job.ID,
logWithReplVirtual: job.VirtualStorage,