Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Stan Hu <stanhu@gmail.com>, 2020-03-07 02:42:43 +0300
committer: Stan Hu <stanhu@gmail.com>, 2020-03-07 02:42:43 +0300
commit: 95fca8eee78725ae1af7572a96934a6c1c975508 (patch)
tree: 13b4e1d280a29e2abc294b810b65efea5f24e119
parent: 5033efb584146cd6d5b26bdc671ffa662a67a788 (diff)
Ping hosts in parallel (branch: sh-add-sql-leader-elections)
Parameterize failover duration
-rw-r--r-- internal/praefect/nodes/manager.go 30
1 file changed, 22 insertions, 8 deletions
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 07b077ff0..92403b743 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -21,6 +21,8 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
+var failoverThresholdSeconds = 20
+
// Shard is a primary with a set of secondaries
type Shard interface {
GetPrimary() (Node, error)
@@ -187,19 +189,19 @@ func (n *Mgr) updateLeader(shardName string, storageName string) error {
VALUES ('t', '%s', '%s', now()) ON CONFLICT (is_primary, shard_name)
DO UPDATE SET
node_name =
- CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN
+ CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN
excluded.node_name
ELSE
shard_elections.node_name
END,
last_seen_active =
- CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN
+ CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN
now()
ELSE
shard_elections.last_seen_active
END`
- _, err := n.db.Exec(fmt.Sprintf(q, shardName, storageName))
+ _, err := n.db.Exec(fmt.Sprintf(q, shardName, storageName, failoverThresholdSeconds, failoverThresholdSeconds))
if err != nil {
n.log.Errorf("Error updating leader: %s", err)
@@ -209,9 +211,9 @@ func (n *Mgr) updateLeader(shardName string, storageName string) error {
func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) {
q := fmt.Sprintf(`SELECT node_name FROM shard_elections
- WHERE last_seen_active > now() - interval '20 seconds'
+ WHERE last_seen_active > now() - interval '%d seconds'
AND is_primary IS TRUE
- AND shard_name = '%s'`, shardName)
+ AND shard_name = '%s'`, failoverThresholdSeconds, shardName)
rows, err := n.db.Query(q)
if err != nil {
@@ -239,13 +241,25 @@ func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) {
}
func (n *Mgr) checkShards() {
+ var wg sync.WaitGroup
+
for shardName, shard := range n.shards {
for _, node := range shard.allNodes {
- if node.check() {
- n.updateLeader(shardName, node.GetStorage())
- }
+ n.log.Info("checking node " + node.GetStorage() + ": " + node.GetAddress())
+
+ wg.Add(1)
+ go func(node *nodeStatus) {
+ defer wg.Done()
+
+ if node.check() {
+ n.updateLeader(shardName, node.GetStorage())
+ } else {
+ n.log.Info("No response from " + node.GetStorage())
+ }
+ }(node)
}
+ wg.Wait()
primary, err := n.lookupPrimary(shardName)
if err == nil && primary != shard.primary {