diff options
author | James Fargher <proglottis@gmail.com> | 2020-11-13 04:32:24 +0300 |
---|---|---|
committer | James Fargher <proglottis@gmail.com> | 2020-11-13 04:32:24 +0300 |
commit | 3c96560b74aaaf2949fa23318b61cf87ab30fbb0 (patch) | |
tree | 50dac960c391a2a1a44daa458f60440d13529105 | |
parent | 19d7fc6389ae765edace547e9c74515622b8dbcd (diff) | |
parent | 0704ba472ea4b87008b5d7aaf7cf0356395b9866 (diff) |
Merge branch 'smh-reconciler-assignments' into 'master'
Don't target unassigned storages for reconciliation
See merge request gitlab-org/gitaly!2777
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | internal/praefect/reconciler/reconciler.go | 10 | ||||
-rw-r--r-- | internal/praefect/reconciler/reconciler_benchmark_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/reconciler/reconciler_test.go | 122 |
4 files changed, 102 insertions, 33 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 2c7c5a32b..606296773 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -386,7 +386,7 @@ func run(cfgs []starter.Config, conf config.Config) error { if conf.MemoryQueueEnabled { logger.Warn("Disabled automatic reconciliation as it is only implemented using SQL queue and in-memory queue is configured.") } else { - r := reconciler.NewReconciler(logger, db, nodeManager, conf.StorageNames(), conf.Reconciliation.HistogramBuckets) + r := reconciler.NewReconciler(logger, db, nodeManager, conf.StorageNames(), conf.Reconciliation.HistogramBuckets, false) prometheus.MustRegister(r) go r.Run(ctx, helper.NewTimerTicker(interval)) } diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go index 5290df5ae..ad962ac37 100644 --- a/internal/praefect/reconciler/reconciler.go +++ b/internal/praefect/reconciler/reconciler.go @@ -25,10 +25,14 @@ type Reconciler struct { // handleError is called with a possible error from reconcile. // If it returns an error, Run stops and returns with the error. handleError func(error) error + // assignmentsEnabled controls whether the reconciliation takes repository host node assignments + // in to consideration. If enabled, only assigned nodes are targeted by reconciliation jobs but any + // node can be used as a source. If disabled, all healthy nodes are considered assigned and targeted. + assignmentsEnabled bool } // NewReconciler returns a new Reconciler for repairing outdated repositories. -func NewReconciler(log logrus.FieldLogger, db glsql.Querier, hc praefect.HealthChecker, storages map[string][]string, buckets []float64) *Reconciler { +func NewReconciler(log logrus.FieldLogger, db glsql.Querier, hc praefect.HealthChecker, storages map[string][]string, buckets []float64, assignmentsEnabled bool) *Reconciler { log = log.WithField("component", "reconciler") r := &Reconciler{ @@ -49,6 +53,7 @@ func NewReconciler(log logrus.FieldLogger, db glsql.Querier, hc praefect.HealthC log.WithError(err).Error("automatic reconciliation failed") return nil }, + assignmentsEnabled: assignmentsEnabled, } // create the timeseries prior to having observations @@ -163,6 +168,7 @@ WITH healthy_storages AS ( JOIN healthy_storages USING (virtual_storage) LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage) WHERE COALESCE(storage_repositories.generation != repositories.generation, true) + AND (NOT $3 OR assigned) ORDER BY virtual_storage, relative_path ) AS unhealthy_repositories JOIN ( @@ -198,7 +204,7 @@ SELECT job->>'source_node_storage', job->>'target_node_storage' FROM reconciliation_jobs -`, pq.StringArray(virtualStorages), pq.StringArray(storages)) +`, pq.StringArray(virtualStorages), pq.StringArray(storages), r.assignmentsEnabled) if err != nil { return fmt.Errorf("query: %w", err) } diff --git a/internal/praefect/reconciler/reconciler_benchmark_test.go b/internal/praefect/reconciler/reconciler_benchmark_test.go index ec266c268..407591e96 100644 --- a/internal/praefect/reconciler/reconciler_benchmark_test.go +++ b/internal/praefect/reconciler/reconciler_benchmark_test.go @@ -68,6 +68,7 @@ CROSS JOIN (SELECT unnest('{gitaly-1, gitaly-2, gitaly-3}'::text[]) AS storage) praefect.StaticHealthChecker(storages), storages, prometheus.DefBuckets, + false, ) b.StartTimer() diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index cdfb395ff..8458a5fff 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -40,7 +40,13 @@ func getStorageMethod(storage string) func() string { func TestReconciler(t *testing.T) { // repositories describes storage state as // virtual storage -> relative path -> physical storage -> generation - type repositories map[string]map[string]map[string]int + + type storageRecord struct { + generation int + assigned bool + } + + type repositories map[string]map[string]map[string]storageRecord type existingJobs []datastore.ReplicationEvent type jobs []datastore.ReplicationJob type storages map[string][]string @@ -93,6 +99,7 @@ func TestReconciler(t *testing.T) { for _, tc := range []struct { desc string + assignmentsEnabled bool healthyStorages storages repositories repositories existingJobs existingJobs @@ -109,9 +116,9 @@ func TestReconciler(t *testing.T) { repositories: repositories{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 0, - "storage-2": 0, - "storage-3": 0, + "storage-1": {generation: 0}, + "storage-2": {generation: 0}, + "storage-3": {generation: 0}, }, }, }, @@ -123,13 +130,13 @@ func TestReconciler(t *testing.T) { repositories: repositories{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 1, - "storage-2": 0, + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, }, "relative-path-2": { - "storage-1": 0, - "storage-2": 0, - "storage-3": 0, + "storage-1": {generation: 0}, + "storage-2": {generation: 0}, + "storage-3": {generation: 0}, }, }, }, @@ -155,11 +162,11 @@ func TestReconciler(t *testing.T) { desc: "reconciliation works with log batch size exceeded", healthyStorages: configuredStoragesWithout("storage-3"), repositories: func() repositories { - repos := repositories{"virtual-storage-1": make(map[string]map[string]int, 2*logBatchSize+1)} + repos := repositories{"virtual-storage-1": make(map[string]map[string]storageRecord, 2*logBatchSize+1)} for i := 0; i < 2*logBatchSize+1; i++ { - repos["virtual-storage-1"][fmt.Sprintf("relative-path-%d", i)] = map[string]int{ - "storage-1": 1, - "storage-2": 0, + repos["virtual-storage-1"][fmt.Sprintf("relative-path-%d", i)] = map[string]storageRecord{ + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, } } @@ -186,13 +193,13 @@ func TestReconciler(t *testing.T) { repositories: repositories{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 1, - "storage-2": 0, + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, }, "relative-path-2": { - "storage-1": 1, - "storage-2": 1, - "storage-3": 1, + "storage-1": {generation: 1}, + "storage-2": {generation: 1}, + "storage-3": {generation: 1}, }, }, }, @@ -204,8 +211,8 @@ func TestReconciler(t *testing.T) { repositories: repositories{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 1, - "storage-2": 0, + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, }, }, }, @@ -225,8 +232,8 @@ func TestReconciler(t *testing.T) { repositories: repositories{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 1, - "storage-2": 0, + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, }, }, }, @@ -246,8 +253,8 @@ func TestReconciler(t *testing.T) { repositories: repositories{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 1, - "storage-2": 0, + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, }, }, }, @@ -275,8 +282,8 @@ func TestReconciler(t *testing.T) { repositories: repositories{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 1, - "storage-2": 0, + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, }, }, }, @@ -308,8 +315,8 @@ func TestReconciler(t *testing.T) { repositories: repositories{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 1, - "storage-2": 1, + "storage-1": {generation: 1}, + "storage-2": {generation: 1}, }, }, }, @@ -343,6 +350,52 @@ func TestReconciler(t *testing.T) { TargetNodeStorage: "storage-3", }}, }, + { + desc: "unassigned node allowed to target an assigned node", + assignmentsEnabled: true, + healthyStorages: configuredStorages, + repositories: repositories{ + "virtual-storage-1": { + "relative-path-1": { + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, + "storage-3": {generation: 0, assigned: true}, + }, + }, + }, + reconciliationJobs: jobs{ + { + Change: datastore.UpdateRepo, + VirtualStorage: "virtual-storage-1", + RelativePath: "relative-path-1", + SourceNodeStorage: "storage-1", + TargetNodeStorage: "storage-3", + }, + }, + }, + { + desc: "assigned node allowed to target an assigned node", + assignmentsEnabled: true, + healthyStorages: configuredStorages, + repositories: repositories{ + "virtual-storage-1": { + "relative-path-1": { + "storage-1": {generation: 1, assigned: true}, + "storage-2": {generation: 0}, + "storage-3": {generation: 0, assigned: true}, + }, + }, + }, + reconciliationJobs: jobs{ + { + Change: datastore.UpdateRepo, + VirtualStorage: "virtual-storage-1", + RelativePath: "relative-path-1", + SourceNodeStorage: "storage-1", + TargetNodeStorage: "storage-3", + }, + }, + }, } { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := testhelper.Context() @@ -354,8 +407,16 @@ func TestReconciler(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(db, configuredStorages) for virtualStorage, relativePaths := range tc.repositories { for relativePath, storages := range relativePaths { - for storage, generation := range storages { - require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, generation)) + for storage, repo := range storages { + require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, repo.generation)) + + _, err := db.ExecContext(ctx, ` + UPDATE storage_repositories SET assigned = $4 + WHERE virtual_storage = $1 + AND relative_path = $2 + AND storage = $3 + `, virtualStorage, relativePath, storage, repo.assigned) + require.NoError(t, err) } } } @@ -402,6 +463,7 @@ func TestReconciler(t *testing.T) { praefect.StaticHealthChecker(tc.healthyStorages), configuredStorages, prometheus.DefBuckets, + tc.assignmentsEnabled, ) reconciler.handleError = func(err error) error { return err } |