Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <proglottis@gmail.com>2020-11-13 04:32:24 +0300
committerJames Fargher <proglottis@gmail.com>2020-11-13 04:32:24 +0300
commit3c96560b74aaaf2949fa23318b61cf87ab30fbb0 (patch)
tree50dac960c391a2a1a44daa458f60440d13529105
parent19d7fc6389ae765edace547e9c74515622b8dbcd (diff)
parent0704ba472ea4b87008b5d7aaf7cf0356395b9866 (diff)
Merge branch 'smh-reconciler-assignments' into 'master'
Don't target unassigned storages for reconciliation See merge request gitlab-org/gitaly!2777
-rw-r--r--cmd/praefect/main.go2
-rw-r--r--internal/praefect/reconciler/reconciler.go10
-rw-r--r--internal/praefect/reconciler/reconciler_benchmark_test.go1
-rw-r--r--internal/praefect/reconciler/reconciler_test.go122
4 files changed, 102 insertions, 33 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 2c7c5a32b..606296773 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -386,7 +386,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
if conf.MemoryQueueEnabled {
logger.Warn("Disabled automatic reconciliation as it is only implemented using SQL queue and in-memory queue is configured.")
} else {
- r := reconciler.NewReconciler(logger, db, nodeManager, conf.StorageNames(), conf.Reconciliation.HistogramBuckets)
+ r := reconciler.NewReconciler(logger, db, nodeManager, conf.StorageNames(), conf.Reconciliation.HistogramBuckets, false)
prometheus.MustRegister(r)
go r.Run(ctx, helper.NewTimerTicker(interval))
}
diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go
index 5290df5ae..ad962ac37 100644
--- a/internal/praefect/reconciler/reconciler.go
+++ b/internal/praefect/reconciler/reconciler.go
@@ -25,10 +25,14 @@ type Reconciler struct {
// handleError is called with a possible error from reconcile.
// If it returns an error, Run stops and returns with the error.
handleError func(error) error
+ // assignmentsEnabled controls whether the reconciliation takes repository host node assignments
+ // in to consideration. If enabled, only assigned nodes are targeted by reconciliation jobs but any
+ // node can be used as a source. If disabled, all healthy nodes are considered assigned and targeted.
+ assignmentsEnabled bool
}
// NewReconciler returns a new Reconciler for repairing outdated repositories.
-func NewReconciler(log logrus.FieldLogger, db glsql.Querier, hc praefect.HealthChecker, storages map[string][]string, buckets []float64) *Reconciler {
+func NewReconciler(log logrus.FieldLogger, db glsql.Querier, hc praefect.HealthChecker, storages map[string][]string, buckets []float64, assignmentsEnabled bool) *Reconciler {
log = log.WithField("component", "reconciler")
r := &Reconciler{
@@ -49,6 +53,7 @@ func NewReconciler(log logrus.FieldLogger, db glsql.Querier, hc praefect.HealthC
log.WithError(err).Error("automatic reconciliation failed")
return nil
},
+ assignmentsEnabled: assignmentsEnabled,
}
// create the timeseries prior to having observations
@@ -163,6 +168,7 @@ WITH healthy_storages AS (
JOIN healthy_storages USING (virtual_storage)
LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage)
WHERE COALESCE(storage_repositories.generation != repositories.generation, true)
+ AND (NOT $3 OR assigned)
ORDER BY virtual_storage, relative_path
) AS unhealthy_repositories
JOIN (
@@ -198,7 +204,7 @@ SELECT
job->>'source_node_storage',
job->>'target_node_storage'
FROM reconciliation_jobs
-`, pq.StringArray(virtualStorages), pq.StringArray(storages))
+`, pq.StringArray(virtualStorages), pq.StringArray(storages), r.assignmentsEnabled)
if err != nil {
return fmt.Errorf("query: %w", err)
}
diff --git a/internal/praefect/reconciler/reconciler_benchmark_test.go b/internal/praefect/reconciler/reconciler_benchmark_test.go
index ec266c268..407591e96 100644
--- a/internal/praefect/reconciler/reconciler_benchmark_test.go
+++ b/internal/praefect/reconciler/reconciler_benchmark_test.go
@@ -68,6 +68,7 @@ CROSS JOIN (SELECT unnest('{gitaly-1, gitaly-2, gitaly-3}'::text[]) AS storage)
praefect.StaticHealthChecker(storages),
storages,
prometheus.DefBuckets,
+ false,
)
b.StartTimer()
diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go
index cdfb395ff..8458a5fff 100644
--- a/internal/praefect/reconciler/reconciler_test.go
+++ b/internal/praefect/reconciler/reconciler_test.go
@@ -40,7 +40,13 @@ func getStorageMethod(storage string) func() string {
func TestReconciler(t *testing.T) {
// repositories describes storage state as
// virtual storage -> relative path -> physical storage -> generation
- type repositories map[string]map[string]map[string]int
+
+ type storageRecord struct {
+ generation int
+ assigned bool
+ }
+
+ type repositories map[string]map[string]map[string]storageRecord
type existingJobs []datastore.ReplicationEvent
type jobs []datastore.ReplicationJob
type storages map[string][]string
@@ -93,6 +99,7 @@ func TestReconciler(t *testing.T) {
for _, tc := range []struct {
desc string
+ assignmentsEnabled bool
healthyStorages storages
repositories repositories
existingJobs existingJobs
@@ -109,9 +116,9 @@ func TestReconciler(t *testing.T) {
repositories: repositories{
"virtual-storage-1": {
"relative-path-1": {
- "storage-1": 0,
- "storage-2": 0,
- "storage-3": 0,
+ "storage-1": {generation: 0},
+ "storage-2": {generation: 0},
+ "storage-3": {generation: 0},
},
},
},
@@ -123,13 +130,13 @@ func TestReconciler(t *testing.T) {
repositories: repositories{
"virtual-storage-1": {
"relative-path-1": {
- "storage-1": 1,
- "storage-2": 0,
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 0},
},
"relative-path-2": {
- "storage-1": 0,
- "storage-2": 0,
- "storage-3": 0,
+ "storage-1": {generation: 0},
+ "storage-2": {generation: 0},
+ "storage-3": {generation: 0},
},
},
},
@@ -155,11 +162,11 @@ func TestReconciler(t *testing.T) {
desc: "reconciliation works with log batch size exceeded",
healthyStorages: configuredStoragesWithout("storage-3"),
repositories: func() repositories {
- repos := repositories{"virtual-storage-1": make(map[string]map[string]int, 2*logBatchSize+1)}
+ repos := repositories{"virtual-storage-1": make(map[string]map[string]storageRecord, 2*logBatchSize+1)}
for i := 0; i < 2*logBatchSize+1; i++ {
- repos["virtual-storage-1"][fmt.Sprintf("relative-path-%d", i)] = map[string]int{
- "storage-1": 1,
- "storage-2": 0,
+ repos["virtual-storage-1"][fmt.Sprintf("relative-path-%d", i)] = map[string]storageRecord{
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 0},
}
}
@@ -186,13 +193,13 @@ func TestReconciler(t *testing.T) {
repositories: repositories{
"virtual-storage-1": {
"relative-path-1": {
- "storage-1": 1,
- "storage-2": 0,
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 0},
},
"relative-path-2": {
- "storage-1": 1,
- "storage-2": 1,
- "storage-3": 1,
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 1},
+ "storage-3": {generation: 1},
},
},
},
@@ -204,8 +211,8 @@ func TestReconciler(t *testing.T) {
repositories: repositories{
"virtual-storage-1": {
"relative-path-1": {
- "storage-1": 1,
- "storage-2": 0,
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 0},
},
},
},
@@ -225,8 +232,8 @@ func TestReconciler(t *testing.T) {
repositories: repositories{
"virtual-storage-1": {
"relative-path-1": {
- "storage-1": 1,
- "storage-2": 0,
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 0},
},
},
},
@@ -246,8 +253,8 @@ func TestReconciler(t *testing.T) {
repositories: repositories{
"virtual-storage-1": {
"relative-path-1": {
- "storage-1": 1,
- "storage-2": 0,
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 0},
},
},
},
@@ -275,8 +282,8 @@ func TestReconciler(t *testing.T) {
repositories: repositories{
"virtual-storage-1": {
"relative-path-1": {
- "storage-1": 1,
- "storage-2": 0,
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 0},
},
},
},
@@ -308,8 +315,8 @@ func TestReconciler(t *testing.T) {
repositories: repositories{
"virtual-storage-1": {
"relative-path-1": {
- "storage-1": 1,
- "storage-2": 1,
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 1},
},
},
},
@@ -343,6 +350,52 @@ func TestReconciler(t *testing.T) {
TargetNodeStorage: "storage-3",
}},
},
+ {
+ desc: "unassigned node allowed to target an assigned node",
+ assignmentsEnabled: true,
+ healthyStorages: configuredStorages,
+ repositories: repositories{
+ "virtual-storage-1": {
+ "relative-path-1": {
+ "storage-1": {generation: 1},
+ "storage-2": {generation: 0},
+ "storage-3": {generation: 0, assigned: true},
+ },
+ },
+ },
+ reconciliationJobs: jobs{
+ {
+ Change: datastore.UpdateRepo,
+ VirtualStorage: "virtual-storage-1",
+ RelativePath: "relative-path-1",
+ SourceNodeStorage: "storage-1",
+ TargetNodeStorage: "storage-3",
+ },
+ },
+ },
+ {
+ desc: "assigned node allowed to target an assigned node",
+ assignmentsEnabled: true,
+ healthyStorages: configuredStorages,
+ repositories: repositories{
+ "virtual-storage-1": {
+ "relative-path-1": {
+ "storage-1": {generation: 1, assigned: true},
+ "storage-2": {generation: 0},
+ "storage-3": {generation: 0, assigned: true},
+ },
+ },
+ },
+ reconciliationJobs: jobs{
+ {
+ Change: datastore.UpdateRepo,
+ VirtualStorage: "virtual-storage-1",
+ RelativePath: "relative-path-1",
+ SourceNodeStorage: "storage-1",
+ TargetNodeStorage: "storage-3",
+ },
+ },
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := testhelper.Context()
@@ -354,8 +407,16 @@ func TestReconciler(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(db, configuredStorages)
for virtualStorage, relativePaths := range tc.repositories {
for relativePath, storages := range relativePaths {
- for storage, generation := range storages {
- require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, generation))
+ for storage, repo := range storages {
+ require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, repo.generation))
+
+ _, err := db.ExecContext(ctx, `
+ UPDATE storage_repositories SET assigned = $4
+ WHERE virtual_storage = $1
+ AND relative_path = $2
+ AND storage = $3
+ `, virtualStorage, relativePath, storage, repo.assigned)
+ require.NoError(t, err)
}
}
}
@@ -402,6 +463,7 @@ func TestReconciler(t *testing.T) {
praefect.StaticHealthChecker(tc.healthyStorages),
configuredStorages,
prometheus.DefBuckets,
+ tc.assignmentsEnabled,
)
reconciler.handleError = func(err error) error { return err }