diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-06-06 03:43:07 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-06-06 03:43:07 +0300 |
commit | d6a4017f311a34236a4e77c5aa281790c8834386 (patch) | |
tree | 12aae77f3b8fe71a72ed7e55c751026c0b58d086 | |
parent | 8f85c2d485c3949d5ca1f2abd0805f5dd5e40356 (diff) | |
parent | d717c692044c74b3a7b505152b7a953d5872fe3e (diff) |
Merge branch 'smh-remember-last-primary' into 'master'
Remember the previous write enabled primary node
See merge request gitlab-org/gitaly!2219
-rw-r--r-- | internal/praefect/datastore/migrations/20200602154246_remember_previous_writable_primary.go | 13 | ||||
-rw-r--r-- | internal/praefect/nodes/local_elector.go | 35 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 9 | ||||
-rw-r--r-- | internal/praefect/nodes/manager_test.go | 147 | ||||
-rw-r--r-- | internal/praefect/nodes/sql_elector.go | 60 | ||||
-rw-r--r-- | internal/praefect/nodes/sql_elector_test.go | 74 |
6 files changed, 245 insertions, 93 deletions
diff --git a/internal/praefect/datastore/migrations/20200602154246_remember_previous_writable_primary.go b/internal/praefect/datastore/migrations/20200602154246_remember_previous_writable_primary.go new file mode 100644 index 000000000..1995e7e10 --- /dev/null +++ b/internal/praefect/datastore/migrations/20200602154246_remember_previous_writable_primary.go @@ -0,0 +1,13 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20200602154246_remember_previous_writable_primary", + Up: []string{"ALTER TABLE shard_primaries ADD COLUMN previous_writable_primary TEXT"}, + Down: []string{"ALTER TABLE shard_primaries DROP COLUMN previous_writable_primary"}, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/nodes/local_elector.go b/internal/praefect/nodes/local_elector.go index 04711f760..e03e6ed49 100644 --- a/internal/praefect/nodes/local_elector.go +++ b/internal/praefect/nodes/local_elector.go @@ -20,14 +20,15 @@ type nodeCandidate struct { // shard. It does NOT support multiple Praefect nodes or have any // persistence. This is used mostly for testing and development. type localElector struct { - m sync.RWMutex - failoverEnabled bool - shardName string - nodes []*nodeCandidate - primaryNode *nodeCandidate - readOnlyAfterFailover bool - isReadOnly bool - log logrus.FieldLogger + m sync.RWMutex + failoverEnabled bool + shardName string + nodes []*nodeCandidate + primaryNode *nodeCandidate + readOnlyAfterFailover bool + previousWritablePrimary Node + isReadOnly bool + log logrus.FieldLogger } // healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary @@ -148,7 +149,17 @@ func (s *localElector) checkNodes(ctx context.Context) error { return ErrPrimaryNotHealthy } + var previousWritablePrimary Node + if s.primaryNode != nil { + previousWritablePrimary = s.primaryNode.node + } + + if s.isReadOnly { + previousWritablePrimary = s.previousWritablePrimary + } + s.primaryNode = newPrimary + s.previousWritablePrimary = previousWritablePrimary s.isReadOnly = s.readOnlyAfterFailover return nil @@ -161,6 +172,7 @@ func (s *localElector) GetShard() (Shard, error) { s.m.RLock() primary := s.primaryNode isReadOnly := s.isReadOnly + previousWritablePrimary := s.previousWritablePrimary s.m.RUnlock() if primary == nil { @@ -179,9 +191,10 @@ func (s *localElector) GetShard() (Shard, error) { } return Shard{ - IsReadOnly: isReadOnly, - Primary: primary.node, - Secondaries: secondaries, + PreviousWritablePrimary: previousWritablePrimary, + IsReadOnly: isReadOnly, + Primary: primary.node, + Secondaries: secondaries, }, nil } diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 4b57a8868..27fd623b5 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -28,9 +28,12 @@ import ( // Shard is a primary with a set of secondaries type Shard struct { - IsReadOnly bool - Primary Node - Secondaries []Node + // PreviousWritablePrimary is the virtual storage's previous + // write-enabled primary. + PreviousWritablePrimary Node + IsReadOnly bool + Primary Node + Secondaries []Node } func (s Shard) GetNode(storage string) (Node, error) { diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index f3d743203..baead7cba 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -18,6 +18,45 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" ) +type nodeAssertion struct { + Storage string + Address string +} + +type shardAssertion struct { + PreviousWritablePrimary *nodeAssertion + IsReadOnly bool + Primary *nodeAssertion + Secondaries []nodeAssertion +} + +func toNodeAssertion(n Node) *nodeAssertion { + if n == nil { + return nil + } + + return &nodeAssertion{ + Storage: n.GetStorage(), + Address: n.GetAddress(), + } +} + +func assertShard(t *testing.T, exp shardAssertion, act Shard) { + t.Helper() + + actSecondaries := make([]nodeAssertion, 0, len(act.Secondaries)) + for _, n := range act.Secondaries { + actSecondaries = append(actSecondaries, *toNodeAssertion(n)) + } + + require.Equal(t, exp, shardAssertion{ + PreviousWritablePrimary: toNodeAssertion(act.PreviousWritablePrimary), + IsReadOnly: act.IsReadOnly, + Primary: toNodeAssertion(act.Primary), + Secondaries: actSecondaries, + }) +} + func TestNodeStatus(t *testing.T) { socket := testhelper.GetTemporaryGitalySocketFileName() svr, healthSvr := testhelper.NewServerWithHealth(t, socket) @@ -202,20 +241,21 @@ func TestNodeManager(t *testing.T) { srv1, healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1) defer srv1.Stop() + node1 := &config.Node{ + Storage: "praefect-internal-0", + Address: "unix://" + internalSocket0, + DefaultPrimary: true, + } + + node2 := &config.Node{ + Storage: "praefect-internal-1", + Address: "unix://" + internalSocket1, + } + virtualStorages := []*config.VirtualStorage{ { - Name: "virtual-storage-0", - Nodes: []*config.Node{ - { - Storage: "praefect-internal-0", - Address: "unix://" + internalSocket0, - DefaultPrimary: true, - }, - { - Storage: "praefect-internal-1", - Address: "unix://" + internalSocket1, - }, - }, + Name: "virtual-storage-0", + Nodes: []*config.Node{node1, node2}, }, } @@ -238,9 +278,6 @@ func TestNodeManager(t *testing.T) { nm.Start(1*time.Millisecond, 5*time.Second) nmWithoutFailover.Start(1*time.Millisecond, 5*time.Second) - _, err = nm.GetShard("virtual-storage-0") - require.NoError(t, err) - shardWithoutFailover, err := nmWithoutFailover.GetShard("virtual-storage-0") require.NoError(t, err) @@ -248,22 +285,23 @@ func TestNodeManager(t *testing.T) { require.NoError(t, err) // shard without failover and shard with failover should be the same - require.Equal(t, shardWithoutFailover.Primary.GetStorage(), shard.Primary.GetStorage()) - require.Equal(t, shardWithoutFailover.Primary.GetAddress(), shard.Primary.GetAddress()) - require.Len(t, shard.Secondaries, 1) - require.Equal(t, shardWithoutFailover.Secondaries[0].GetStorage(), shard.Secondaries[0].GetStorage()) - require.Equal(t, shardWithoutFailover.Secondaries[0].GetAddress(), shard.Secondaries[0].GetAddress()) - require.False(t, shard.IsReadOnly) - require.False(t, shardWithoutFailover.IsReadOnly) - - require.Equal(t, virtualStorages[0].Nodes[0].Storage, shard.Primary.GetStorage()) - require.Equal(t, virtualStorages[0].Nodes[0].Address, shard.Primary.GetAddress()) - require.Len(t, shard.Secondaries, 1) - require.Equal(t, virtualStorages[0].Nodes[1].Storage, shard.Secondaries[0].GetStorage()) - require.Equal(t, virtualStorages[0].Nodes[1].Address, shard.Secondaries[0].GetAddress()) + initialState := shardAssertion{ + Primary: &nodeAssertion{node1.Storage, node1.Address}, + Secondaries: []nodeAssertion{{node2.Storage, node2.Address}}, + } + assertShard(t, initialState, shard) + assertShard(t, initialState, shardWithoutFailover) + + const unhealthyCheckCount = 1 + const healthyCheckCount = healthcheckThreshold + checkShards := func(count int) { + for i := 0; i < count; i++ { + nm.checkShards() + } + } - healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) - nm.checkShards() + healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + checkShards(unhealthyCheckCount) labelsCalled := mockHistogram.LabelsCalled() for _, node := range virtualStorages[0].Nodes { @@ -286,29 +324,48 @@ func TestNodeManager(t *testing.T) { require.NotEqual(t, shardWithoutFailover.Secondaries[0].GetAddress(), shard.Secondaries[0].GetAddress()) // shard without failover should still match the config - require.Equal(t, virtualStorages[0].Nodes[0].Storage, shardWithoutFailover.Primary.GetStorage()) - require.Equal(t, virtualStorages[0].Nodes[0].Address, shardWithoutFailover.Primary.GetAddress()) - require.Len(t, shard.Secondaries, 1) - require.Equal(t, virtualStorages[0].Nodes[1].Storage, shardWithoutFailover.Secondaries[0].GetStorage()) - require.Equal(t, virtualStorages[0].Nodes[1].Address, shardWithoutFailover.Secondaries[0].GetAddress()) - require.False(t, shardWithoutFailover.IsReadOnly, - "shard should not be read-only after primary failure with failover disabled") + assertShard(t, initialState, shardWithoutFailover) // shard with failover should have promoted a secondary to primary and demoted the primary to a secondary - require.Equal(t, virtualStorages[0].Nodes[1].Storage, shard.Primary.GetStorage()) - require.Equal(t, virtualStorages[0].Nodes[1].Address, shard.Primary.GetAddress()) - require.Len(t, shard.Secondaries, 1) - require.Equal(t, virtualStorages[0].Nodes[0].Storage, shard.Secondaries[0].GetStorage()) - require.Equal(t, virtualStorages[0].Nodes[0].Address, shard.Secondaries[0].GetAddress()) - require.True(t, shard.IsReadOnly, "shard should be read-only after a failover") + assertShard(t, shardAssertion{ + // previous write enabled primary should have been recorded + PreviousWritablePrimary: &nodeAssertion{node1.Storage, node1.Address}, + IsReadOnly: true, + Primary: &nodeAssertion{node2.Storage, node2.Address}, + Secondaries: []nodeAssertion{{node1.Storage, node1.Address}}, + }, shard) + + // failing back to the original primary + healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + checkShards(healthyCheckCount) + + shard, err = nm.GetShard("virtual-storage-0") + require.NoError(t, err) + + assertShard(t, shardAssertion{ + // previous rw primary is now the primary again, field remains the same + // as node2 was not write enabled + PreviousWritablePrimary: &nodeAssertion{node1.Storage, node1.Address}, + IsReadOnly: true, + Primary: &nodeAssertion{node1.Storage, node1.Address}, + Secondaries: []nodeAssertion{{node2.Storage, node2.Address}}, + }, shard) require.NoError(t, nm.EnableWrites(context.Background(), "virtual-storage-0")) shard, err = nm.GetShard("virtual-storage-0") require.NoError(t, err) - require.False(t, shard.IsReadOnly, "shard should be write enabled") + assertShard(t, shardAssertion{ + PreviousWritablePrimary: &nodeAssertion{node1.Storage, node1.Address}, + IsReadOnly: false, + Primary: &nodeAssertion{node1.Storage, node1.Address}, + Secondaries: []nodeAssertion{{node2.Storage, node2.Address}}, + }, shard) + + healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) - nm.checkShards() + checkShards(unhealthyCheckCount) _, err = nm.GetShard("virtual-storage-0") require.Error(t, err, "should return error since no nodes are healthy") diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go index 07b933e1f..07505e384 100644 --- a/internal/praefect/nodes/sql_elector.go +++ b/internal/praefect/nodes/sql_elector.go @@ -186,7 +186,7 @@ func (s *sqlElector) checkNodes(ctx context.Context) error { // The attempt to elect a primary may have conflicted with another // node attempting to elect a primary. We check the database again // to see the current state. - candidate, _, err := s.lookupPrimary() + candidate, _, _, err := s.lookupPrimary() if err != nil { s.log.WithError(err).Error("error looking up primary") return err @@ -252,7 +252,7 @@ last_contact_attempt_at = NOW()` // GetShard gets the current status of the shard. ErrPrimaryNotHealthy // is returned if a primary does not exist. func (s *sqlElector) GetShard() (Shard, error) { - primary, readOnly, err := s.lookupPrimary() + primary, readOnly, prevWritablePrimary, err := s.lookupPrimary() if err != nil { return Shard{}, err } @@ -270,9 +270,10 @@ func (s *sqlElector) GetShard() (Shard, error) { } return Shard{ - IsReadOnly: readOnly, - Primary: primary, - Secondaries: secondaries, + PreviousWritablePrimary: prevWritablePrimary, + IsReadOnly: readOnly, + Primary: primary, + Secondaries: secondaries, }, nil } @@ -414,6 +415,22 @@ func (s *sqlElector) electNewPrimary(candidates []*sqlCandidate) error { // do not yet have a row in the table, from starting in read-only mode. In a failover scenario, // a row already exists in the table denoting the previous primary, and thus the shard should // be switched to read-only mode. + // + // Previous write-enabled primary is stored in `previous_writable_primary` column. We store it to determine + // unreplicated writes from the previous write-enabled primary to the current primary to report and + // automatically repair data loss cases. Read-only primaries are not stored, as they can't receive + // writes that could fail to be replicated to other nodes. Consider the failover scenarios: + // N1 (RW) -> N2 (RO) -> N1 (RO): `previous_writable_primary` remains N1 as N1 was the only write-enabled primary + // and thus has all the possible writes + // N1 (RW) -> N2 (RW) -> N1 (RO): `previous_writable_primary` is N2 as we only store the previous write-enabled + // primary. If writes are enable on shard with data loss, the data loss + // is considered acknowledged. + // N1 (RO) -> N2 (RW) : `previous_writable_primary` is null as there could not have been unreplicated + // writes from the read-only primary N1 + // N1 (RW) -> N2 (RW) : `previous_writable_primary` is N1 as it might have had unreplicated writes when + // N2 got elected + // N1 (RW) -> N2 (RO) -> N3 (RW): `previous_writable_primary` is N1 as N2 was not write-enabled before the second + // failover and thus any data missing from N3 must be on N1. q = `INSERT INTO shard_primaries (elected_by_praefect, shard_name, node_name, elected_at) SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, NOW() WHERE $3 != COALESCE((SELECT node_name FROM shard_primaries WHERE shard_name = $2::VARCHAR AND demoted = false), '') @@ -423,10 +440,13 @@ func (s *sqlElector) electNewPrimary(candidates []*sqlCandidate) error { , elected_at = EXCLUDED.elected_at , read_only = $5 , demoted = false + , previous_writable_primary = CASE WHEN shard_primaries.read_only + THEN shard_primaries.previous_writable_primary + ELSE shard_primaries.node_name + END WHERE shard_primaries.elected_at < now() - INTERVAL '1 MICROSECOND' * $4 ` _, err = s.db.Exec(q, s.praefectName, s.shardName, newPrimaryStorage, failoverTimeout.Microseconds(), s.readOnlyAfterFailover) - if err != nil { s.log.Errorf("error updating new primary: %s", err) return err @@ -491,7 +511,7 @@ func (s *sqlElector) validateAndUpdatePrimary() error { } // Check if primary is in this list - primaryNode, _, err := s.lookupPrimary() + primaryNode, _, _, err := s.lookupPrimary() if err != nil { s.log.WithError(err).Error("error looking up primary") @@ -509,17 +529,20 @@ func (s *sqlElector) validateAndUpdatePrimary() error { return nil } -func (s *sqlElector) lookupPrimary() (*sqlCandidate, bool, error) { - var primaryName string - var readOnly bool +func (s *sqlElector) lookupPrimary() (*sqlCandidate, bool, Node, error) { + var ( + primaryName string + readOnly bool + prevWritablePrimaryName sql.NullString + ) - const q = `SELECT node_name, read_only FROM shard_primaries WHERE shard_name = $1 AND demoted = false` - if err := s.db.QueryRow(q, s.shardName).Scan(&primaryName, &readOnly); err != nil { + const q = `SELECT node_name, read_only, previous_writable_primary FROM shard_primaries WHERE shard_name = $1 AND demoted = false` + if err := s.db.QueryRow(q, s.shardName).Scan(&primaryName, &readOnly, &prevWritablePrimaryName); err != nil { if err == sql.ErrNoRows { - return nil, false, nil + return nil, false, nil, nil } - return nil, false, fmt.Errorf("error looking up primary: %w", err) + return nil, false, nil, fmt.Errorf("error looking up primary: %w", err) } var primaryNode *sqlCandidate @@ -527,5 +550,12 @@ func (s *sqlElector) lookupPrimary() (*sqlCandidate, bool, error) { primaryNode = s.lookupNodeByName(primaryName) } - return primaryNode, readOnly, nil + var prevWritablePrimary Node + if prevWritablePrimaryName.Valid { + if cand := s.lookupNodeByName(prevWritablePrimaryName.String); cand != nil { + prevWritablePrimary = cand.Node + } + } + + return primaryNode, readOnly, prevWritablePrimary, nil } diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go index f6a459334..95c37a81d 100644 --- a/internal/praefect/nodes/sql_elector_test.go +++ b/internal/praefect/nodes/sql_elector_test.go @@ -82,23 +82,25 @@ func TestBasicFailover(t *testing.T) { srv1, healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1) defer srv1.Stop() + addr0 := "unix://" + internalSocket0 cc0, err := grpc.Dial( - "unix://"+internalSocket0, + addr0, grpc.WithInsecure(), ) require.NoError(t, err) + addr1 := "unix://" + internalSocket1 cc1, err := grpc.Dial( - "unix://"+internalSocket1, + addr1, grpc.WithInsecure(), ) require.NoError(t, err) storageName := "default" - mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec() - cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0) - cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), mockHistogramVec1) + + cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0", Address: addr0}, cc0, logger, promtest.NewMockHistogramVec()) + cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1", Address: addr1}, cc1, logger, promtest.NewMockHistogramVec()) ns := []*nodeStatus{cs0, cs1} elector := newSQLElector(shardName, conf, db.DB, logger, ns) @@ -114,10 +116,11 @@ func TestBasicFailover(t *testing.T) { require.Equal(t, cs0, elector.primaryNode.Node) shard, err := elector.GetShard() require.NoError(t, err) - require.Equal(t, cs0.GetStorage(), shard.Primary.GetStorage()) - require.Equal(t, 1, len(shard.Secondaries)) - require.Equal(t, cs1.GetStorage(), shard.Secondaries[0].GetStorage()) - require.False(t, shard.IsReadOnly, "new shard should not be read-only") + assertShard(t, shardAssertion{ + IsReadOnly: false, + Primary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()}, + Secondaries: []nodeAssertion{{cs1.GetStorage(), cs1.GetAddress()}}, + }, shard) // Bring first node down healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) @@ -128,7 +131,11 @@ func TestBasicFailover(t *testing.T) { require.NoError(t, err) shard, err = elector.GetShard() require.NoError(t, err) - require.False(t, shard.IsReadOnly) + assertShard(t, shardAssertion{ + IsReadOnly: false, + Primary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()}, + Secondaries: []nodeAssertion{{cs1.GetStorage(), cs1.GetAddress()}}, + }, shard) // Predate the timeout to exceed it predateLastSeenActiveAt(t, db, shardName, cs0.GetStorage(), failoverTimeout) @@ -141,17 +148,46 @@ func TestBasicFailover(t *testing.T) { db.RequireRowsInTable(t, "shard_primaries", 1) shard, err = elector.GetShard() require.NoError(t, err) - require.Equal(t, cs1.GetStorage(), shard.Primary.GetStorage()) - require.True(t, shard.IsReadOnly, "shard should be read-only after a failover") + assertShard(t, shardAssertion{ + // previous primary was write-enabled, so we should record it + PreviousWritablePrimary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()}, + IsReadOnly: true, + Primary: &nodeAssertion{cs1.GetStorage(), cs1.GetAddress()}, + Secondaries: []nodeAssertion{{cs0.GetStorage(), cs0.GetAddress()}}, + }, shard) + + // Failover back to the original node + healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + predateElection(t, db, shardName, failoverTimeout) + predateLastSeenActiveAt(t, db, shardName, cs1.GetStorage(), failoverTimeout) + require.NoError(t, elector.checkNodes(ctx)) + + shard, err = elector.GetShard() + require.NoError(t, err) + assertShard(t, shardAssertion{ + // failing back to the original node means the primary is the same as the + // previous write-enabled primary. Node 2 was read-only, so field is not + // modified + PreviousWritablePrimary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()}, + IsReadOnly: true, + Primary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()}, + Secondaries: []nodeAssertion{{cs1.GetStorage(), cs1.GetAddress()}}, + }, shard) // We should be able to enable writes on the new primary require.NoError(t, elector.enableWrites(ctx)) shard, err = elector.GetShard() require.NoError(t, err) - require.False(t, shard.IsReadOnly, "") + assertShard(t, shardAssertion{ + PreviousWritablePrimary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()}, + IsReadOnly: false, + Primary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()}, + Secondaries: []nodeAssertion{{cs1.GetStorage(), cs1.GetAddress()}}, + }, shard) // Bring second node down - healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) + healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) err = elector.checkNodes(ctx) require.NoError(t, err) @@ -178,13 +214,13 @@ func TestElectDemotedPrimary(t *testing.T) { candidates := []*sqlCandidate{{Node: &nodeStatus{Node: node}}} require.NoError(t, elector.electNewPrimary(candidates)) - primary, _, err := elector.lookupPrimary() + primary, _, _, err := elector.lookupPrimary() require.NoError(t, err) require.Equal(t, node.Storage, primary.GetStorage()) require.NoError(t, elector.demotePrimary()) - primary, _, err = elector.lookupPrimary() + primary, _, _, err = elector.lookupPrimary() require.NoError(t, err) require.Nil(t, primary) @@ -192,7 +228,7 @@ func TestElectDemotedPrimary(t *testing.T) { require.NoError(t, err) require.NoError(t, elector.electNewPrimary(candidates)) - primary, _, err = elector.lookupPrimary() + primary, _, _, err = elector.lookupPrimary() require.NoError(t, err) require.Equal(t, node.Storage, primary.GetStorage()) } @@ -344,7 +380,7 @@ func TestElectNewPrimary(t *testing.T) { elector := newSQLElector(shardName, conf, db.DB, testhelper.DiscardTestLogger(t), ns) require.NoError(t, elector.electNewPrimary(candidates)) - primary, readOnly, err := elector.lookupPrimary() + primary, readOnly, _, err := elector.lookupPrimary() require.NoError(t, err) require.Equal(t, "gitaly-1", primary.GetStorage(), "since replication queue is empty the first candidate should be chosen") require.False(t, readOnly) @@ -359,7 +395,7 @@ func TestElectNewPrimary(t *testing.T) { elector.log = logger require.NoError(t, elector.electNewPrimary(candidates)) - primary, readOnly, err = elector.lookupPrimary() + primary, readOnly, _, err = elector.lookupPrimary() require.NoError(t, err) require.Equal(t, testCase.expectedPrimary, primary.GetStorage()) require.True(t, readOnly) |