Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-05-25 18:21:22 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-05-25 18:21:22 +0300
commit6768fe2d8a6f413a537d03cdc0daf2ca5dc32dc3 (patch)
tree358f025467bbbf0785e67aba835fc99cccce418d
parentd0083f4c828772537e6891cae4fe0df1f6b255f4 (diff)
Query the health consensus directly in PerRepositoryElector
PerRepositoryElector currently gets the health consensus from the HealthManager. This is an unnecessary loop as the primary elector could query for the consensus directly from the database. This was originally implemented like this to ease testing and avoid duplicating the query logic. With the query logic extracted into a view, let's query the view directly.
-rw-r--r--cmd/praefect/main.go2
-rw-r--r--internal/praefect/nodes/per_repository.go31
-rw-r--r--internal/praefect/nodes/per_repository_test.go42
3 files changed, 39 insertions, 36 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 3f8ee04d1..0f014b76c 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -321,7 +321,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
}()
healthChecker = hm
- elector := nodes.NewPerRepositoryElector(logger, db, hm)
+ elector := nodes.NewPerRepositoryElector(logger, db)
if conf.Failover.Enabled {
go func() {
diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go
index 62c4d5777..a5ea2c908 100644
--- a/internal/praefect/nodes/per_repository.go
+++ b/internal/praefect/nodes/per_repository.go
@@ -7,7 +7,6 @@ import (
"fmt"
"time"
- "github.com/lib/pq"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
@@ -22,26 +21,16 @@ var ErrNoPrimary = errors.New("no primary")
type PerRepositoryElector struct {
log logrus.FieldLogger
db glsql.Querier
- hc HealthConsensus
handleError func(error) error
retryWait time.Duration
}
-// HealthConsensus returns the cluster's consensus of healthy nodes.
-type HealthConsensus interface {
- // HealthConsensus returns a list of healthy nodes by cluster consensus. Returned
- // set may contains nodes not present in the local configuration if the cluster has
- // deemed them healthy.
- HealthConsensus() map[string][]string
-}
-
// NewPerRepositoryElector returns a new per repository primary elector.
-func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier, hc HealthConsensus) *PerRepositoryElector {
+func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier) *PerRepositoryElector {
log = log.WithField("component", "PerRepositoryElector")
return &PerRepositoryElector{
log: log,
db: db,
- hc: hc,
handleError: func(err error) error {
log.WithError(err).Error("failed performing failovers")
return nil
@@ -97,22 +86,8 @@ func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{}
}
func (pr *PerRepositoryElector) performFailovers(ctx context.Context) error {
- healthyNodes := pr.hc.HealthConsensus()
-
- var virtualStorages, physicalStorages []string
- for virtualStorage, nodes := range healthyNodes {
- for _, node := range nodes {
- virtualStorages = append(virtualStorages, virtualStorage)
- physicalStorages = append(physicalStorages, node)
- }
- }
-
rows, err := pr.db.QueryContext(ctx, `
-WITH healthy_storages AS (
- SELECT unnest($1::text[]) AS virtual_storage, unnest($2::text[]) AS storage
-),
-
-updated AS (
+WITH updated AS (
UPDATE repositories
SET "primary" = (
SELECT storage
@@ -169,7 +144,7 @@ promoted AS (
SELECT virtual_storage, storage, COALESCE(demoted, 0), COALESCE(promoted, 0)
FROM demoted
FULL JOIN promoted USING (virtual_storage, storage)
-`, pq.StringArray(virtualStorages), pq.StringArray(physicalStorages))
+ `)
if err != nil {
return fmt.Errorf("query: %w", err)
}
diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go
index 067b132e4..634738226 100644
--- a/internal/praefect/nodes/per_repository_test.go
+++ b/internal/praefect/nodes/per_repository_test.go
@@ -9,6 +9,7 @@ import (
"testing"
"time"
+ "github.com/lib/pq"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
@@ -19,10 +20,38 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
-// HealthConsensusFunc is an adapter to turn a conforming function in to a HealthConsensus.
-type HealthConsensusFunc func() map[string][]string
+func setHealthyNodes(t testing.TB, ctx context.Context, db glsql.Querier, healthyNodes map[string]map[string][]string) {
+ var praefects, virtualStorages, storages []string
+ for praefect, virtualStors := range healthyNodes {
+ for virtualStorage, stors := range virtualStors {
+ for _, storage := range stors {
+ praefects = append(praefects, praefect)
+ virtualStorages = append(virtualStorages, virtualStorage)
+ storages = append(storages, storage)
+ }
+ }
+ }
-func (fn HealthConsensusFunc) HealthConsensus() map[string][]string { return fn() }
+ _, err := db.ExecContext(ctx, `
+WITH clear_previous_checks AS ( DELETE FROM node_status )
+
+INSERT INTO node_status (praefect_name, shard_name, node_name, last_contact_attempt_at, last_seen_active_at)
+SELECT
+ unnest($1::text[]) AS praefect_name,
+ unnest($2::text[]) AS shard_name,
+ unnest($3::text[]) AS node_name,
+ NOW() AS last_contact_attempt_at,
+ NOW() AS last_seen_active_at
+ON CONFLICT (praefect_name, shard_name, node_name) DO UPDATE SET
+ last_contact_attempt_at = NOW(),
+ last_seen_active_at = NOW()
+ `,
+ pq.StringArray(praefects),
+ pq.StringArray(virtualStorages),
+ pq.StringArray(storages),
+ )
+ require.NoError(t, err)
+}
func TestPerRepositoryElector(t *testing.T) {
ctx, cancel := testhelper.Context()
@@ -503,11 +532,11 @@ func TestPerRepositoryElector(t *testing.T) {
for _, step := range tc.steps {
runElection := func(tx *sql.Tx, matchLogs logMatcher) {
+ setHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect-0": step.healthyNodes})
+
// The first transaction runs first
logger, hook := test.NewNullLogger()
- elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx,
- HealthConsensusFunc(func() map[string][]string { return step.healthyNodes }),
- )
+ elector := NewPerRepositoryElector(logrus.NewEntry(logger), tx)
elector.handleError = func(err error) error { return err }
trigger := make(chan struct{}, 1)
@@ -568,7 +597,6 @@ func TestPerRepositoryElector_Retry(t *testing.T) {
return nil, assert.AnError
},
},
- HealthConsensusFunc(func() map[string][]string { return map[string][]string{} }),
)
elector.retryWait = time.Nanosecond
elector.handleError = func(err error) error {