
gitlab.com/gitlab-org/gitaly.git
author    Jacob Vosmaer <jacob@gitlab.com>    2020-02-27 20:35:05 +0300
committer Jacob Vosmaer <jacob@gitlab.com>    2020-02-27 20:35:05 +0300
commit    245a818ce8a23fdab3448aa0c412a147eb35f415 (patch)
tree      c70d99e36e691f1ffaf8f2817a188b8eb7b4724b
parent    500d46e6eeec27a95ac499222c692eb27dcdbfaf (diff)
Use consul sessions and health checks (branch: jv-jc-consul-for-leader-election)
-rw-r--r--  cmd/praefect/main.go                |   2
-rw-r--r--  internal/praefect/nodes/manager.go  | 185
-rw-r--r--  internal/server/auth/auth.go        |   6
3 files changed, 94 insertions(+), 99 deletions(-)
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index ab9e0c4ab..6e13fc348 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -143,7 +143,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
if conf.FailoverEnabled {
go nodeManager.Start()
}
- // nodeManager.Start(1*time.Second, 3*time.Second)
+ nodeManager.Start()
latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus)
if err != nil {
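
Reviewer note: with this hunk, nodeManager.Start() now runs unconditionally in addition to the conditional go nodeManager.Start() kept as context above, so the manager appears to start twice when failover is enabled; that reads like scaffolding on this WIP branch. The manager.go hunks below also assume Mgr already carries a Consul client in n.consulClient; that wiring is not part of this patch. A minimal sketch of constructing one with the github.com/hashicorp/consul/api package used throughout (the agent address is the library default, nothing this commit configures):

    package nodes

    import "github.com/hashicorp/consul/api"

    // newConsulClient is a hypothetical constructor for the client the
    // manager code below assumes. api.DefaultConfig honours CONSUL_HTTP_ADDR
    // and falls back to the local agent on 127.0.0.1:8500.
    func newConsulClient() (*api.Client, error) {
        return api.NewClient(api.DefaultConfig())
    }
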
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 2df9e901c..7b115254f 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -1,7 +1,6 @@
package nodes
import (
- "context"
"encoding/json"
"errors"
"fmt"
@@ -19,7 +18,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
- healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// Shard is a primary with a set of secondaries
@@ -168,6 +166,24 @@ func (n *Mgr) registerNodes() error {
return g.Wait()
}
+func (n *Mgr) getPrimary(virtualStorageName string) (*Value, error) {
+ kv, _, err := n.consulClient.KV().Get(getKey(virtualStorageName), nil)
+ if err != nil {
+ return nil, err
+ }
+
+ var value Value
+
+ if kv == nil || kv.Value == nil {
+ return nil, errors.New("no primary has been elected")
+ }
+
+ if err := json.Unmarshal(kv.Value, &value); err != nil {
+ return nil, err
+ }
+ return &value, nil
+}
+
// GetShard retrieves a shard for a virtual storage name
func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
shard, ok := n.shards[virtualStorageName]
@@ -176,24 +192,12 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
}
if !n.failoverEnabled {
+ panic("no failover")
return shard, nil
}
- key := getKey(virtualStorageName)
-
- kv, _, err := n.consulClient.KV().Get(key, nil)
+ leader, err := n.getPrimary(virtualStorageName)
if err != nil {
- logrus.WithError(err).Error("error when getting leader")
- return nil, err
- }
-
- var leader Value
-
- if kv == nil || kv.Value == nil {
- return nil, errors.New("no primary has been elected")
- }
-
- if err := json.Unmarshal(kv.Value, &leader); err != nil {
return nil, err
}
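
Reviewer note: GetShard now delegates the Consul KV read to the getPrimary helper added above, dropping the duplicated unmarshalling. The panic inserted ahead of the early return, though, makes the non-failover path unreachable and looks like a temporary debugging guard. For orientation, a minimal sketch of what the election leaves in the KV store and how the helper reads it, assuming a hypothetical virtual storage "default" whose primary is storage "gitaly-1":

    // Key layout (from getKey): service/default/primary
    // Value payload:            {"storage":"gitaly-1"}
    func logPrimary(n *Mgr) error {
        leader, err := n.getPrimary("default") // hypothetical name
        if err != nil {
            // Covers both KV errors and "no primary has been elected"
            // (Consul returns a nil pair for a missing key).
            return err
        }
        fmt.Printf("current primary storage: %s\n", leader.Storage)
        return nil
    }
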
@@ -262,12 +266,13 @@ func (n *Mgr) registerConsulAndPoll(virtualStorageName, internalStorageName, add
if err != nil {
return err
}
- isLeader, sessionID, err := n.registerConsul(key, value, address)
+
+ checkID, err := n.registerConsul(internalStorageName, key, value, address)
if err != nil {
return err
}
- go n.keepTryingToBePrimary(cc, key, sessionID, isLeader, value)
+ go n.keepTryingToBePrimary(virtualStorageName, internalStorageName, key, value, checkID)
return nil
}
@@ -286,112 +291,96 @@ func getKey(virtualStorageName string) string {
return fmt.Sprintf("service/%s/primary", virtualStorageName)
}
-func (n *Mgr) registerConsul(key string, value []byte, address string) (bool, string, error) {
+func (n *Mgr) registerConsul(nodeName string, key string, value []byte, address string) (string, error) {
addressWithoutScheme := strings.TrimPrefix(address, "tcp://")
- if err := n.consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{
+ checkID := fmt.Sprintf("gitaly-internal-%s", addressWithoutScheme)
+
+ agent := n.consulClient.Agent()
+ if err := agent.ServiceRegister(&api.AgentServiceRegistration{
Address: addressWithoutScheme,
ID: addressWithoutScheme,
- Name: "monitoring",
+ Name: nodeName,
Tags: []string{"monitoring"},
Check: &api.AgentServiceCheck{
- Name: "Service health status",
+ CheckID: checkID,
+ Name: "gitaly internal grpc health",
GRPC: addressWithoutScheme,
- Interval: "10s",
+ Interval: "1s",
},
}); err != nil {
- return false, "", err
+ return "", err
}
+ return checkID, nil
+}
+
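
The Check registered above uses Consul's native GRPC check type, which probes the standard grpc.health.v1.Health service at the given address every second. For the check to pass, the internal Gitaly listener has to actually serve that protocol; a minimal sketch using grpc-go's stock health server (the address is illustrative):

    package main

    import (
        "net"

        "google.golang.org/grpc"
        "google.golang.org/grpc/health"
        healthpb "google.golang.org/grpc/health/grpc_health_v1"
    )

    func main() {
        lis, err := net.Listen("tcp", "127.0.0.1:9999") // illustrative address
        if err != nil {
            panic(err)
        }
        srv := grpc.NewServer()
        // health.NewServer reports SERVING for the empty service name by
        // default, which is what Consul's GRPC check queries.
        healthpb.RegisterHealthServer(srv, health.NewServer())
        _ = srv.Serve(lis)
    }
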
+type Value struct {
+ Storage string `json:"storage"`
+}
+
+func (n *Mgr) checkHealth(serviceName string) error {
+ checks, _, err := n.consulClient.Health().Checks(serviceName, nil)
+ if err != nil {
+ return err
+ }
+ if status := checks.AggregatedStatus(); status != "passing" {
+ return fmt.Errorf("service %s unhealthy: %s", serviceName, status)
+ }
+ return nil
+}
+
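
checkHealth compares the aggregate against the string literal "passing". The api package exports that value as api.HealthPassing (alongside HealthWarning, HealthCritical, and HealthMaint), so a variant using the constant, same behaviour, can't drift from Consul's spelling:

    func (n *Mgr) checkHealth(serviceName string) error {
        checks, _, err := n.consulClient.Health().Checks(serviceName, nil)
        if err != nil {
            return err
        }
        // AggregatedStatus folds all checks on the service into the worst
        // one: maintenance > critical > warning > passing.
        if status := checks.AggregatedStatus(); status != api.HealthPassing {
            return fmt.Errorf("service %s unhealthy: %s", serviceName, status)
        }
        return nil
    }
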
+func (n *Mgr) becomePrimary(serviceName string, checkID string, key string, b []byte) {
sessionID, _, err := n.consulClient.Session().Create(&api.SessionEntry{
- Name: key, // distributed lock
- Behavior: "delete",
- TTL: "10s",
+ // Sessions are scoped to a node. This session is for an internal Gitaly
+ // service. The session should be scoped to the node that internal
+ // service is running on.
+ // Node: internalNodeName,
+ Behavior: "delete",
+ ServiceChecks: []api.ServiceCheck{api.ServiceCheck{ID: checkID}},
}, nil)
+ if err != nil {
+ n.log.WithField("nodeName", serviceName).WithError(err).Error("couldn't get session")
+ return
+ }
+
+ isLeader := false
+ defer func() {
+ if !isLeader {
+ if _, err := n.consulClient.Session().Destroy(sessionID, nil); err != nil {
+ n.log.WithField("nodeName", serviceName).WithError(err).Error("couldn't destroy session")
+ }
+ }
+ }()
- isLeader, _, err := n.consulClient.KV().Acquire(&api.KVPair{
+ isLeader, _, err = n.consulClient.KV().Acquire(&api.KVPair{
Key: key, // distributed lock
- Value: value,
+ Value: b,
Session: sessionID,
}, nil)
-
if err != nil {
- return false, "", err
+ n.log.WithField("nodeName", serviceName).WithField("session", sessionID).WithError(err).Error("couldn't update leader key")
+ return
}
if isLeader {
- n.log.WithField("internal_storage", address).Info("I am the leader!! 👑")
+ n.log.WithField("nodeName", serviceName).Info("I'm the new leader 😈!")
+ } else {
+ n.log.WithField("nodeName", serviceName).Info("failed to become leader")
}
-
- return isLeader, sessionID, nil
}
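
This is Consul's standard leader-election pattern: the key acts as a lock, Behavior "delete" tells Consul to remove the key when the session is invalidated, and binding the session to the node's service check (ServiceChecks) means a Gitaly node that fails its gRPC health check loses the primary key automatically, letting keepTryingToBePrimary on a healthy node take over. The session is managed by hand here precisely so the health check can be attached; for comparison, the api package also ships a Lock helper that bundles session creation, Acquire, and renewal. A sketch of that route, with an illustrative key and payload:

    func acquireWithLockHelper(client *api.Client) (<-chan struct{}, error) {
        lock, err := client.LockOpts(&api.LockOptions{
            Key:   "service/default/primary",       // illustrative key
            Value: []byte(`{"storage":"gitaly-1"}`), // illustrative payload
        })
        if err != nil {
            return nil, err
        }
        // Lock blocks until the lock is acquired; the returned channel is
        // closed if the lock is later lost, e.g. because the backing
        // session was invalidated.
        return lock.Lock(nil)
    }
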
-type Value struct {
- Storage string `json:"storage"`
-}
-
-func (n *Mgr) keepTryingToBePrimary(cc *grpc.ClientConn, key, sessionID string, isLeader bool, b []byte) {
- doneChan := make(chan struct{})
-
- if isLeader {
- go n.consulClient.Session().RenewPeriodic(
- "10s",
- sessionID,
- nil,
- doneChan,
- )
- }
-
- for {
- <-time.Tick(5 * time.Second)
- var err error
-
- client := healthpb.NewHealthClient(cc)
- ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
- defer cancel()
-
- resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
- if err != nil {
- n.log.WithError(err).WithField("storage", string(b)).Error("error when pinging healthcheck")
- }
- if err != nil || resp.Status != healthpb.HealthCheckResponse_SERVING {
- if isLeader {
- n.log.Error("CLOSING CHANNEL")
- close(doneChan)
- isLeader = false
- }
+func (n *Mgr) keepTryingToBePrimary(virtualStorageName, serviceName string, key string, b []byte, checkID string) {
+ for ; ; time.Sleep(1 * time.Second) {
+ if _, err := n.getPrimary(virtualStorageName); err == nil {
+ // no failover required
continue
}
- if !isLeader {
- n.log.WithField("value", string(b)).Info("I'm not the primary but I really want to be")
-
- sessionID, _, err := n.consulClient.Session().Create(&api.SessionEntry{
- Name: key, // distributed lock
- Behavior: "delete",
- TTL: "10s",
- }, nil)
- if err != nil {
- n.log.WithField("value", string(b)).WithError(err).Error("couldn't get session")
- continue
- }
-
- isLeader, _, err = n.consulClient.KV().Acquire(&api.KVPair{
- Key: key, // distributed lock
- Value: b,
- Session: sessionID,
- }, nil)
-
- if isLeader {
- n.log.WithField("value", string(b)).Info("I'm the new leader 😈!")
- doneChan = make(chan struct{})
-
- go n.consulClient.Session().RenewPeriodic(
- "10s",
- sessionID,
- nil,
- doneChan,
- )
- }
+ if err := n.checkHealth(serviceName); err != nil {
+ n.log.WithField("nodeName", serviceName).WithError(err).Error("node is not healthy or health check failed")
+ continue
}
+
+ n.becomePrimary(serviceName, checkID, key, b)
}
}
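
The rewritten loop is much simpler than the one it replaces: once a second it checks whether a primary already exists (if so, there is nothing to do), asks Consul whether the local service is healthy, and only then attempts the election. Note that for ; ; time.Sleep(1 * time.Second) runs the body immediately and sleeps between iterations; if the loop ever needs a clean shutdown, a time.Ticker variant of the same logic might look like this (sketch; unlike the original, the ticker waits one interval before the first pass):

    func (n *Mgr) keepTryingToBePrimary(virtualStorageName, serviceName string, key string, b []byte, checkID string) {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for range ticker.C {
            if _, err := n.getPrimary(virtualStorageName); err == nil {
                continue // a primary exists; no failover required
            }
            if err := n.checkHealth(serviceName); err != nil {
                n.log.WithField("nodeName", serviceName).WithError(err).Error("node is not healthy or health check failed")
                continue
            }
            n.becomePrimary(serviceName, checkID, key, b)
        }
    }
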
diff --git a/internal/server/auth/auth.go b/internal/server/auth/auth.go
index 8e2eec143..9f28bd329 100644
--- a/internal/server/auth/auth.go
+++ b/internal/server/auth/auth.go
@@ -2,6 +2,7 @@ package auth
import (
"context"
+ "strings"
"time"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
@@ -44,6 +45,11 @@ func checkFunc(conf internalauth.Config) func(ctx context.Context) (context.Cont
return ctx, nil
}
+ if m, ok := grpc.Method(ctx); ok && strings.HasPrefix(m, "/grpc.health.v1.Health/") {
+ countStatus("health check (bypass auth)", conf.Transitioning).Inc()
+ return ctx, nil
+ }
+
err := gitalyauth.CheckToken(ctx, conf.Token, time.Now())
switch status.Code(err) {
case codes.OK:
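
The bypass keys off grpc.Method, which yields the full RPC name (for example /grpc.health.v1.Health/Check), so both RPCs of the standard health service skip token auth. Consul's GRPC check cannot supply Gitaly's auth token, which is why this exemption is needed for the checks registered in manager.go to pass. A quick sketch of what the prefix matches:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // The two methods of grpc.health.v1.Health; both carry the prefix
        // the new branch in checkFunc tests for.
        for _, m := range []string{
            "/grpc.health.v1.Health/Check",
            "/grpc.health.v1.Health/Watch",
        } {
            fmt.Println(m, strings.HasPrefix(m, "/grpc.health.v1.Health/")) // true
        }
    }
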