diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2020-02-27 20:35:05 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2020-02-27 20:35:05 +0300 |
commit | 245a818ce8a23fdab3448aa0c412a147eb35f415 (patch) | |
tree | c70d99e36e691f1ffaf8f2817a188b8eb7b4724b | |
parent | 500d46e6eeec27a95ac499222c692eb27dcdbfaf (diff) |
Use consul sessions and health checks [jv-jc-consul-for-leader-election]
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 185 | ||||
-rw-r--r-- | internal/server/auth/auth.go | 6 |
3 files changed, 94 insertions, 99 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index ab9e0c4ab..6e13fc348 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -143,7 +143,7 @@ func run(cfgs []starter.Config, conf config.Config) error { if conf.FailoverEnabled { go nodeManager.Start() } - // nodeManager.Start(1*time.Second, 3*time.Second) + nodeManager.Start() latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus) if err != nil { diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 2df9e901c..7b115254f 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -1,7 +1,6 @@ package nodes import ( - "context" "encoding/json" "errors" "fmt" @@ -19,7 +18,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - healthpb "google.golang.org/grpc/health/grpc_health_v1" ) // Shard is a primary with a set of secondaries @@ -168,6 +166,24 @@ func (n *Mgr) registerNodes() error { return g.Wait() } +func (n *Mgr) getPrimary(virtualStorageName string) (*Value, error) { + kv, _, err := n.consulClient.KV().Get(getKey(virtualStorageName), nil) + if err != nil { + return nil, err + } + + var value Value + + if kv == nil || kv.Value == nil { + return nil, errors.New("no primary has been elected") + } + + if err := json.Unmarshal(kv.Value, &value); err != nil { + return nil, err + } + return &value, nil +} + // GetShard retrieves a shard for a virtual storage name func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) { shard, ok := n.shards[virtualStorageName] @@ -176,24 +192,12 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) { } if !n.failoverEnabled { + panic("no failover") return shard, nil } - key := getKey(virtualStorageName) - - kv, _, err := n.consulClient.KV().Get(key, nil) + leader, err := n.getPrimary(virtualStorageName) if err != nil { - logrus.WithError(err).Error("error when getting leader") - 
return nil, err - } - - var leader Value - - if kv == nil || kv.Value == nil { - return nil, errors.New("no primary has been elected") - } - - if err := json.Unmarshal(kv.Value, &leader); err != nil { return nil, err } @@ -262,12 +266,13 @@ func (n *Mgr) registerConsulAndPoll(virtualStorageName, internalStorageName, add if err != nil { return err } - isLeader, sessionID, err := n.registerConsul(key, value, address) + + checkID, err := n.registerConsul(internalStorageName, key, value, address) if err != nil { return err } - go n.keepTryingToBePrimary(cc, key, sessionID, isLeader, value) + go n.keepTryingToBePrimary(virtualStorageName, internalStorageName, key, value, checkID) return nil } @@ -286,112 +291,96 @@ func getKey(virtualStorageName string) string { return fmt.Sprintf("service/%s/primary", virtualStorageName) } -func (n *Mgr) registerConsul(key string, value []byte, address string) (bool, string, error) { +func (n *Mgr) registerConsul(nodeName string, key string, value []byte, address string) (string, error) { addressWithoutScheme := strings.TrimPrefix(address, "tcp://") - if err := n.consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{ + checkID := fmt.Sprintf("gitaly-internal-%s", addressWithoutScheme) + + agent := n.consulClient.Agent() + if err := agent.ServiceRegister(&api.AgentServiceRegistration{ Address: addressWithoutScheme, ID: addressWithoutScheme, - Name: "monitoring", + Name: nodeName, Tags: []string{"monitoring"}, Check: &api.AgentServiceCheck{ - Name: "Service health status", + CheckID: checkID, + Name: "gitaly internal grpc health", GRPC: addressWithoutScheme, - Interval: "10s", + Interval: "1s", }, }); err != nil { - return false, "", err + return "", err } + return checkID, nil +} + +type Value struct { + Storage string `json:"storage"` +} + +func (n *Mgr) checkHealth(serviceName string) error { + checks, _, err := n.consulClient.Health().Checks(serviceName, nil) + if err != nil { + return err + } + if status := 
checks.AggregatedStatus(); status != "passing" { + return fmt.Errorf("service %s unhealthy: %s", serviceName, status) + } + return nil +} + +func (n *Mgr) becomePrimary(serviceName string, checkID string, key string, b []byte) { sessionID, _, err := n.consulClient.Session().Create(&api.SessionEntry{ - Name: key, // distributed lock - Behavior: "delete", - TTL: "10s", + // Sessions are scoped to a node. This session is for an internal Gitaly + // service. The session should be scoped to the node that internal + // service is running on. + // Node: internalNodeName, + Behavior: "delete", + ServiceChecks: []api.ServiceCheck{api.ServiceCheck{ID: checkID}}, }, nil) + if err != nil { + n.log.WithField("nodeName", serviceName).WithError(err).Error("couldn't get session") + return + } + + isLeader := false + defer func() { + if !isLeader { + if _, err := n.consulClient.Session().Destroy(sessionID, nil); err != nil { + n.log.WithField("nodeName", serviceName).WithError(err).Error("couldn't destroy session") + } + } + }() - isLeader, _, err := n.consulClient.KV().Acquire(&api.KVPair{ + isLeader, _, err = n.consulClient.KV().Acquire(&api.KVPair{ Key: key, // distributed lock - Value: value, + Value: b, Session: sessionID, }, nil) - if err != nil { - return false, "", err + n.log.WithField("nodeName", serviceName).WithField("session", sessionID).WithError(err).Error("couldn't update leader key") + return } if isLeader { - n.log.WithField("internal_storage", address).Info("I am the leader!! 
👑") + n.log.WithField("nodeName", serviceName).Info("I'm the new leader 😈!") + } else { + n.log.WithField("nodeName", serviceName).Info("failed to become leader") } - - return isLeader, sessionID, nil } -type Value struct { - Storage string `json:"storage"` -} - -func (n *Mgr) keepTryingToBePrimary(cc *grpc.ClientConn, key, sessionID string, isLeader bool, b []byte) { - doneChan := make(chan struct{}) - - if isLeader { - go n.consulClient.Session().RenewPeriodic( - "10s", - sessionID, - nil, - doneChan, - ) - } - - for { - <-time.Tick(5 * time.Second) - var err error - - client := healthpb.NewHealthClient(cc) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) - if err != nil { - n.log.WithError(err).WithField("storage", string(b)).Error("error when pinging healthcheck") - } - if err != nil || resp.Status != healthpb.HealthCheckResponse_SERVING { - if isLeader { - n.log.Error("CLOSING CHANNEL") - close(doneChan) - isLeader = false - } +func (n *Mgr) keepTryingToBePrimary(virtualStorageName, serviceName string, key string, b []byte, checkID string) { + for ; ; time.Sleep(1 * time.Second) { + if _, err := n.getPrimary(virtualStorageName); err == nil { + // no failover required continue } - if !isLeader { - n.log.WithField("value", string(b)).Info("I'm not the primary but I really want to be") - - sessionID, _, err := n.consulClient.Session().Create(&api.SessionEntry{ - Name: key, // distributed lock - Behavior: "delete", - TTL: "10s", - }, nil) - if err != nil { - n.log.WithField("value", string(b)).WithError(err).Error("couldn't get session") - continue - } - - isLeader, _, err = n.consulClient.KV().Acquire(&api.KVPair{ - Key: key, // distributed lock - Value: b, - Session: sessionID, - }, nil) - - if isLeader { - n.log.WithField("value", string(b)).Info("I'm the new leader 😈!") - doneChan = make(chan struct{}) - - go 
n.consulClient.Session().RenewPeriodic( - "10s", - sessionID, - nil, - doneChan, - ) - } + if err := n.checkHealth(serviceName); err != nil { + n.log.WithField("nodeName", serviceName).WithError(err).Error("node is not healthy or health check failed") + continue } + + n.becomePrimary(serviceName, checkID, key, b) } } diff --git a/internal/server/auth/auth.go b/internal/server/auth/auth.go index 8e2eec143..9f28bd329 100644 --- a/internal/server/auth/auth.go +++ b/internal/server/auth/auth.go @@ -2,6 +2,7 @@ package auth import ( "context" + "strings" "time" grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" @@ -44,6 +45,11 @@ func checkFunc(conf internalauth.Config) func(ctx context.Context) (context.Cont return ctx, nil } + if m, ok := grpc.Method(ctx); ok && strings.HasPrefix(m, "/grpc.health.v1.Health/") { + countStatus("health check (bypass auth)", conf.Transitioning).Inc() + return ctx, nil + } + err := gitalyauth.CheckToken(ctx, conf.Token, time.Now()) switch status.Code(err) { case codes.OK: |