author    | Sami Hiltunen <shiltunen@gitlab.com> | 2020-08-10 13:05:17 +0300
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-08-10 13:05:17 +0300
commit    | 0e49dee763f596ca9223e88cd68b2f09a56d68a7 (patch)
tree      | 8d0c606e16dbd608ffee0f57c01cf5fae1a4825a
parent    | af1270bf79add2365355ab9c4d47e65897ac4253 (diff)
parent    | a64e75992ed0a769e3310b260ede8ca5354b9f39 (diff)
Merge branch 'ps-disabled-failover-elector' into 'master'
Nodes elector for configuration with disabled failover
Closes #3011
See merge request gitlab-org/gitaly!2444
-rw-r--r-- | changelogs/unreleased/ps-disabled-failover-elector.yml | 5
-rw-r--r-- | cmd/praefect/main.go | 3
-rw-r--r-- | internal/praefect/coordinator_test.go | 4
-rw-r--r-- | internal/praefect/helper_test.go | 3
-rw-r--r-- | internal/praefect/nodes/disabled_elector.go | 66
-rw-r--r-- | internal/praefect/nodes/local_elector.go | 24
-rw-r--r-- | internal/praefect/nodes/local_elector_test.go | 2
-rw-r--r-- | internal/praefect/nodes/manager.go | 32
-rw-r--r-- | internal/praefect/nodes/manager_test.go | 83
-rw-r--r-- | internal/praefect/replicator_test.go | 8
-rw-r--r-- | internal/praefect/server_test.go | 1
11 files changed, 169 insertions, 62 deletions
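
The behavioural core of the merge request is the new strategy selection in internal/praefect/nodes/manager.go, shown in the full diff below: with failover disabled, Praefect now installs a dedicated disabledElector that keeps health-checking the Gitaly nodes but never changes the primary. A minimal, self-contained sketch of that selection logic follows — the helper types and the pickElector function are illustrative stand-ins, not the actual Gitaly API:

```go
package main

import "fmt"

// elector mirrors the role of Praefect's internal election strategy (illustrative).
type elector interface {
	Name() string
}

type sqlElector struct{}      // persistent, SQL-based primary election
type localElector struct{}    // in-memory election, mostly for tests and development
type disabledElector struct{} // fixed primary, health checks only

func (sqlElector) Name() string      { return "sql" }
func (localElector) Name() string    { return "local" }
func (disabledElector) Name() string { return "disabled" }

// pickElector reproduces the decision added in this MR: the failover "enabled"
// flag chooses between real election and the fixed-primary stub, and only an
// enabled configuration consults the election strategy at all.
func pickElector(failoverEnabled bool, electionStrategy string) elector {
	if failoverEnabled {
		if electionStrategy == "sql" {
			return sqlElector{}
		}
		return localElector{}
	}
	return disabledElector{}
}

func main() {
	fmt.Println(pickElector(true, "sql").Name())   // sql
	fmt.Println(pickElector(true, "local").Name()) // local
	fmt.Println(pickElector(false, "sql").Name())  // disabled
}
```

Compared to the previous behaviour, where a disabled-failover configuration reused the local elector and Start was effectively a no-op, the stub elector still tracks node health, so GetShard can now fail with ErrPrimaryNotHealthy even when failover is off.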
diff --git a/changelogs/unreleased/ps-disabled-failover-elector.yml b/changelogs/unreleased/ps-disabled-failover-elector.yml
new file mode 100644
index 000000000..814ce7559
--- /dev/null
+++ b/changelogs/unreleased/ps-disabled-failover-elector.yml
@@ -0,0 +1,5 @@
+---
+title: Nodes elector for configuration with disabled failover
+merge_request: 2444
+author:
+type: fixed
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 41205b12e..4783279bc 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -241,6 +241,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
 		return err
 	}
 	nodeManager.Start(conf.Failover.BootstrapInterval.Duration(), conf.Failover.MonitorInterval.Duration())
+	logger.Info("background started: gitaly nodes health monitoring")
 
 	transactionCounterMetric, err := metrics.RegisterTransactionCounter()
 	if err != nil {
@@ -345,7 +346,9 @@ func run(cfgs []starter.Config, conf config.Config) error {
 	}
 
 	repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second))
+	logger.Info("background started: processing of the replication events")
 	repl.ProcessStale(ctx, 30*time.Second, time.Minute)
+	logger.Info("background started: processing of the stale replication events")
 
 	return b.Wait(conf.GracefulStopTimeout.Duration())
 }
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 63097d010..b9f01c984 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -151,6 +151,7 @@ func TestStreamDirectorMutator(t *testing.T) {
 
 	nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
+	nodeMgr.Start(0, time.Hour)
 
 	txMgr := transactions.NewManager()
 
@@ -343,6 +344,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
 
 	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
+	nodeMgr.Start(0, time.Hour)
 
 	shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
 	require.NoError(t, err)
@@ -776,6 +778,8 @@ func TestAbsentCorrelationID(t *testing.T) {
 
 	nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
+	nodeMgr.Start(0, time.Hour)
+
 	txMgr := transactions.NewManager()
 
 	coordinator := NewCoordinator(
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 663550e65..551b55d3c 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -191,7 +191,7 @@ func defaultTxMgr() *transactions.Manager {
 func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager {
 	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
-	nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+	nodeMgr.Start(0, time.Hour)
 
 	return nodeMgr
 }
@@ -371,6 +371,7 @@ func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
 func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) (string, func()) {
 	srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(internalauth.Config{Token: token})))
 	mock.RegisterSimpleServiceServer(srv, m)
+	healthpb.RegisterHealthServer(srv, health.NewServer())
 
 	// client to backend service
 	lis, port := listenAvailPort(tb)
diff --git a/internal/praefect/nodes/disabled_elector.go b/internal/praefect/nodes/disabled_elector.go
new file mode 100644
index 000000000..5464a8fd0
--- /dev/null
+++ b/internal/praefect/nodes/disabled_elector.go
@@ -0,0 +1,66 @@
+package nodes
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+)
+
+// newDisabledElector returns a stub that always returns the same shard where the
+// primary is the first node from the passed in list.
+func newDisabledElector(virtualStorage string, ns []*nodeStatus) *disabledElector {
+	secondaries := make([]Node, len(ns)-1)
+	for i, node := range ns[1:] {
+		secondaries[i] = node
+	}
+	return &disabledElector{virtualStorage: virtualStorage, shard: Shard{Primary: ns[0], Secondaries: secondaries}}
+}
+
+type disabledElector struct {
+	shard          Shard
+	virtualStorage string
+}
+
+func (de *disabledElector) start(bootstrap, _ time.Duration) {
+	timer := time.NewTimer(bootstrap)
+	defer timer.Stop()
+
+	for i := 0; i < healthcheckThreshold; i++ {
+		<-timer.C
+		ctx := context.TODO()
+		_ = de.checkNodes(ctx)
+		timer.Reset(bootstrap)
+	}
+
+	de.updateMetrics()
+}
+
+func (de *disabledElector) updateMetrics() {
+	metrics.PrimaryGauge.WithLabelValues(de.virtualStorage, de.shard.Primary.GetStorage()).Set(1)
+	for _, n := range de.shard.Secondaries {
+		metrics.PrimaryGauge.WithLabelValues(de.virtualStorage, n.GetStorage()).Set(0)
+	}
+}
+
+func (de *disabledElector) checkNodes(ctx context.Context) error {
+	var wg sync.WaitGroup
+	for _, n := range append(de.shard.Secondaries, de.shard.Primary) {
+		wg.Add(1)
+		go func(n Node) {
+			defer wg.Done()
+			_, _ = n.CheckHealth(ctx)
+		}(n)
+	}
+	wg.Wait()
+	return nil
+}
+
+func (de *disabledElector) GetShard() (Shard, error) {
+	if !de.shard.Primary.IsHealthy() {
+		return Shard{}, ErrPrimaryNotHealthy
+	}
+
+	return de.shard, nil
+}
diff --git a/internal/praefect/nodes/local_elector.go b/internal/praefect/nodes/local_elector.go
index 680602abc..e72b47cda 100644
--- a/internal/praefect/nodes/local_elector.go
+++ b/internal/praefect/nodes/local_elector.go
@@ -14,25 +14,23 @@ import (
 // shard. It does NOT support multiple Praefect nodes or have any
 // persistence. This is used mostly for testing and development.
 type localElector struct {
-	m               sync.RWMutex
-	failoverEnabled bool
-	shardName       string
-	nodes           []Node
-	primaryNode     Node
-	log             logrus.FieldLogger
+	m           sync.RWMutex
+	shardName   string
+	nodes       []Node
+	primaryNode Node
+	log         logrus.FieldLogger
 }
 
-func newLocalElector(name string, failoverEnabled bool, log logrus.FieldLogger, ns []*nodeStatus) *localElector {
+func newLocalElector(name string, log logrus.FieldLogger, ns []*nodeStatus) *localElector {
 	nodes := make([]Node, len(ns))
 	for i, n := range ns {
 		nodes[i] = n
 	}
 
 	return &localElector{
-		shardName:       name,
-		failoverEnabled: failoverEnabled,
-		log:             log.WithField("virtual_storage", name),
-		nodes:           nodes[:],
-		primaryNode:     nodes[0],
+		shardName:   name,
+		log:         log.WithField("virtual_storage", name),
+		nodes:       nodes[:],
+		primaryNode: nodes[0],
 	}
 }
 
@@ -125,7 +123,7 @@ func (s *localElector) GetShard() (Shard, error) {
 		return Shard{}, ErrPrimaryNotHealthy
 	}
 
-	if s.failoverEnabled && !primary.IsHealthy() {
+	if !primary.IsHealthy() {
 		return Shard{}, ErrPrimaryNotHealthy
 	}
 
diff --git a/internal/praefect/nodes/local_elector_test.go b/internal/praefect/nodes/local_elector_test.go
index 8ca4ef524..5a43eb5a6 100644
--- a/internal/praefect/nodes/local_elector_test.go
+++ b/internal/praefect/nodes/local_elector_test.go
@@ -30,7 +30,7 @@ func setupElector(t *testing.T) (*localElector, []*nodeStatus, *grpc.ClientConn,
 	secondary := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec1)
 	ns := []*nodeStatus{cs, secondary}
 	logger := testhelper.NewTestLogger(t).WithField("test", t.Name())
-	strategy := newLocalElector(storageName, true, logger, ns)
+	strategy := newLocalElector(storageName, logger, ns)
 
 	strategy.bootstrap(time.Second)
 
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 5a997c9a3..9801efa4b 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -92,10 +92,6 @@ type Node interface {
 
 // Mgr is a concrete type that adheres to the Manager interface
 type Mgr struct {
-	failoverEnabled bool
-	// log must be used only for non-request specific needs like bootstrapping, etc.
-	// for request related logging `ctxlogrus.Extract(ctx)` must be used.
-	log *logrus.Entry
 	// strategies is a map of strategies keyed on virtual storage name
 	strategies map[string]leaderElectionStrategy
 	db         *sql.DB
@@ -152,33 +148,29 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, rs datastore.Rep
 			ns = append(ns, cs)
 		}
 
-		if c.Failover.Enabled && c.Failover.ElectionStrategy == "sql" {
-			strategies[virtualStorage.Name] = newSQLElector(virtualStorage.Name, c, db, log, ns)
+		if c.Failover.Enabled {
+			if c.Failover.ElectionStrategy == "sql" {
+				strategies[virtualStorage.Name] = newSQLElector(virtualStorage.Name, c, db, log, ns)
+			} else {
+				strategies[virtualStorage.Name] = newLocalElector(virtualStorage.Name, log, ns)
+			}
 		} else {
-			strategies[virtualStorage.Name] = newLocalElector(virtualStorage.Name, c.Failover.Enabled, log, ns)
+			strategies[virtualStorage.Name] = newDisabledElector(virtualStorage.Name, ns)
 		}
 	}
 
 	return &Mgr{
-		log:             log,
-		db:              db,
-		failoverEnabled: c.Failover.Enabled,
-		strategies:      strategies,
-		rs:              rs,
+		db:         db,
+		strategies: strategies,
+		rs:         rs,
 	}, nil
 }
 
 // Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
 // the monitoring process. Start must be called before NodeMgr can be used.
 func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
-	if n.failoverEnabled {
-		n.log.Info("Starting failover checks")
-
-		for _, strategy := range n.strategies {
-			strategy.start(bootstrapInterval, monitorInterval)
-		}
-	} else {
-		n.log.Info("Failover checks are disabled")
+	for _, strategy := range n.strategies {
+		strategy.start(bootstrapInterval, monitorInterval)
 	}
 }
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 48b1eddb6..7fa34d838 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -134,8 +134,8 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
 	nm.checkShards()
 
 	shard, err = nm.GetShard(virtualStorageName)
-	require.NoError(t, err)
-	require.Equal(t, primaryStorage, shard.Primary.GetStorage())
+	require.Error(t, err)
+	require.Equal(t, ErrPrimaryNotHealthy, err)
 }
 
 func TestDialWithUnhealthyNode(t *testing.T) {
@@ -279,8 +279,9 @@ func TestNodeManager(t *testing.T) {
 	nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram)
 	require.NoError(t, err)
 
-	nm.Start(1*time.Millisecond, 5*time.Second)
-	nmWithoutFailover.Start(1*time.Millisecond, 5*time.Second)
+	// monitoring period set to 1 hour as we execute health checks by hands in this test
+	nm.Start(0, time.Hour)
+	nmWithoutFailover.Start(0, time.Hour)
 
 	shardWithoutFailover, err := nmWithoutFailover.GetShard("virtual-storage-0")
 	require.NoError(t, err)
@@ -301,6 +302,7 @@ func TestNodeManager(t *testing.T) {
 	checkShards := func(count int) {
 		for i := 0; i < count; i++ {
 			nm.checkShards()
+			nmWithoutFailover.checkShards()
 		}
 	}
 
@@ -312,24 +314,15 @@ func TestNodeManager(t *testing.T) {
 		require.Contains(t, labelsCalled, []string{node.Storage})
 	}
 
+	// since the failover is disabled the attempt to get a shard with unhealthy primary fails
+	_, err = nmWithoutFailover.GetShard("virtual-storage-0")
+	require.Error(t, err)
+	require.Equal(t, ErrPrimaryNotHealthy, err)
+
 	// since the primary is unhealthy, we expect checkShards to demote primary to secondary, and promote the healthy
 	// secondary to primary
-
-	shardWithoutFailover, err = nmWithoutFailover.GetShard("virtual-storage-0")
-	require.NoError(t, err)
-
 	shard, err = nm.GetShard("virtual-storage-0")
 	require.NoError(t, err)
-
-	// shard without failover and shard with failover should not be the same
-	require.NotEqual(t, shardWithoutFailover.Primary.GetStorage(), shard.Primary.GetStorage())
-	require.NotEqual(t, shardWithoutFailover.Primary.GetAddress(), shard.Primary.GetAddress())
-	require.NotEqual(t, shardWithoutFailover.Secondaries[0].GetStorage(), shard.Secondaries[0].GetStorage())
-	require.NotEqual(t, shardWithoutFailover.Secondaries[0].GetAddress(), shard.Secondaries[0].GetAddress())
-
-	// shard without failover should still match the config
-	assertShard(t, initialState, shardWithoutFailover)
-
 	// shard with failover should have promoted a secondary to primary and demoted the primary to a secondary
 	assertShard(t, shardAssertion{
 		Primary: &nodeAssertion{node2.Storage, node2.Address},
@@ -374,35 +367,38 @@ func TestMgr_GetSyncedNode(t *testing.T) {
 
 	conf := config.Config{
 		VirtualStorages: []*config.VirtualStorage{{Name: virtualStorage, Nodes: nodes[:]}},
-		Failover:        config.Failover{Enabled: true},
 	}
 
 	ctx, cancel := testhelper.Context()
 	defer cancel()
 
-	verify := func(scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) {
+	verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) {
+		conf.Failover.Enabled = failover
 		rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
 
 		nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec())
 		require.NoError(t, err)
-		nm.Start(time.Duration(0), time.Hour)
+		for i := range healthSrvs {
+			healthSrvs[i].SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+		}
+		nm.Start(0, time.Hour)
 
 		return func(t *testing.T) { scenario(t, nm, rs) }
 	}
 
-	t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+	t.Run("unknown virtual storage", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
 		_, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "stub")
 		require.True(t, errors.Is(err, ErrVirtualStorageNotExist))
 	}))
 
-	t.Run("state is undefined", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+	t.Run("state is undefined", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
 		node, err := nm.GetSyncedNode(ctx, virtualStorage, "no/matter")
 		require.NoError(t, err)
 		require.Equal(t, conf.VirtualStorages[0].Nodes[0].Address, node.GetAddress(), "")
 	}))
 
-	t.Run("multiple storages up to date", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+	t.Run("multiple storages up to date", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
 		require.NoError(t, rs.IncrementGeneration(ctx, virtualStorage, repoPath, "gitaly-0", nil))
 		generation, err := rs.GetGeneration(ctx, virtualStorage, repoPath, "gitaly-0")
 		require.NoError(t, err)
@@ -420,7 +416,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
 		}
 	}))
 
-	t.Run("single secondary storage up to date but unhealthy", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+	t.Run("single secondary storage up to date but unhealthy", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
 		require.NoError(t, rs.IncrementGeneration(ctx, virtualStorage, repoPath, "gitaly-0", nil))
 		generation, err := rs.GetGeneration(ctx, virtualStorage, repoPath, "gitaly-0")
 		require.NoError(t, err)
@@ -443,7 +439,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
 		require.Equal(t, conf.VirtualStorages[0].Nodes[0].Address, node.GetAddress(), "secondary shouldn't be chosen as it is unhealthy")
 	}))
 
-	t.Run("no healthy storages", verify(func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+	t.Run("no healthy storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
 		require.NoError(t, rs.IncrementGeneration(ctx, virtualStorage, repoPath, "gitaly-0", nil))
 		generation, err := rs.GetGeneration(ctx, virtualStorage, repoPath, "gitaly-0")
 		require.NoError(t, err)
@@ -472,6 +468,41 @@ func TestMgr_GetSyncedNode(t *testing.T) {
 		_, err = nm.GetSyncedNode(ctx, virtualStorage, repoPath)
 		require.True(t, errors.Is(err, ErrPrimaryNotHealthy))
 	}))
+
+	t.Run("disabled failover doesn't disable health state", verify(false, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
+		require.NoError(t, rs.IncrementGeneration(ctx, virtualStorage, repoPath, "gitaly-0", nil))
+		generation, err := rs.GetGeneration(ctx, virtualStorage, repoPath, "gitaly-0")
+		require.NoError(t, err)
+		require.NoError(t, rs.SetGeneration(ctx, virtualStorage, repoPath, "gitaly-1", generation))
+
+		shard, err := nm.GetShard(virtualStorage)
+		require.NoError(t, err)
+
+		gitaly0, err := shard.GetNode("gitaly-0")
+		require.NoError(t, err)
+
+		require.Equal(t, shard.Primary, gitaly0)
+
+		gitaly0OK, err := gitaly0.CheckHealth(ctx)
+		require.NoError(t, err)
+		require.True(t, gitaly0OK)
+
+		gitaly1, err := shard.GetNode("gitaly-1")
+		require.NoError(t, err)
+
+		gitaly1OK, err := gitaly1.CheckHealth(ctx)
+		require.NoError(t, err)
+		require.True(t, gitaly1OK)
+
+		healthSrvs[0].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+		gitaly0OK, err = gitaly0.CheckHealth(ctx)
+		require.NoError(t, err)
+		require.False(t, gitaly0OK, "primary should be unhealthy")
+
+		_, err = nm.GetSyncedNode(ctx, virtualStorage, repoPath)
+		require.Error(t, err)
+		require.True(t, errors.Is(err, ErrPrimaryNotHealthy))
+	}))
 }
 
 func TestNodeStatus_IsHealthy(t *testing.T) {
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index dad31494e..57d764823 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -40,6 +40,8 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/health"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/reflection"
 )
@@ -222,7 +224,7 @@ func TestPropagateReplicationJob(t *testing.T) {
 
 	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
-	nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+	nodeMgr.Start(0, time.Hour)
 
 	txMgr := transactions.NewManager()
 
@@ -346,6 +348,7 @@ func runMockRepositoryServer(t *testing.T) (*mockRepositoryServer, string, func(
 	mockServer := newMockRepositoryServer()
 
 	gitalypb.RegisterRepositoryServiceServer(server, mockServer)
+	healthpb.RegisterHealthServer(server, health.NewServer())
 	reflection.Register(server)
 
 	go server.Serve(listener)
@@ -508,6 +511,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
 
 	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
+	nodeMgr.Start(0, time.Hour)
 
 	replMgr := NewReplMgr(
 		logEntry,
@@ -653,6 +657,7 @@ func TestProcessBacklog_Success(t *testing.T) {
 
 	nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
+	nodeMgr.Start(0, time.Hour)
 
 	replMgr := NewReplMgr(
 		logEntry,
@@ -813,6 +818,7 @@ func newReplicationService(tb testing.TB) (*grpc.Server, string) {
 	gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(RubyServer))
 	gitalypb.RegisterSSHServiceServer(svr, ssh.NewServer())
 	gitalypb.RegisterRefServiceServer(svr, ref.NewServer())
+	healthpb.RegisterHealthServer(svr, health.NewServer())
 	reflection.Register(svr)
 
 	listener, err := net.Listen("unix", socketName)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 64d0264cf..696c31acb 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -786,6 +786,7 @@ func TestProxyWrites(t *testing.T) {
 
 	nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
+	nodeMgr.Start(0, time.Hour)
 
 	ctx, cancel := testhelper.Context()
 	defer cancel()
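
Several test gRPC servers in this diff additionally register the standard gRPC health service via healthpb.RegisterHealthServer(srv, health.NewServer()). Because the node manager now always starts health monitoring, any in-test Gitaly stub has to answer grpc.health.v1.Health/Check or it is treated as unhealthy. A minimal sketch of that pattern using the upstream google.golang.org/grpc/health package — the listener address and the overall server setup here are illustrative, not taken from the Gitaly test helpers:

```go
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Listen on any free local port; tests typically pick the port dynamically.
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		log.Fatal(err)
	}

	srv := grpc.NewServer()

	// Register the stock health service so health checks
	// (grpc.health.v1.Health/Check) succeed against this stub server.
	healthSrv := health.NewServer()
	healthpb.RegisterHealthServer(srv, healthSrv)

	// The empty service name is what generic health checks query by default;
	// flipping it to NOT_SERVING is how a test can simulate an unhealthy node.
	healthSrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)

	log.Printf("serving on %s", lis.Addr())
	if err := srv.Serve(lis); err != nil {
		log.Fatal(err)
	}
}
```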