gitlab.com/gitlab-org/gitaly.git
author    Paul Okstad <pokstad@gitlab.com>  2020-06-06 03:43:07 +0300
committer Paul Okstad <pokstad@gitlab.com>  2020-06-06 03:43:07 +0300
commit    d6a4017f311a34236a4e77c5aa281790c8834386 (patch)
tree      12aae77f3b8fe71a72ed7e55c751026c0b58d086
parent    8f85c2d485c3949d5ca1f2abd0805f5dd5e40356 (diff)
parent    d717c692044c74b3a7b505152b7a953d5872fe3e (diff)
Merge branch 'smh-remember-last-primary' into 'master'
Remember the previous write enabled primary node

See merge request gitlab-org/gitaly!2219
-rw-r--r--  internal/praefect/datastore/migrations/20200602154246_remember_previous_writable_primary.go |  13
-rw-r--r--  internal/praefect/nodes/local_elector.go                                                    |  35
-rw-r--r--  internal/praefect/nodes/manager.go                                                          |   9
-rw-r--r--  internal/praefect/nodes/manager_test.go                                                     | 147
-rw-r--r--  internal/praefect/nodes/sql_elector.go                                                      |  60
-rw-r--r--  internal/praefect/nodes/sql_elector_test.go                                                 |  74
6 files changed, 245 insertions(+), 93 deletions(-)
diff --git a/internal/praefect/datastore/migrations/20200602154246_remember_previous_writable_primary.go b/internal/praefect/datastore/migrations/20200602154246_remember_previous_writable_primary.go
new file mode 100644
index 000000000..1995e7e10
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200602154246_remember_previous_writable_primary.go
@@ -0,0 +1,13 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20200602154246_remember_previous_writable_primary",
+ Up: []string{"ALTER TABLE shard_primaries ADD COLUMN previous_writable_primary TEXT"},
+ Down: []string{"ALTER TABLE shard_primaries DROP COLUMN previous_writable_primary"},
+ }
+
+ allMigrations = append(allMigrations, m)
+}
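
The migration above registers itself in the package's shared allMigrations slice. For readers unfamiliar with rubenv/sql-migrate, here is a minimal standalone sketch of how such a migration gets applied; the DSN and the main wiring are placeholders, not Praefect's actual setup:

    package main

    import (
    	"database/sql"
    	"log"

    	_ "github.com/lib/pq"
    	migrate "github.com/rubenv/sql-migrate"
    )

    func main() {
    	// Placeholder connection string; Praefect builds its own from config.
    	db, err := sql.Open("postgres", "postgres://localhost/praefect?sslmode=disable")
    	if err != nil {
    		log.Fatal(err)
    	}

    	// MemoryMigrationSource mirrors how the migrations package collects
    	// *migrate.Migration values via init().
    	source := migrate.MemoryMigrationSource{
    		Migrations: []*migrate.Migration{{
    			Id:   "20200602154246_remember_previous_writable_primary",
    			Up:   []string{"ALTER TABLE shard_primaries ADD COLUMN previous_writable_primary TEXT"},
    			Down: []string{"ALTER TABLE shard_primaries DROP COLUMN previous_writable_primary"},
    		}},
    	}

    	n, err := migrate.Exec(db, "postgres", source, migrate.Up)
    	if err != nil {
    		log.Fatal(err)
    	}
    	log.Printf("applied %d migration(s)", n)
    }
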
diff --git a/internal/praefect/nodes/local_elector.go b/internal/praefect/nodes/local_elector.go
index 04711f760..e03e6ed49 100644
--- a/internal/praefect/nodes/local_elector.go
+++ b/internal/praefect/nodes/local_elector.go
@@ -20,14 +20,15 @@ type nodeCandidate struct {
// shard. It does NOT support multiple Praefect nodes or have any
// persistence. This is used mostly for testing and development.
type localElector struct {
- m sync.RWMutex
- failoverEnabled bool
- shardName string
- nodes []*nodeCandidate
- primaryNode *nodeCandidate
- readOnlyAfterFailover bool
- isReadOnly bool
- log logrus.FieldLogger
+ m sync.RWMutex
+ failoverEnabled bool
+ shardName string
+ nodes []*nodeCandidate
+ primaryNode *nodeCandidate
+ readOnlyAfterFailover bool
+ previousWritablePrimary Node
+ isReadOnly bool
+ log logrus.FieldLogger
}
// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary
@@ -148,7 +149,17 @@ func (s *localElector) checkNodes(ctx context.Context) error {
return ErrPrimaryNotHealthy
}
+ var previousWritablePrimary Node
+ if s.primaryNode != nil {
+ previousWritablePrimary = s.primaryNode.node
+ }
+
+ if s.isReadOnly {
+ previousWritablePrimary = s.previousWritablePrimary
+ }
+
s.primaryNode = newPrimary
+ s.previousWritablePrimary = previousWritablePrimary
s.isReadOnly = s.readOnlyAfterFailover
return nil
@@ -161,6 +172,7 @@ func (s *localElector) GetShard() (Shard, error) {
s.m.RLock()
primary := s.primaryNode
isReadOnly := s.isReadOnly
+ previousWritablePrimary := s.previousWritablePrimary
s.m.RUnlock()
if primary == nil {
@@ -179,9 +191,10 @@ func (s *localElector) GetShard() (Shard, error) {
}
return Shard{
- IsReadOnly: isReadOnly,
- Primary: primary.node,
- Secondaries: secondaries,
+ PreviousWritablePrimary: previousWritablePrimary,
+ IsReadOnly: isReadOnly,
+ Primary: primary.node,
+ Secondaries: secondaries,
}, nil
}
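
The bookkeeping in checkNodes is easy to misread at a glance: the demoted primary is recorded as the previous writable primary only if the shard was write-enabled at the moment of failover; if the shard was already read-only, the earlier record is carried forward. A pure-function restatement of that rule (the names here are illustrative, not part of this change):

    // nextPreviousWritable returns the value previousWritablePrimary takes
    // after a failover. A read-only primary cannot have accepted writes,
    // so in that case the existing record is kept.
    func nextPreviousWritable(demotedPrimary, recorded string, wasReadOnly bool) string {
    	if wasReadOnly {
    		return recorded
    	}
    	return demotedPrimary
    }
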
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 4b57a8868..27fd623b5 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -28,9 +28,12 @@ import (
// Shard is a primary with a set of secondaries
type Shard struct {
- IsReadOnly bool
- Primary Node
- Secondaries []Node
+ // PreviousWritablePrimary is the virtual storage's previous
+ // write-enabled primary.
+ PreviousWritablePrimary Node
+ IsReadOnly bool
+ Primary Node
+ Secondaries []Node
}
func (s Shard) GetNode(storage string) (Node, error) {
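
With the new field, a caller of GetShard can detect that the current primary may be missing writes accepted by an earlier primary. A hedged sketch of such a check (this helper is hypothetical and not part of the merge request):

    // mayHaveUnreplicatedWrites reports whether writes accepted by the last
    // write-enabled primary might not have reached the current primary.
    func mayHaveUnreplicatedWrites(s Shard) bool {
    	return s.PreviousWritablePrimary != nil &&
    		s.PreviousWritablePrimary.GetStorage() != s.Primary.GetStorage()
    }
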
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index f3d743203..baead7cba 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -18,6 +18,45 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
)
+type nodeAssertion struct {
+ Storage string
+ Address string
+}
+
+type shardAssertion struct {
+ PreviousWritablePrimary *nodeAssertion
+ IsReadOnly bool
+ Primary *nodeAssertion
+ Secondaries []nodeAssertion
+}
+
+func toNodeAssertion(n Node) *nodeAssertion {
+ if n == nil {
+ return nil
+ }
+
+ return &nodeAssertion{
+ Storage: n.GetStorage(),
+ Address: n.GetAddress(),
+ }
+}
+
+func assertShard(t *testing.T, exp shardAssertion, act Shard) {
+ t.Helper()
+
+ actSecondaries := make([]nodeAssertion, 0, len(act.Secondaries))
+ for _, n := range act.Secondaries {
+ actSecondaries = append(actSecondaries, *toNodeAssertion(n))
+ }
+
+ require.Equal(t, exp, shardAssertion{
+ PreviousWritablePrimary: toNodeAssertion(act.PreviousWritablePrimary),
+ IsReadOnly: act.IsReadOnly,
+ Primary: toNodeAssertion(act.Primary),
+ Secondaries: actSecondaries,
+ })
+}
+
func TestNodeStatus(t *testing.T) {
socket := testhelper.GetTemporaryGitalySocketFileName()
svr, healthSvr := testhelper.NewServerWithHealth(t, socket)
@@ -202,20 +241,21 @@ func TestNodeManager(t *testing.T) {
srv1, healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1)
defer srv1.Stop()
+ node1 := &config.Node{
+ Storage: "praefect-internal-0",
+ Address: "unix://" + internalSocket0,
+ DefaultPrimary: true,
+ }
+
+ node2 := &config.Node{
+ Storage: "praefect-internal-1",
+ Address: "unix://" + internalSocket1,
+ }
+
virtualStorages := []*config.VirtualStorage{
{
- Name: "virtual-storage-0",
- Nodes: []*config.Node{
- {
- Storage: "praefect-internal-0",
- Address: "unix://" + internalSocket0,
- DefaultPrimary: true,
- },
- {
- Storage: "praefect-internal-1",
- Address: "unix://" + internalSocket1,
- },
- },
+ Name: "virtual-storage-0",
+ Nodes: []*config.Node{node1, node2},
},
}
@@ -238,9 +278,6 @@ func TestNodeManager(t *testing.T) {
nm.Start(1*time.Millisecond, 5*time.Second)
nmWithoutFailover.Start(1*time.Millisecond, 5*time.Second)
- _, err = nm.GetShard("virtual-storage-0")
- require.NoError(t, err)
-
shardWithoutFailover, err := nmWithoutFailover.GetShard("virtual-storage-0")
require.NoError(t, err)
@@ -248,22 +285,23 @@ func TestNodeManager(t *testing.T) {
require.NoError(t, err)
// shard without failover and shard with failover should be the same
- require.Equal(t, shardWithoutFailover.Primary.GetStorage(), shard.Primary.GetStorage())
- require.Equal(t, shardWithoutFailover.Primary.GetAddress(), shard.Primary.GetAddress())
- require.Len(t, shard.Secondaries, 1)
- require.Equal(t, shardWithoutFailover.Secondaries[0].GetStorage(), shard.Secondaries[0].GetStorage())
- require.Equal(t, shardWithoutFailover.Secondaries[0].GetAddress(), shard.Secondaries[0].GetAddress())
- require.False(t, shard.IsReadOnly)
- require.False(t, shardWithoutFailover.IsReadOnly)
-
- require.Equal(t, virtualStorages[0].Nodes[0].Storage, shard.Primary.GetStorage())
- require.Equal(t, virtualStorages[0].Nodes[0].Address, shard.Primary.GetAddress())
- require.Len(t, shard.Secondaries, 1)
- require.Equal(t, virtualStorages[0].Nodes[1].Storage, shard.Secondaries[0].GetStorage())
- require.Equal(t, virtualStorages[0].Nodes[1].Address, shard.Secondaries[0].GetAddress())
+ initialState := shardAssertion{
+ Primary: &nodeAssertion{node1.Storage, node1.Address},
+ Secondaries: []nodeAssertion{{node2.Storage, node2.Address}},
+ }
+ assertShard(t, initialState, shard)
+ assertShard(t, initialState, shardWithoutFailover)
+
+ const unhealthyCheckCount = 1
+ const healthyCheckCount = healthcheckThreshold
+ checkShards := func(count int) {
+ for i := 0; i < count; i++ {
+ nm.checkShards()
+ }
+ }
- healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
- nm.checkShards()
+ healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ checkShards(unhealthyCheckCount)
labelsCalled := mockHistogram.LabelsCalled()
for _, node := range virtualStorages[0].Nodes {
@@ -286,29 +324,48 @@ func TestNodeManager(t *testing.T) {
require.NotEqual(t, shardWithoutFailover.Secondaries[0].GetAddress(), shard.Secondaries[0].GetAddress())
// shard without failover should still match the config
- require.Equal(t, virtualStorages[0].Nodes[0].Storage, shardWithoutFailover.Primary.GetStorage())
- require.Equal(t, virtualStorages[0].Nodes[0].Address, shardWithoutFailover.Primary.GetAddress())
- require.Len(t, shard.Secondaries, 1)
- require.Equal(t, virtualStorages[0].Nodes[1].Storage, shardWithoutFailover.Secondaries[0].GetStorage())
- require.Equal(t, virtualStorages[0].Nodes[1].Address, shardWithoutFailover.Secondaries[0].GetAddress())
- require.False(t, shardWithoutFailover.IsReadOnly,
- "shard should not be read-only after primary failure with failover disabled")
+ assertShard(t, initialState, shardWithoutFailover)
// shard with failover should have promoted a secondary to primary and demoted the primary to a secondary
- require.Equal(t, virtualStorages[0].Nodes[1].Storage, shard.Primary.GetStorage())
- require.Equal(t, virtualStorages[0].Nodes[1].Address, shard.Primary.GetAddress())
- require.Len(t, shard.Secondaries, 1)
- require.Equal(t, virtualStorages[0].Nodes[0].Storage, shard.Secondaries[0].GetStorage())
- require.Equal(t, virtualStorages[0].Nodes[0].Address, shard.Secondaries[0].GetAddress())
- require.True(t, shard.IsReadOnly, "shard should be read-only after a failover")
+ assertShard(t, shardAssertion{
+ // previous write enabled primary should have been recorded
+ PreviousWritablePrimary: &nodeAssertion{node1.Storage, node1.Address},
+ IsReadOnly: true,
+ Primary: &nodeAssertion{node2.Storage, node2.Address},
+ Secondaries: []nodeAssertion{{node1.Storage, node1.Address}},
+ }, shard)
+
+ // failing back to the original primary
+ healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ checkShards(healthyCheckCount)
+
+ shard, err = nm.GetShard("virtual-storage-0")
+ require.NoError(t, err)
+
+ assertShard(t, shardAssertion{
+ // previous rw primary is now the primary again, field remains the same
+ // as node2 was not write enabled
+ PreviousWritablePrimary: &nodeAssertion{node1.Storage, node1.Address},
+ IsReadOnly: true,
+ Primary: &nodeAssertion{node1.Storage, node1.Address},
+ Secondaries: []nodeAssertion{{node2.Storage, node2.Address}},
+ }, shard)
require.NoError(t, nm.EnableWrites(context.Background(), "virtual-storage-0"))
shard, err = nm.GetShard("virtual-storage-0")
require.NoError(t, err)
- require.False(t, shard.IsReadOnly, "shard should be write enabled")
+ assertShard(t, shardAssertion{
+ PreviousWritablePrimary: &nodeAssertion{node1.Storage, node1.Address},
+ IsReadOnly: false,
+ Primary: &nodeAssertion{node1.Storage, node1.Address},
+ Secondaries: []nodeAssertion{{node2.Storage, node2.Address}},
+ }, shard)
+
+ healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
- nm.checkShards()
+ checkShards(unhealthyCheckCount)
_, err = nm.GetShard("virtual-storage-0")
require.Error(t, err, "should return error since no nodes are healthy")
diff --git a/internal/praefect/nodes/sql_elector.go b/internal/praefect/nodes/sql_elector.go
index 07b933e1f..07505e384 100644
--- a/internal/praefect/nodes/sql_elector.go
+++ b/internal/praefect/nodes/sql_elector.go
@@ -186,7 +186,7 @@ func (s *sqlElector) checkNodes(ctx context.Context) error {
// The attempt to elect a primary may have conflicted with another
// node attempting to elect a primary. We check the database again
// to see the current state.
- candidate, _, err := s.lookupPrimary()
+ candidate, _, _, err := s.lookupPrimary()
if err != nil {
s.log.WithError(err).Error("error looking up primary")
return err
@@ -252,7 +252,7 @@ last_contact_attempt_at = NOW()`
// GetShard gets the current status of the shard. ErrPrimaryNotHealthy
// is returned if a primary does not exist.
func (s *sqlElector) GetShard() (Shard, error) {
- primary, readOnly, err := s.lookupPrimary()
+ primary, readOnly, prevWritablePrimary, err := s.lookupPrimary()
if err != nil {
return Shard{}, err
}
@@ -270,9 +270,10 @@ func (s *sqlElector) GetShard() (Shard, error) {
}
return Shard{
- IsReadOnly: readOnly,
- Primary: primary,
- Secondaries: secondaries,
+ PreviousWritablePrimary: prevWritablePrimary,
+ IsReadOnly: readOnly,
+ Primary: primary,
+ Secondaries: secondaries,
}, nil
}
@@ -414,6 +415,22 @@ func (s *sqlElector) electNewPrimary(candidates []*sqlCandidate) error {
// do not yet have a row in the table, from starting in read-only mode. In a failover scenario,
// a row already exists in the table denoting the previous primary, and thus the shard should
// be switched to read-only mode.
+ //
+ // Previous write-enabled primary is stored in `previous_writable_primary` column. We store it to determine
+ // unreplicated writes from the previous write-enabled primary to the current primary to report and
+ // automatically repair data loss cases. Read-only primaries are not stored, as they can't receive
+ // writes that could fail to be replicated to other nodes. Consider the failover scenarios:
+ // N1 (RW) -> N2 (RO) -> N1 (RO): `previous_writable_primary` remains N1 as N1 was the only write-enabled primary
+ // and thus has all the possible writes
+ // N1 (RW) -> N2 (RW) -> N1 (RO): `previous_writable_primary` is N2 as we only store the previous write-enabled
+ primary. If writes are enabled on a shard with data loss, the data loss
+ is considered acknowledged.
+ // N1 (RO) -> N2 (RW) : `previous_writable_primary` is null as there could not have been unreplicated
+ // writes from the read-only primary N1
+ // N1 (RW) -> N2 (RW) : `previous_writable_primary` is N1 as it might have had unreplicated writes when
+ // N2 got elected
+ // N1 (RW) -> N2 (RO) -> N3 (RW): `previous_writable_primary` is N1 as N2 was not write-enabled before the second
+ // failover and thus any data missing from N3 must be on N1.
q = `INSERT INTO shard_primaries (elected_by_praefect, shard_name, node_name, elected_at)
SELECT $1::VARCHAR, $2::VARCHAR, $3::VARCHAR, NOW()
WHERE $3 != COALESCE((SELECT node_name FROM shard_primaries WHERE shard_name = $2::VARCHAR AND demoted = false), '')
@@ -423,10 +440,13 @@ func (s *sqlElector) electNewPrimary(candidates []*sqlCandidate) error {
, elected_at = EXCLUDED.elected_at
, read_only = $5
, demoted = false
+ , previous_writable_primary = CASE WHEN shard_primaries.read_only
+ THEN shard_primaries.previous_writable_primary
+ ELSE shard_primaries.node_name
+ END
WHERE shard_primaries.elected_at < now() - INTERVAL '1 MICROSECOND' * $4
`
_, err = s.db.Exec(q, s.praefectName, s.shardName, newPrimaryStorage, failoverTimeout.Microseconds(), s.readOnlyAfterFailover)
-
if err != nil {
s.log.Errorf("error updating new primary: %s", err)
return err
@@ -491,7 +511,7 @@ func (s *sqlElector) validateAndUpdatePrimary() error {
}
// Check if primary is in this list
- primaryNode, _, err := s.lookupPrimary()
+ primaryNode, _, _, err := s.lookupPrimary()
if err != nil {
s.log.WithError(err).Error("error looking up primary")
@@ -509,17 +529,20 @@ func (s *sqlElector) validateAndUpdatePrimary() error {
return nil
}
-func (s *sqlElector) lookupPrimary() (*sqlCandidate, bool, error) {
- var primaryName string
- var readOnly bool
+func (s *sqlElector) lookupPrimary() (*sqlCandidate, bool, Node, error) {
+ var (
+ primaryName string
+ readOnly bool
+ prevWritablePrimaryName sql.NullString
+ )
- const q = `SELECT node_name, read_only FROM shard_primaries WHERE shard_name = $1 AND demoted = false`
- if err := s.db.QueryRow(q, s.shardName).Scan(&primaryName, &readOnly); err != nil {
+ const q = `SELECT node_name, read_only, previous_writable_primary FROM shard_primaries WHERE shard_name = $1 AND demoted = false`
+ if err := s.db.QueryRow(q, s.shardName).Scan(&primaryName, &readOnly, &prevWritablePrimaryName); err != nil {
if err == sql.ErrNoRows {
- return nil, false, nil
+ return nil, false, nil, nil
}
- return nil, false, fmt.Errorf("error looking up primary: %w", err)
+ return nil, false, nil, fmt.Errorf("error looking up primary: %w", err)
}
var primaryNode *sqlCandidate
@@ -527,5 +550,12 @@ func (s *sqlElector) lookupPrimary() (*sqlCandidate, bool, error) {
primaryNode = s.lookupNodeByName(primaryName)
}
- return primaryNode, readOnly, nil
+ var prevWritablePrimary Node
+ if prevWritablePrimaryName.Valid {
+ if cand := s.lookupNodeByName(prevWritablePrimaryName.String); cand != nil {
+ prevWritablePrimary = cand.Node
+ }
+ }
+
+ return primaryNode, readOnly, prevWritablePrimary, nil
}
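
The CASE expression in electNewPrimary implements the same carry-forward rule as the local elector and produces exactly the scenario table documented above. A table-driven sketch checking those scenarios against a Go restatement of the SQL (illustrative only; the real logic lives in the query, and an empty string stands in for SQL NULL):

    package nodes

    import (
    	"testing"

    	"github.com/stretchr/testify/require"
    )

    // previousWritableAfterElection restates the SQL CASE: a demoted
    // read-only primary is skipped, a write-enabled one is recorded.
    func previousWritableAfterElection(demoted, recorded string, demotedWasReadOnly bool) string {
    	if demotedWasReadOnly {
    		return recorded
    	}
    	return demoted
    }

    func TestPreviousWritableScenarios(t *testing.T) {
    	// N1 (RW) -> N2: N1 is recorded.
    	require.Equal(t, "N1", previousWritableAfterElection("N1", "", false))
    	// N1 (RW) -> N2 (RO) -> N1: N2 was read-only, so N1 stays recorded.
    	require.Equal(t, "N1", previousWritableAfterElection("N2", "N1", true))
    	// N1 (RW) -> N2 (RW) -> N1: N2 took writes, so it replaces N1.
    	require.Equal(t, "N2", previousWritableAfterElection("N2", "N1", false))
    	// N1 (RO) -> N2: nothing recorded, N1 could not have taken writes.
    	require.Equal(t, "", previousWritableAfterElection("N1", "", true))
    }
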
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index f6a459334..95c37a81d 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -82,23 +82,25 @@ func TestBasicFailover(t *testing.T) {
srv1, healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1)
defer srv1.Stop()
+ addr0 := "unix://" + internalSocket0
cc0, err := grpc.Dial(
- "unix://"+internalSocket0,
+ addr0,
grpc.WithInsecure(),
)
require.NoError(t, err)
+ addr1 := "unix://" + internalSocket1
cc1, err := grpc.Dial(
- "unix://"+internalSocket1,
+ addr1,
grpc.WithInsecure(),
)
require.NoError(t, err)
storageName := "default"
- mockHistogramVec0, mockHistogramVec1 := promtest.NewMockHistogramVec(), promtest.NewMockHistogramVec()
- cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0"}, cc0, testhelper.DiscardTestEntry(t), mockHistogramVec0)
- cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1"}, cc1, testhelper.DiscardTestEntry(t), mockHistogramVec1)
+
+ cs0 := newConnectionStatus(config.Node{Storage: storageName + "-0", Address: addr0}, cc0, logger, promtest.NewMockHistogramVec())
+ cs1 := newConnectionStatus(config.Node{Storage: storageName + "-1", Address: addr1}, cc1, logger, promtest.NewMockHistogramVec())
ns := []*nodeStatus{cs0, cs1}
elector := newSQLElector(shardName, conf, db.DB, logger, ns)
@@ -114,10 +116,11 @@ func TestBasicFailover(t *testing.T) {
require.Equal(t, cs0, elector.primaryNode.Node)
shard, err := elector.GetShard()
require.NoError(t, err)
- require.Equal(t, cs0.GetStorage(), shard.Primary.GetStorage())
- require.Equal(t, 1, len(shard.Secondaries))
- require.Equal(t, cs1.GetStorage(), shard.Secondaries[0].GetStorage())
- require.False(t, shard.IsReadOnly, "new shard should not be read-only")
+ assertShard(t, shardAssertion{
+ IsReadOnly: false,
+ Primary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()},
+ Secondaries: []nodeAssertion{{cs1.GetStorage(), cs1.GetAddress()}},
+ }, shard)
// Bring first node down
healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
@@ -128,7 +131,11 @@ func TestBasicFailover(t *testing.T) {
require.NoError(t, err)
shard, err = elector.GetShard()
require.NoError(t, err)
- require.False(t, shard.IsReadOnly)
+ assertShard(t, shardAssertion{
+ IsReadOnly: false,
+ Primary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()},
+ Secondaries: []nodeAssertion{{cs1.GetStorage(), cs1.GetAddress()}},
+ }, shard)
// Predate the timeout to exceed it
predateLastSeenActiveAt(t, db, shardName, cs0.GetStorage(), failoverTimeout)
@@ -141,17 +148,46 @@ func TestBasicFailover(t *testing.T) {
db.RequireRowsInTable(t, "shard_primaries", 1)
shard, err = elector.GetShard()
require.NoError(t, err)
- require.Equal(t, cs1.GetStorage(), shard.Primary.GetStorage())
- require.True(t, shard.IsReadOnly, "shard should be read-only after a failover")
+ assertShard(t, shardAssertion{
+ // previous primary was write-enabled, so we should record it
+ PreviousWritablePrimary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()},
+ IsReadOnly: true,
+ Primary: &nodeAssertion{cs1.GetStorage(), cs1.GetAddress()},
+ Secondaries: []nodeAssertion{{cs0.GetStorage(), cs0.GetAddress()}},
+ }, shard)
+
+ // Failover back to the original node
+ healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+ healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ predateElection(t, db, shardName, failoverTimeout)
+ predateLastSeenActiveAt(t, db, shardName, cs1.GetStorage(), failoverTimeout)
+ require.NoError(t, elector.checkNodes(ctx))
+
+ shard, err = elector.GetShard()
+ require.NoError(t, err)
+ assertShard(t, shardAssertion{
+ // failing back to the original node means the primary is the same as the
+ // previous write-enabled primary. Node 2 was read-only, so field is not
+ // modified
+ PreviousWritablePrimary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()},
+ IsReadOnly: true,
+ Primary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()},
+ Secondaries: []nodeAssertion{{cs1.GetStorage(), cs1.GetAddress()}},
+ }, shard)
// We should be able to enable writes on the new primary
require.NoError(t, elector.enableWrites(ctx))
shard, err = elector.GetShard()
require.NoError(t, err)
- require.False(t, shard.IsReadOnly, "")
+ assertShard(t, shardAssertion{
+ PreviousWritablePrimary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()},
+ IsReadOnly: false,
+ Primary: &nodeAssertion{cs0.GetStorage(), cs0.GetAddress()},
+ Secondaries: []nodeAssertion{{cs1.GetStorage(), cs1.GetAddress()}},
+ }, shard)
// Bring second node down
- healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+ healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
err = elector.checkNodes(ctx)
require.NoError(t, err)
@@ -178,13 +214,13 @@ func TestElectDemotedPrimary(t *testing.T) {
candidates := []*sqlCandidate{{Node: &nodeStatus{Node: node}}}
require.NoError(t, elector.electNewPrimary(candidates))
- primary, _, err := elector.lookupPrimary()
+ primary, _, _, err := elector.lookupPrimary()
require.NoError(t, err)
require.Equal(t, node.Storage, primary.GetStorage())
require.NoError(t, elector.demotePrimary())
- primary, _, err = elector.lookupPrimary()
+ primary, _, _, err = elector.lookupPrimary()
require.NoError(t, err)
require.Nil(t, primary)
@@ -192,7 +228,7 @@ func TestElectDemotedPrimary(t *testing.T) {
require.NoError(t, err)
require.NoError(t, elector.electNewPrimary(candidates))
- primary, _, err = elector.lookupPrimary()
+ primary, _, _, err = elector.lookupPrimary()
require.NoError(t, err)
require.Equal(t, node.Storage, primary.GetStorage())
}
@@ -344,7 +380,7 @@ func TestElectNewPrimary(t *testing.T) {
elector := newSQLElector(shardName, conf, db.DB, testhelper.DiscardTestLogger(t), ns)
require.NoError(t, elector.electNewPrimary(candidates))
- primary, readOnly, err := elector.lookupPrimary()
+ primary, readOnly, _, err := elector.lookupPrimary()
require.NoError(t, err)
require.Equal(t, "gitaly-1", primary.GetStorage(), "since replication queue is empty the first candidate should be chosen")
require.False(t, readOnly)
@@ -359,7 +395,7 @@ func TestElectNewPrimary(t *testing.T) {
elector.log = logger
require.NoError(t, elector.electNewPrimary(candidates))
- primary, readOnly, err = elector.lookupPrimary()
+ primary, readOnly, _, err = elector.lookupPrimary()
require.NoError(t, err)
require.Equal(t, testCase.expectedPrimary, primary.GetStorage())
require.True(t, readOnly)