diff options
author | Stan Hu <stanhu@gmail.com> | 2020-03-07 02:42:43 +0300 |
---|---|---|
committer | Stan Hu <stanhu@gmail.com> | 2020-03-07 02:42:43 +0300 |
commit | 95fca8eee78725ae1af7572a96934a6c1c975508 (patch) | |
tree | 13b4e1d280a29e2abc294b810b65efea5f24e119 | |
parent | 5033efb584146cd6d5b26bdc671ffa662a67a788 (diff) |
Ping hosts in parallelsh-add-sql-leader-elections
Parameterize failover duration
-rw-r--r-- | internal/praefect/nodes/manager.go | 30 |
1 files changed, 22 insertions, 8 deletions
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 07b077ff0..92403b743 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -21,6 +21,8 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" ) +var failoverThresholdSeconds = 20 + // Shard is a primary with a set of secondaries type Shard interface { GetPrimary() (Node, error) @@ -187,19 +189,19 @@ func (n *Mgr) updateLeader(shardName string, storageName string) error { VALUES ('t', '%s', '%s', now()) ON CONFLICT (is_primary, shard_name) DO UPDATE SET node_name = - CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN + CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN excluded.node_name ELSE shard_elections.node_name END, last_seen_active = - CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN + CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN now() ELSE shard_elections.last_seen_active END` - _, err := n.db.Exec(fmt.Sprintf(q, shardName, storageName)) + _, err := n.db.Exec(fmt.Sprintf(q, shardName, storageName, failoverThresholdSeconds, failoverThresholdSeconds)) if err != nil { n.log.Errorf("Error updating leader: %s", err) @@ -209,9 +211,9 @@ func (n *Mgr) updateLeader(shardName string, storageName string) error { func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) { q := fmt.Sprintf(`SELECT node_name FROM shard_elections - WHERE last_seen_active > now() - interval '20 seconds' + WHERE last_seen_active > now() - interval '%d seconds' AND is_primary IS TRUE - AND shard_name = '%s'`, shardName) + AND shard_name = '%s'`, failoverThresholdSeconds, shardName) rows, err := n.db.Query(q) if err != nil { @@ -239,13 +241,25 @@ func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) { } func (n *Mgr) checkShards() { + var wg sync.WaitGroup + for shardName, shard := range n.shards { for _, node := range shard.allNodes { - if node.check() { - n.updateLeader(shardName, node.GetStorage()) - } + n.log.Info("checking node " + node.GetStorage() + ": " + node.GetAddress()) + + wg.Add(1) + go func(node *nodeStatus) { + defer wg.Done() + + if node.check() { + n.updateLeader(shardName, node.GetStorage()) + } else { + n.log.Info("No response from " + node.GetStorage()) + } + }(node) } + wg.Wait() primary, err := n.lookupPrimary(shardName) if err == nil && primary != shard.primary { |