diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-23 17:48:18 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-04-28 12:04:15 +0300 |
commit | 36bfda9867966a986030d57b953a24d71f57c2a6 (patch) | |
tree | 49384a3ca48401fb3b867037ad1c0fd951cddef0 /internal/praefect/replicator.go | |
parent | ac269372135e5a6830b5ab5682eb371e6623f588 (diff) |
provide consistent view of primary and secondaries
There is a possible race condition when getting the primary and
secondaries. Currently, the primary is cached in memory when it is
fetched. The cached primary is then used to determine which nodes
are the secondaries. If the primary fails before the call to
GetSecondaries is made, we'll get inconsistent secondaries as the
previous primary that failed over would now be considered a secondary
along with the newly elected primary. This commit fixes the problem by
returning a complete status of the shard each time and thus avoiding
inconsistencies.
Diffstat (limited to 'internal/praefect/replicator.go')
-rw-r--r-- | internal/praefect/replicator.go | 25 |
1 file changed, 3 insertions, 22 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index aad04b680..656672f35 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -363,25 +363,6 @@ func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc { } } -func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []nodes.Node, err error) { - shard, err := r.nodeManager.GetShard(r.virtualStorage) - if err != nil { - return nil, nil, err - } - - primary, err = shard.GetPrimary() - if err != nil { - return nil, nil, err - } - - secondaries, err = shard.GetSecondaries() - if err != nil { - return nil, nil, err - } - - return primary, secondaries, nil -} - // createReplJob converts `ReplicationEvent` into `ReplJob`. // It is intermediate solution until `ReplJob` removed and code not adopted to `ReplicationEvent`. func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.ReplJob, error) { @@ -421,9 +402,9 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error { for { var totalEvents int - primary, secondaries, err := r.getPrimaryAndSecondaries() + shard, err := r.nodeManager.GetShard(r.virtualStorage) if err == nil { - for _, secondary := range secondaries { + for _, secondary := range shard.Secondaries { events, err := r.datastore.Dequeue(ctx, secondary.GetStorage(), 10) if err != nil { r.log.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events") @@ -440,7 +421,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error { eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) continue } - if err := r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil { + if err := r.processReplJob(ctx, job, shard.Primary.GetConnection(), secondary.GetConnection()); err != nil { r.log.WithFields(logrus.Fields{ logWithReplJobID: job.ID, 
logWithReplTarget: job.TargetNode.Storage, |