
gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2020-06-09 12:09:53 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2020-06-09 12:09:53 +0300
commit    6a3f3a4bb4dce6543dfd448ac2c4deb5d36ac9a9 (patch)
tree      cce40e83d78915865cc167e26a0d309d88b89cda
parent    d459fc26ea1db5842766daf41834b1f0042686a2 (diff)
parent    be8297265d90def3b9683a3013c99d8e7eb75d17 (diff)
Merge branch 'ps-dist-reads-to-healthy' into 'master'
Praefect: unhealthy nodes must not be used for read distribution

Closes #2823

See merge request gitlab-org/gitaly!2250
-rw-r--r--  changelogs/unreleased/ps-dist-reads-to-healthy.yml | 5
-rw-r--r--  doc/design_ha.md | 13
-rw-r--r--  internal/praefect/coordinator_test.go | 193
-rw-r--r--  internal/praefect/nodes/local_elector.go | 81
-rw-r--r--  internal/praefect/nodes/manager.go | 126
-rw-r--r--  internal/praefect/nodes/manager_test.go | 115
-rw-r--r--  internal/praefect/nodes/sql_elector.go | 10
-rw-r--r--  internal/praefect/nodes/sql_elector_test.go | 14
-rw-r--r--  internal/praefect/protoregistry/protoregistry.go | 4
9 files changed, 416 insertions, 145 deletions
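
This merge changes Praefect so that only healthy, up-to-date secondaries are considered for read distribution, with the primary as the fallback. The following is a minimal, self-contained sketch of that selection rule; the `node` type and `pickReadNode` helper are illustrative stand-ins, not the actual Gitaly/Praefect API.

```go
package main

import (
	"fmt"
	"math/rand"
)

// node is a simplified stand-in for a Praefect storage node (hypothetical type).
type node struct {
	storage string
	healthy bool
}

// pickReadNode mirrors the rule introduced by this merge: only healthy,
// up-to-date, non-primary nodes are read candidates; otherwise the read
// is served by the primary.
func pickReadNode(primary node, secondaries []node, upToDate map[string]bool) node {
	var candidates []node
	for _, n := range secondaries {
		if upToDate[n.storage] && n.healthy {
			candidates = append(candidates, n)
		}
	}
	if len(candidates) == 0 {
		return primary
	}
	return candidates[rand.Intn(len(candidates))]
}

func main() {
	primary := node{storage: "gitaly-1", healthy: true}
	secondaries := []node{
		{storage: "gitaly-2", healthy: false}, // up to date, but failing health checks
		{storage: "gitaly-3", healthy: true},
	}
	upToDate := map[string]bool{"gitaly-2": true, "gitaly-3": true}

	// gitaly-2 is skipped despite being up to date, because it is unhealthy.
	fmt.Println(pickReadNode(primary, secondaries, upToDate).storage) // gitaly-3
}
```
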
diff --git a/changelogs/unreleased/ps-dist-reads-to-healthy.yml b/changelogs/unreleased/ps-dist-reads-to-healthy.yml
new file mode 100644
index 000000000..be2f6e987
--- /dev/null
+++ b/changelogs/unreleased/ps-dist-reads-to-healthy.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: unhealthy nodes must not be used for read distribution'
+merge_request: 2250
+author:
+type: fixed
diff --git a/doc/design_ha.md b/doc/design_ha.md
index 70af30a36..5165e9d2d 100644
--- a/doc/design_ha.md
+++ b/doc/design_ha.md
@@ -189,19 +189,6 @@ solve data loss cases by synchronizing the missing data from the old primary to
Alternatively, an administrator can choose to enable writes on the new primary immediately if they do not wish
to solve the data loss cases.
-### Distribution of reads
-
-Praefect supports distribution of read operations across Gitaly nodes that are configured for the virtual node.
-You can enable this feature with with `distributed_reads` feature flag (disabled by default).
-That means that RPCs marked with `ACCESSOR` option like
-[GetBlob](https://gitlab.com/gitlab-org/gitaly/-/blob/v12.10.6/proto/blob.proto#L16)
-will be redirected to the one of the up to date Gitaly nodes.
-_Up to date in_ this context means that the replication for this node is successfully completed (the replication queue
-for this storage and repository is empty). In case there is a failure on determining the up to date node or there is no
-such, the primary node will be taken to serve the request. To track distribution of read operations you can use a new
-Prometheus counter metric named `gitaly_praefect_read_distribution`. It has two labels: `virtual_storage` and `storage` that
-reflects the configuration defined for this instance of Praefect.
-
## Compared to Geo
Despite the similarities above, there are significant differences
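
The documentation removed above mentions the `gitaly_praefect_read_distribution` Prometheus counter with `virtual_storage` and `storage` labels. As a rough sketch of how a counter of that shape is declared and incremented with the Go Prometheus client (assumed here purely for illustration; this is not the actual wiring inside Praefect):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// A counter shaped like the gitaly_praefect_read_distribution metric described
// in the removed documentation: one label for the virtual storage and one for
// the Gitaly storage that served the read. Illustrative declaration only.
var readDistribution = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "gitaly_praefect_read_distribution",
		Help: "Counts read operations distributed to Gitaly nodes",
	},
	[]string{"virtual_storage", "storage"},
)

func main() {
	prometheus.MustRegister(readDistribution)

	// Each time an ACCESSOR RPC is routed, the counter for the chosen node
	// would be incremented.
	readDistribution.WithLabelValues("praefect", "gitaly-2").Inc()

	fmt.Println("incremented gitaly_praefect_read_distribution{virtual_storage=praefect,storage=gitaly-2}")
}
```
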
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 457673808..33109c7cf 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -7,6 +7,7 @@ import (
"io/ioutil"
"sync"
"testing"
+ "time"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
@@ -146,10 +147,10 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
func TestStreamDirectorMutator(t *testing.T) {
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
- _, healthSrv0 := testhelper.NewServerWithHealth(t, gitalySocket0)
- _, healthSrv1 := testhelper.NewServerWithHealth(t, gitalySocket1)
- healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
- healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
+ defer srv1.Stop()
+ srv2, _ := testhelper.NewServerWithHealth(t, gitalySocket1)
+ defer srv2.Stop()
primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
primaryNode := &config.Node{Address: primaryAddress, Storage: "praefect-internal-1", DefaultPrimary: true}
@@ -245,27 +246,22 @@ func TestStreamDirectorMutator(t *testing.T) {
}
func TestStreamDirectorAccessor(t *testing.T) {
- gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
- _, healthSrv0 := testhelper.NewServerWithHealth(t, gitalySocket0)
- _, healthSrv1 := testhelper.NewServerWithHealth(t, gitalySocket1)
- healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
- healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ gitalySocket := testhelper.GetTemporaryGitalySocketFileName()
+ srv, _ := testhelper.NewServerWithHealth(t, gitalySocket)
+ defer srv.Stop()
- primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
+ gitalyAddress := "unix://" + gitalySocket
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
Name: "praefect",
Nodes: []*config.Node{
{
- Address: primaryAddress,
+ Address: gitalyAddress,
Storage: "praefect-internal-1",
DefaultPrimary: true,
},
- {
- Address: secondaryAddress,
- Storage: "praefect-internal-2",
- }},
+ },
},
},
}
@@ -279,12 +275,12 @@ func TestStreamDirectorAccessor(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.DistributedReads)
entry := testhelper.DiscardTestEntry(t)
nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
+ nodeMgr.Start(0, time.Minute)
txMgr := transactions.NewManager()
@@ -298,7 +294,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
peeker := &mockPeeker{frame: frame}
streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
require.NoError(t, err)
- require.Equal(t, secondaryAddress, streamParams.Conn().Target())
+ require.Equal(t, gitalyAddress, streamParams.Conn().Target())
md, ok := metadata.FromOutgoingContext(streamParams.Context())
require.True(t, ok)
@@ -306,16 +302,173 @@ func TestStreamDirectorAccessor(t *testing.T) {
mi, err := coordinator.registry.LookupMethod(fullMethod)
require.NoError(t, err)
+ require.Equal(t, protoregistry.ScopeRepository, mi.Scope, "method must be repository scoped")
+ require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")
m, err := protoMessageFromPeeker(mi, peeker)
require.NoError(t, err)
rewrittenTargetRepo, err := mi.TargetRepo(m)
require.NoError(t, err)
- require.Equal(t, "praefect-internal-2", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+ require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+}
- // must be invoked without issues
- streamParams.RequestFinalizer()
+func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
+ gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
+ srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
+ defer srv1.Stop()
+ srv2, healthSrv := testhelper.NewServerWithHealth(t, gitalySocket1)
+ defer srv2.Stop()
+
+ primaryNodeConf := config.Node{
+ Address: "unix://" + gitalySocket0,
+ Storage: "gitaly-1",
+ DefaultPrimary: true,
+ }
+
+ secondaryNodeConf := config.Node{
+ Address: "unix://" + gitalySocket1,
+ Storage: "gitaly-2",
+ }
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "praefect",
+ Nodes: []*config.Node{&primaryNodeConf, &secondaryNodeConf},
+ },
+ },
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: "local",
+ },
+ }
+
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
+
+ targetRepo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+ ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.DistributedReads)
+
+ entry := testhelper.DiscardTestEntry(t)
+
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Minute)
+
+ txMgr := transactions.NewManager()
+
+ coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+
+ t.Run("forwards accessor operations", func(t *testing.T) {
+ frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
+ require.NoError(t, err)
+
+ fullMethod := "/gitaly.RefService/FindAllBranches"
+
+ peeker := &mockPeeker{frame: frame}
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+ require.Equal(t, secondaryNodeConf.Address, streamParams.Conn().Target(), "must be redirected to secondary")
+
+ md, ok := metadata.FromOutgoingContext(streamParams.Context())
+ require.True(t, ok)
+ require.Contains(t, md, "praefect-server")
+
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
+ require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")
+
+ m, err := protoMessageFromPeeker(mi, peeker)
+ require.NoError(t, err)
+
+ rewrittenTargetRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, "gitaly-2", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
+ })
+
+ t.Run("forwards accessor operations only to healthy nodes", func(t *testing.T) {
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+
+ shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ gitaly1, err := shard.GetNode(secondaryNodeConf.Storage)
+ require.NoError(t, err)
+ waitNodeToChangeHealthStatus(ctx, t, gitaly1, false)
+ defer func() {
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ waitNodeToChangeHealthStatus(ctx, t, gitaly1, true)
+ }()
+
+ frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
+ require.NoError(t, err)
+
+ fullMethod := "/gitaly.RefService/FindAllBranches"
+
+ peeker := &mockPeeker{frame: frame}
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+ require.Equal(t, primaryNodeConf.Address, streamParams.Conn().Target(), "must be redirected to primary")
+
+ md, ok := metadata.FromOutgoingContext(streamParams.Context())
+ require.True(t, ok)
+ require.Contains(t, md, "praefect-server")
+
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
+ require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")
+
+ m, err := protoMessageFromPeeker(mi, peeker)
+ require.NoError(t, err)
+
+ rewrittenTargetRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
+ })
+
+ t.Run("doesn't forward mutator operations", func(t *testing.T) {
+ frame, err := proto.Marshal(&gitalypb.UserUpdateBranchRequest{Repository: &targetRepo})
+ require.NoError(t, err)
+
+ fullMethod := "/gitaly.OperationService/UserUpdateBranch"
+
+ peeker := &mockPeeker{frame: frame}
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+ require.Equal(t, primaryNodeConf.Address, streamParams.Conn().Target(), "must be redirected to primary")
+
+ md, ok := metadata.FromOutgoingContext(streamParams.Context())
+ require.True(t, ok)
+ require.Contains(t, md, "praefect-server")
+
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
+ require.Equal(t, protoregistry.OpMutator, mi.Operation, "method must be a mutator")
+
+ m, err := protoMessageFromPeeker(mi, peeker)
+ require.NoError(t, err)
+
+ rewrittenTargetRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
+ })
+}
+
+func waitNodeToChangeHealthStatus(ctx context.Context, t *testing.T, node nodes.Node, health bool) {
+ t.Helper()
+
+ ctx, cancel := context.WithTimeout(ctx, time.Second)
+ defer cancel()
+
+ for node.IsHealthy() != health {
+ _, err := node.CheckHealth(ctx)
+ require.NoError(t, err)
+ }
}
type mockPeeker struct {
diff --git a/internal/praefect/nodes/local_elector.go b/internal/praefect/nodes/local_elector.go
index e03e6ed49..d0ddbd0a9 100644
--- a/internal/praefect/nodes/local_elector.go
+++ b/internal/praefect/nodes/local_elector.go
@@ -9,12 +9,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
)
-type nodeCandidate struct {
- m sync.RWMutex
- node Node
- statuses []bool
-}
-
// localElector relies on an in-memory datastore to track the primary
// and secondaries. A single election strategy pertains to a single
// shard. It does NOT support multiple Praefect nodes or have any
@@ -23,61 +17,24 @@ type localElector struct {
m sync.RWMutex
failoverEnabled bool
shardName string
- nodes []*nodeCandidate
- primaryNode *nodeCandidate
+ nodes []Node
+ primaryNode Node
readOnlyAfterFailover bool
previousWritablePrimary Node
isReadOnly bool
log logrus.FieldLogger
}
-// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
-// for deeming a node "healthy"
-const healthcheckThreshold = 3
-
-func (n *nodeCandidate) checkNode(ctx context.Context) {
- status, _ := n.node.check(ctx)
-
- n.m.Lock()
- defer n.m.Unlock()
-
- n.statuses = append(n.statuses, status)
-
- if len(n.statuses) > healthcheckThreshold {
- n.statuses = n.statuses[1:]
- }
-}
-
-func (n *nodeCandidate) isHealthy() bool {
- n.m.RLock()
- defer n.m.RUnlock()
-
- if len(n.statuses) < healthcheckThreshold {
- return false
- }
-
- for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
- if !status {
- return false
- }
- }
-
- return true
-}
-
func newLocalElector(name string, failoverEnabled, readOnlyAfterFailover bool, log logrus.FieldLogger, ns []*nodeStatus) *localElector {
- nodes := make([]*nodeCandidate, len(ns))
+ nodes := make([]Node, len(ns))
for i, n := range ns {
- nodes[i] = &nodeCandidate{
- node: n,
- }
+ nodes[i] = n
}
-
return &localElector{
shardName: name,
failoverEnabled: failoverEnabled,
- log: log,
- nodes: nodes,
+ log: log.WithField("virtual_storage", name),
+ nodes: nodes[:],
primaryNode: nodes[0],
readOnlyAfterFailover: readOnlyAfterFailover,
}
@@ -125,21 +82,27 @@ func (s *localElector) monitor(d time.Duration) {
func (s *localElector) checkNodes(ctx context.Context) error {
defer s.updateMetrics()
+ var wg sync.WaitGroup
for _, n := range s.nodes {
- n.checkNode(ctx)
+ wg.Add(1)
+ go func(n Node) {
+ defer wg.Done()
+ _, _ = n.CheckHealth(ctx)
+ }(n)
}
+ wg.Wait()
s.m.Lock()
defer s.m.Unlock()
- if s.primaryNode != nil && s.primaryNode.isHealthy() {
+ if s.primaryNode.IsHealthy() {
return nil
}
- var newPrimary *nodeCandidate
+ var newPrimary Node
for _, node := range s.nodes {
- if node != s.primaryNode && node.isHealthy() {
+ if node != s.primaryNode && node.IsHealthy() {
newPrimary = node
break
}
@@ -151,7 +114,7 @@ func (s *localElector) checkNodes(ctx context.Context) error {
var previousWritablePrimary Node
if s.primaryNode != nil {
- previousWritablePrimary = s.primaryNode.node
+ previousWritablePrimary = s.primaryNode
}
if s.isReadOnly {
@@ -179,21 +142,21 @@ func (s *localElector) GetShard() (Shard, error) {
return Shard{}, ErrPrimaryNotHealthy
}
- if s.failoverEnabled && !primary.isHealthy() {
+ if s.failoverEnabled && !primary.IsHealthy() {
return Shard{}, ErrPrimaryNotHealthy
}
var secondaries []Node
for _, n := range s.nodes {
if n != primary {
- secondaries = append(secondaries, n.node)
+ secondaries = append(secondaries, n)
}
}
return Shard{
PreviousWritablePrimary: previousWritablePrimary,
IsReadOnly: isReadOnly,
- Primary: primary.node,
+ Primary: primary,
Secondaries: secondaries,
}, nil
}
@@ -201,7 +164,7 @@ func (s *localElector) GetShard() (Shard, error) {
func (s *localElector) enableWrites(context.Context) error {
s.m.Lock()
defer s.m.Unlock()
- if s.primaryNode == nil || !s.primaryNode.isHealthy() {
+ if !s.primaryNode.IsHealthy() {
return ErrPrimaryNotHealthy
}
@@ -221,6 +184,6 @@ func (s *localElector) updateMetrics() {
val = 1
}
- metrics.PrimaryGauge.WithLabelValues(s.shardName, n.node.GetStorage()).Set(val)
+ metrics.PrimaryGauge.WithLabelValues(s.shardName, n.GetStorage()).Set(val)
}
}
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 27fd623b5..a08a732d3 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/rand"
+ "sync"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -63,13 +64,26 @@ type Manager interface {
GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error)
}
+const (
+ // healthcheckTimeout is the max duration allowed for checking a node's health status.
+ // If the check takes longer, it is considered failed.
+ healthcheckTimeout = 1 * time.Second
+ // healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
+ // for deeming a node "healthy"
+ healthcheckThreshold = 3
+)
+
// Node represents some metadata of a node as well as a connection
type Node interface {
GetStorage() string
GetAddress() string
GetToken() string
GetConnection() *grpc.ClientConn
- check(context.Context) (bool, error)
+ // IsHealthy reports whether the node is healthy and can handle requests.
+ // A node is considered healthy if the last 'healthcheckThreshold' checks were positive.
+ IsHealthy() bool
+ // CheckHealth runs a health check for the node and keeps track of the last 'healthcheckThreshold' results.
+ CheckHealth(context.Context) (bool, error)
}
// Mgr is a concrete type that adheres to the Manager interface
@@ -107,6 +121,8 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.
defer cancel()
for _, virtualStorage := range c.VirtualStorages {
+ log = log.WithField("virtual_storage", virtualStorage.Name)
+
ns := make([]*nodeStatus, 1, len(virtualStorage.Nodes))
for _, node := range virtualStorage.Nodes {
conn, err := client.DialContext(ctx, node.Address,
@@ -206,85 +222,123 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
return nil, fmt.Errorf("get shard for %q: %w", virtualStorageName, err)
}
- var storages []string
- if featureflag.IsEnabled(ctx, featureflag.DistributedReads) {
- if storages, err = n.queue.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil {
- // this is recoverable error - proceed with primary node
- ctxlogrus.Extract(ctx).
- WithError(err).
- WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath}).
- Error("get up to date secondaries")
- }
+ if !featureflag.IsEnabled(ctx, featureflag.DistributedReads) {
+ return shard.Primary, nil
}
- if len(storages) == 0 {
- return shard.Primary, nil
+ logger := ctxlogrus.Extract(ctx).WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath})
+ upToDateStorages, err := n.queue.GetUpToDateStorages(ctx, virtualStorageName, repoPath)
+ if err != nil {
+ // this is a recoverable error - proceed with the primary node
+ logger.WithError(err).Warn("get up to date secondaries")
}
- secondary := storages[rand.Intn(len(storages))] // randomly pick up one of the synced storages
- for _, node := range shard.Secondaries {
- if node.GetStorage() == secondary {
- return node, nil
+ var storages []Node
+ for _, upToDateStorage := range upToDateStorages {
+ node, err := shard.GetNode(upToDateStorage)
+ if err != nil {
+ // this is a recoverable error - proceed with the other nodes
+ logger.WithError(err).Warn("storage returned as up-to-date")
+ }
+
+ if !node.IsHealthy() || node == shard.Primary {
+ continue
}
+
+ storages = append(storages, node)
+ }
+
+ if len(storages) == 0 {
+ return shard.Primary, nil
}
- return shard.Primary, nil // there is no matched secondaries, maybe because of re-configuration
+ return storages[rand.Intn(len(storages))], nil // randomly pick one of the synced storages
}
-func newConnectionStatus(node config.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus {
+func newConnectionStatus(node config.Node, cc *grpc.ClientConn, l logrus.FieldLogger, latencyHist prommetrics.HistogramVec) *nodeStatus {
return &nodeStatus{
- Node: node,
- ClientConn: cc,
+ node: node,
+ clientConn: cc,
log: l,
latencyHist: latencyHist,
}
}
type nodeStatus struct {
- config.Node
- *grpc.ClientConn
- log *logrus.Entry
+ node config.Node
+ clientConn *grpc.ClientConn
+ log logrus.FieldLogger
latencyHist prommetrics.HistogramVec
+ mtx sync.RWMutex
+ statuses []bool
}
// GetStorage gets the storage name of a node
func (n *nodeStatus) GetStorage() string {
- return n.Storage
+ return n.node.Storage
}
// GetAddress gets the address of a node
func (n *nodeStatus) GetAddress() string {
- return n.Address
+ return n.node.Address
}
// GetToken gets the token of a node
func (n *nodeStatus) GetToken() string {
- return n.Token
+ return n.node.Token
}
// GetConnection gets the client connection of a node
func (n *nodeStatus) GetConnection() *grpc.ClientConn {
- return n.ClientConn
+ return n.clientConn
+}
+
+func (n *nodeStatus) IsHealthy() bool {
+ n.mtx.RLock()
+ healthy := n.isHealthy()
+ n.mtx.RUnlock()
+ return healthy
+}
+
+func (n *nodeStatus) isHealthy() bool {
+ if len(n.statuses) < healthcheckThreshold {
+ return false
+ }
+
+ for _, ok := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
+ if !ok {
+ return false
+ }
+ }
+
+ return true
}
-const checkTimeout = 1 * time.Second
+func (n *nodeStatus) updateStatus(status bool) {
+ n.mtx.Lock()
+ n.statuses = append(n.statuses, status)
+ if len(n.statuses) > healthcheckThreshold {
+ n.statuses = n.statuses[1:]
+ }
+ n.mtx.Unlock()
+}
-func (n *nodeStatus) check(ctx context.Context) (bool, error) {
- client := healthpb.NewHealthClient(n.ClientConn)
- ctx, cancel := context.WithTimeout(ctx, checkTimeout)
+func (n *nodeStatus) CheckHealth(ctx context.Context) (bool, error) {
+ health := healthpb.NewHealthClient(n.clientConn)
+ ctx, cancel := context.WithTimeout(ctx, healthcheckTimeout)
defer cancel()
status := false
start := time.Now()
- resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
- n.latencyHist.WithLabelValues(n.Storage).Observe(time.Since(start).Seconds())
+ resp, err := health.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
+ n.latencyHist.WithLabelValues(n.node.Storage).Observe(time.Since(start).Seconds())
if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING {
status = true
} else {
n.log.WithError(err).WithFields(logrus.Fields{
- "storage": n.Storage,
- "address": n.Address,
+ "storage": n.node.Storage,
+ "address": n.node.Address,
}).Warn("error when pinging healthcheck")
}
@@ -294,5 +348,7 @@ func (n *nodeStatus) check(ctx context.Context) (bool, error) {
}
metrics.NodeLastHealthcheckGauge.WithLabelValues(n.GetStorage()).Set(gaugeValue)
+ n.updateStatus(status)
+
return status, err
}
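
The manager.go hunks above spread the new health tracking across several locations. Condensed into one standalone sketch (simplified: no gRPC health client, no locking, and a hypothetical `tracker` type rather than the real `nodeStatus`), the `healthcheckThreshold` behaviour is roughly:

```go
package main

import "fmt"

const healthcheckThreshold = 3 // consecutive successful checks required to be "healthy"

// tracker is a simplified stand-in for nodeStatus: it keeps only the most
// recent healthcheckThreshold results.
type tracker struct {
	statuses []bool
}

func (t *tracker) record(ok bool) {
	t.statuses = append(t.statuses, ok)
	if len(t.statuses) > healthcheckThreshold {
		t.statuses = t.statuses[1:]
	}
}

func (t *tracker) isHealthy() bool {
	if len(t.statuses) < healthcheckThreshold {
		return false
	}
	for _, ok := range t.statuses {
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	var t tracker
	t.record(true)
	t.record(true)
	fmt.Println(t.isHealthy()) // false: only two positive checks so far
	t.record(true)
	fmt.Println(t.isHealthy()) // true: three consecutive positive checks
	t.record(false)
	fmt.Println(t.isHealthy()) // false: a single failure makes the node unhealthy again
}
```
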
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index baead7cba..199cebb10 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -77,7 +78,7 @@ func TestNodeStatus(t *testing.T) {
var expectedLabels [][]string
for i := 0; i < healthcheckThreshold; i++ {
ctx := context.Background()
- status, err := cs.check(ctx)
+ status, err := cs.CheckHealth(ctx)
require.NoError(t, err)
require.True(t, status)
@@ -90,7 +91,7 @@ func TestNodeStatus(t *testing.T) {
healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
ctx := context.Background()
- status, err := cs.check(ctx)
+ status, err := cs.CheckHealth(ctx)
require.NoError(t, err)
require.False(t, status)
}
@@ -216,7 +217,7 @@ func TestBlockingDial(t *testing.T) {
// simulate gitaly node starting up later
go func() {
- time.Sleep(checkTimeout + 10*time.Millisecond)
+ time.Sleep(healthcheckTimeout + 10*time.Millisecond)
_, healthSrv0 := testhelper.NewHealthServerWithListener(t, lis)
healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
@@ -588,3 +589,111 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs1Secondary, node.GetAddress())
}))
}
+
+func TestNodeStatus_IsHealthy(t *testing.T) {
+ checkNTimes := func(ctx context.Context, t *testing.T, ns *nodeStatus, n int) {
+ for i := 0; i < n; i++ {
+ _, err := ns.CheckHealth(ctx)
+ require.NoError(t, err)
+ }
+ }
+
+ socket := testhelper.GetTemporaryGitalySocketFileName()
+ address := "unix://" + socket
+
+ srv, healthSrv := testhelper.NewServerWithHealth(t, socket)
+ defer srv.Stop()
+
+ clientConn, err := client.Dial(address, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, clientConn.Close()) }()
+
+ node := config.Node{Storage: "gitaly-0", Address: address, DefaultPrimary: true}
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ logger := testhelper.DiscardTestLogger(t)
+ latencyHistMock := &promtest.MockHistogramVec{}
+
+ t.Run("unchecked node is unhealthy", func(t *testing.T) {
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ require.False(t, ns.IsHealthy())
+ })
+
+ t.Run("not enough check to consider it healthy", func(t *testing.T) {
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ checkNTimes(ctx, t, ns, healthcheckThreshold-1)
+
+ require.False(t, ns.IsHealthy())
+ })
+
+ t.Run("healthy", func(t *testing.T) {
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ checkNTimes(ctx, t, ns, healthcheckThreshold)
+
+ require.True(t, ns.IsHealthy())
+ })
+
+ t.Run("healthy turns into unhealthy after single failed check", func(t *testing.T) {
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ checkNTimes(ctx, t, ns, healthcheckThreshold)
+
+ require.True(t, ns.IsHealthy(), "node must be turned into healthy state")
+
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ checkNTimes(ctx, t, ns, 1)
+
+ require.False(t, ns.IsHealthy(), "node must be turned into unhealthy state")
+ })
+
+ t.Run("unhealthy turns into healthy after pre-define threshold of checks", func(t *testing.T) {
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ checkNTimes(ctx, t, ns, healthcheckThreshold)
+
+ require.True(t, ns.IsHealthy(), "node must be turned into healthy state")
+
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ checkNTimes(ctx, t, ns, 1)
+
+ require.False(t, ns.IsHealthy(), "node must be turned into unhealthy state")
+
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ for i := 1; i < healthcheckThreshold; i++ {
+ checkNTimes(ctx, t, ns, 1)
+ require.False(t, ns.IsHealthy(), "node must be unhealthy until defined threshold of checks complete positively")
+ }
+ checkNTimes(ctx, t, ns, 1) // the last check that must turn it into healthy state
+
+ require.True(t, ns.IsHealthy(), "node should be healthy again")
+ })
+
+ t.Run("concurrent access has no races", func(t *testing.T) {
+ ns := newConnectionStatus(node, clientConn, logger, latencyHistMock)
+ healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+
+ t.Run("continuously does health checks - 1", func(t *testing.T) {
+ t.Parallel()
+ checkNTimes(ctx, t, ns, healthcheckThreshold)
+ })
+
+ t.Run("continuously checks health - 1", func(t *testing.T) {
+ t.Parallel()
+ ns.IsHealthy()
+ })
+
+ t.Run("continuously does health checks - 2", func(t *testing.T) {
+ t.Parallel()
+ checkNTimes(ctx, t, ns, healthcheckThreshold)
+ })
+
+ t.Run("continuously checks health - 2", func(t *testing.T) {
+ t.Parallel()
+ ns.IsHealthy()
+ })
+ })
+}
diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go
index 07505e384..2ead9f7d6 100644
--- a/internal/praefect/nodes/sql_elector.go
+++ b/internal/praefect/nodes/sql_elector.go
@@ -85,6 +85,7 @@ type sqlElector struct {
}
func newSQLElector(name string, c config.Config, db *sql.DB, log logrus.FieldLogger, ns []*nodeStatus) *sqlElector {
+ log = log.WithField("virtual_storage", name)
praefectName := getPraefectName(c, log)
log = log.WithField("praefectName", praefectName)
@@ -117,9 +118,7 @@ func getPraefectName(c config.Config, log logrus.FieldLogger) string {
if err != nil {
name = uuid.New().String()
- log.WithError(err).WithFields(logrus.Fields{
- "praefectName": name,
- }).Warn("unable to determine Praefect hostname, using randomly generated UUID")
+ log.WithError(err).WithField("praefectName", name).Warn("unable to determine Praefect hostname, using randomly generated UUID")
}
if c.ListenAddr != "" {
@@ -163,10 +162,9 @@ func (s *sqlElector) checkNodes(ctx context.Context) error {
go func(n Node) {
defer wg.Done()
- result, _ := n.check(ctx)
+ result, _ := n.CheckHealth(ctx)
if err := s.updateNode(n, result); err != nil {
s.log.WithError(err).WithFields(logrus.Fields{
- "shard": s.shardName,
"storage": n.GetStorage(),
"address": n.GetAddress(),
}).Error("error checking node")
@@ -215,7 +213,7 @@ func (s *sqlElector) setPrimary(candidate *sqlCandidate) {
s.log.WithFields(logrus.Fields{
"oldPrimary": oldPrimary,
"newPrimary": newPrimary,
- "shard": s.shardName}).Info("primary node changed")
+ }).Info("primary node changed")
s.primaryNode = candidate
}
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index 95c37a81d..e015f98ff 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -208,10 +208,10 @@ func TestElectDemotedPrimary(t *testing.T) {
config.Config{},
db.DB,
testhelper.DiscardTestLogger(t),
- []*nodeStatus{{Node: node}},
+ []*nodeStatus{{node: node}},
)
- candidates := []*sqlCandidate{{Node: &nodeStatus{Node: node}}}
+ candidates := []*sqlCandidate{{Node: &nodeStatus{node: node}}}
require.NoError(t, elector.electNewPrimary(candidates))
primary, _, _, err := elector.lookupPrimary()
@@ -263,17 +263,17 @@ func TestElectNewPrimary(t *testing.T) {
db := getDB(t)
ns := []*nodeStatus{{
- Node: config.Node{
+ node: config.Node{
Storage: "gitaly-0",
DefaultPrimary: true,
},
}, {
- Node: config.Node{
+ node: config.Node{
Storage: "gitaly-1",
DefaultPrimary: true,
},
}, {
- Node: config.Node{
+ node: config.Node{
Storage: "gitaly-2",
DefaultPrimary: true,
},
@@ -282,13 +282,13 @@ func TestElectNewPrimary(t *testing.T) {
candidates := []*sqlCandidate{
{
&nodeStatus{
- Node: config.Node{
+ node: config.Node{
Storage: "gitaly-1",
},
},
}, {
&nodeStatus{
- Node: config.Node{
+ node: config.Node{
Storage: "gitaly-2",
},
},
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index f7ebe6f7e..cabc38b19 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -46,7 +46,7 @@ type OpType int
const (
// OpUnknown = unknown operation type
- OpUnknown = iota
+ OpUnknown OpType = iota
// OpAccessor = accessor operation type (ready only)
OpAccessor
// OpMutator = mutator operation type (modifies a repository)
@@ -60,7 +60,7 @@ const (
// ScopeUnknown is the default scope until determined otherwise
ScopeUnknown = iota
// ScopeRepository indicates an RPC's scope is limited to a repository
- ScopeRepository = iota
+ ScopeRepository Scope = iota
// ScopeStorage indicates an RPC is scoped to an entire storage location
ScopeStorage
// ScopeServer indicates an RPC is scoped to an entire server