package reconciler

import (
	"context"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v15/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/advisorylock"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
)

const logBatchSize = 25

// Reconciler implements reconciliation logic for repairing outdated repository replicas.
type Reconciler struct {
	log                              logrus.FieldLogger
	db                               glsql.Querier
	hc                               praefect.HealthChecker
	storages                         map[string][]string
	reconciliationSchedulingDuration prometheus.Histogram
	// handleError is called with a possible error from reconcile.
	// If it returns an error, Run stops and returns with the error.
	handleError func(error) error
}

// NewReconciler returns a new Reconciler for repairing outdated repositories.
func NewReconciler(log logrus.FieldLogger, db glsql.Querier, hc praefect.HealthChecker, storages map[string][]string, buckets []float64) *Reconciler {
	log = log.WithField("component", "reconciler")

	r := &Reconciler{
		log:      log,
		db:       db,
		hc:       hc,
		storages: storages,
		reconciliationSchedulingDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "gitaly_praefect_reconciliation_scheduling_seconds",
			Help:    "The time spent performing a single reconciliation scheduling run.",
			Buckets: buckets,
		}),
		handleError: func(err error) error {
			log.WithError(err).Error("automatic reconciliation failed")
			return nil
		},
	}

	return r
}
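
// The handleError hook above defaults to logging the error and continuing.
// Within this package (for example in tests), a caller could override it so
// that Run stops on the first reconciliation failure. This is an illustrative
// sketch, not part of the exported API; log, db, hc and storages are assumed
// to exist:
//
//	r := NewReconciler(log, db, hc, storages, prometheus.DefBuckets)
//	r.handleError = func(err error) error {
//		// Returning the error makes Run return it instead of continuing.
//		return err
//	}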

// Describe describes the metrics collected by the Reconciler to Prometheus.
func (r *Reconciler) Describe(ch chan<- *prometheus.Desc) {
	prometheus.DescribeByCollect(r, ch)
}

// Collect collects the current values of the Reconciler's metrics.
func (r *Reconciler) Collect(ch chan<- prometheus.Metric) {
	r.reconciliationSchedulingDuration.Collect(ch)
}
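
// Because Reconciler implements both Describe and Collect, it satisfies
// prometheus.Collector and can be registered like any other collector. A
// minimal sketch, assuming the default registry is used:
//
//	prometheus.MustRegister(r)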

// Run reconciles on each tick the Ticker emits. Run returns
// when the context is canceled, returning the error from the context.
func (r *Reconciler) Run(ctx context.Context, ticker helper.Ticker) error {
	r.log.WithField("storages", r.storages).Info("automatic reconciler started")
	defer r.log.Info("automatic reconciler stopped")

	defer ticker.Stop()

	for {
		ticker.Reset()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C():
			if err := r.reconcile(ctx); err != nil {
				if err := r.handleError(err); err != nil {
					return err
				}
			}
		}
	}
}
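
// A typical wiring runs the reconciler on a fixed interval until the
// surrounding context is canceled. This is an illustrative sketch only; the
// ticker constructor, the interval and the surrounding variables are
// assumptions rather than values taken from this file:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//
//	r := NewReconciler(log, db, hc, storages, prometheus.DefBuckets)
//	if err := r.Run(ctx, helper.NewTimerTicker(5*time.Minute)); err != nil && !errors.Is(err, context.Canceled) {
//		log.WithError(err).Error("reconciler exited")
//	}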

// job is an internal type for formatting log messages
type job struct {
	RepositoryID   int64   `json:"repository_id"`
	Change         string  `json:"change"`
	CorrelationID  string  `json:"correlation_id"`
	VirtualStorage string  `json:"virtual_storage"`
	RelativePath   string  `json:"relative_path"`
	SourceStorage  *string `json:"source_storage,omitempty"`
	TargetStorage  string  `json:"target_storage"`
}

// reconcile schedules replication jobs to fix any discrepancies between the expected state of the
// virtual storage and its actual state.
//
// It currently handles fixing two discrepancies:
//
// 1. Assigned storage having an outdated replica of a repository. This is fixed by scheduling
//    an `update` type job from any healthy storage with an up to date replica. These are only
//    scheduled if there is no other active `update` type job targeting the outdated replica.
// 2. Unassigned storage having an unnecessary replica. This is fixed by scheduling a `delete_replica`
//    type job to remove the unneeded replica from the storage. These are only scheduled if all assigned
//    storages are up to date and the replica is not used as a source or target storage in any other job.
//    Only one job of this type is allowed to be queued for a given repository at a time. This is to avoid
//    deleting too many replicas if assignments are changed while the jobs are queued.
//
// The fixes are only scheduled if the target node is healthy, and if there is a healthy source node
// available should the job need one.
//
// If the repository has no assignments set, reconcile falls back to considering every storage in the
// virtual storage as assigned. Consequently, no `delete_replica` jobs are scheduled when assignments
// are not explicitly set.
func (r *Reconciler) reconcile(ctx context.Context) error {
	defer prometheus.NewTimer(r.reconciliationSchedulingDuration).ObserveDuration()

	var virtualStorages []string
	var storages []string

	for virtualStorage, healthyStorages := range r.hc.HealthyNodes() {
		if len(healthyStorages) < 2 {
			// minimum two healthy storages within a virtual storage needed for valid
			// replication source and target
			r.log.WithField("virtual_storage", virtualStorage).Info("reconciliation skipped for virtual storage due to not having enough healthy storages")
			continue
		}

		for _, storage := range healthyStorages {
			virtualStorages = append(virtualStorages, virtualStorage)
			storages = append(storages, storage)
		}
	}

	if len(virtualStorages) == 0 {
		return nil
	}

	rows, err := r.db.QueryContext(ctx, `
WITH reconciliation_lock AS (
	SELECT pg_try_advisory_xact_lock($1) AS acquired
),

healthy_storages AS (
    SELECT unnest($2::text[]) AS virtual_storage,
           unnest($3::text[]) AS storage
),

delete_jobs AS (
	SELECT DISTINCT ON (repository_id)
		repository_id,
		virtual_storage,
		relative_path,
		storage
	FROM (
		SELECT repository_id, storage, generation
		FROM storage_repositories
	) AS storage_repositories
	JOIN repositories USING (repository_id)
	JOIN healthy_storages USING (virtual_storage, storage)
	WHERE (
		-- Only replicas on storages that are not assigned to the repository should be targeted for
		-- deletion. If no assignments exist, every storage is considered assigned, so no deletion is scheduled.
		SELECT COUNT(storage) > 0 AND COUNT(storage) FILTER (WHERE storage = storage_repositories.storage) = 0
		FROM repository_assignments
		WHERE repository_id = repositories.repository_id
	)
	AND storage_repositories.generation <= (
		-- Check whether the replica's generation is equal to or lower than the generation of every assigned
		-- replica of the repository. If so, it is eligible for deletion.
		SELECT MIN(COALESCE(generation, -1))
		FROM repository_assignments
		FULL JOIN storage_repositories USING (repository_id, storage)
		WHERE repository_id = repositories.repository_id
	) AND NOT EXISTS (
		-- Ensure the replica is not used as target or source in any scheduled job. This is to avoid breaking
		-- any already scheduled jobs.
		SELECT FROM replication_queue
		WHERE (job->'repository_id')::bigint = repository_id
		AND (
				job->>'source_node_storage' = storage
			OR 	job->>'target_node_storage' = storage
		)
		AND state NOT IN ('completed', 'dead')
	) AND NOT EXISTS (
		-- Ensure there are no other scheduled 'delete_replica' type jobs for the repository. Rapid changes to
		-- repository assignments could otherwise cause the reconciler to schedule deletions against every replica.
		-- To avoid this, we do not allow more than one 'delete_replica' job to be active for a repository at any given time.
		SELECT FROM replication_queue
		WHERE state NOT IN ('completed', 'dead')
		AND (job->>'repository_id')::bigint = repository_id
		AND job->>'change' = 'delete_replica'
	)
),

update_jobs AS (
	SELECT DISTINCT ON (repository_id, target_node_storage)
		repository_id,
		virtual_storage,
		relative_path,
		source_node_storage,
		target_node_storage
	FROM (
		SELECT repository_id, virtual_storage, relative_path, storage AS target_node_storage
		FROM repositories
		JOIN healthy_storages USING (virtual_storage)
		LEFT JOIN (
			SELECT repository_id, storage, generation
			FROM storage_repositories
		) AS storage_repositories USING (repository_id, storage)
		WHERE COALESCE(storage_repositories.generation != repositories.generation, true)
		AND (
			-- If assignments exist for the repository, only the assigned storages are targeted for replication.
			-- If no assignments exist, every healthy node is targeted for replication.
			SELECT COUNT(storage) = 0 OR COUNT(storage) FILTER (WHERE storage = healthy_storages.storage) = 1
			FROM repository_assignments
			WHERE repository_id = repositories.repository_id
		)
		ORDER BY virtual_storage, relative_path
	) AS unhealthy_repositories
	JOIN (
		SELECT repository_id, storage AS source_node_storage
		FROM (
			SELECT repository_id, relative_path, storage, generation
			FROM storage_repositories
		) AS storage_repositories
		JOIN repositories USING (repository_id, relative_path, generation)
		JOIN healthy_storages USING (virtual_storage, storage)
		WHERE NOT EXISTS (
			SELECT FROM replication_queue
			WHERE state NOT IN ('completed', 'dead')
			AND (job->>'repository_id')::bigint = repository_id
			AND job->>'target_node_storage' = storage
			AND job->>'change' = 'delete_replica'
		)
		ORDER BY virtual_storage, relative_path
	) AS healthy_repositories USING (repository_id)
	WHERE NOT EXISTS (
		SELECT FROM replication_queue
		WHERE state NOT IN ('completed', 'dead')
		AND (job->>'repository_id')::bigint = repository_id
		AND job->>'target_node_storage' = target_node_storage
		AND job->>'change' = 'update'
	)
	ORDER BY repository_id, target_node_storage, random()
),

reconciliation_jobs AS (
	INSERT INTO replication_queue (lock_id, job, meta)
	SELECT
		(virtual_storage || '|' || target_node_storage || '|' || relative_path),
		to_jsonb(reconciliation_jobs),
		jsonb_build_object('correlation_id', encode(random()::text::bytea, 'base64'))
	FROM (
		SELECT
			COALESCE(repository_id, 0) AS repository_id,
			virtual_storage,
			relative_path,
			source_node_storage,
			target_node_storage,
			'update' AS change
		FROM update_jobs
			UNION ALL
		SELECT
			COALESCE(repository_id, 0) AS repository_id,
			virtual_storage,
			relative_path,
			NULL AS source_node_storage,
			storage AS target_node_storage,
			'delete_replica' AS change
		FROM delete_jobs
	) AS reconciliation_jobs
	-- only perform inserts if we managed to acquire the lock as otherwise
	-- we'd schedule duplicate jobs
	WHERE ( SELECT acquired FROM reconciliation_lock )
	RETURNING lock_id, meta, job
),

create_locks AS (
	INSERT INTO replication_queue_lock(id)
	SELECT lock_id
	FROM reconciliation_jobs
	ON CONFLICT (id) DO NOTHING
)

SELECT
	meta->>'correlation_id',
	job->>'repository_id',
	job->>'change',
	job->>'virtual_storage',
	job->>'relative_path',
	job->>'source_node_storage',
	job->>'target_node_storage'
FROM reconciliation_jobs
`, advisorylock.Reconcile, virtualStorages, storages)
	if err != nil {
		return fmt.Errorf("query: %w", err)
	}

	defer func() {
		if err := rows.Close(); err != nil {
			r.log.WithError(err).Error("error closing rows")
		}
	}()

	jobs := make([]job, 0, logBatchSize)

	for rows.Next() {
		var j job
		if err := rows.Scan(
			&j.CorrelationID,
			&j.RepositoryID,
			&j.Change,
			&j.VirtualStorage,
			&j.RelativePath,
			&j.SourceStorage,
			&j.TargetStorage,
		); err != nil {
			return fmt.Errorf("scan: %w", err)
		}

		jobs = append(jobs, j)
		if len(jobs) == logBatchSize {
			r.logJobs(jobs)
			jobs = jobs[:0]
		}
	}

	if err = rows.Err(); err != nil {
		return fmt.Errorf("rows.Err: %w", err)
	}

	if len(jobs) > 0 {
		r.logJobs(jobs)
	} else {
		r.log.Debug("reconciliation did not result in any scheduled jobs")
	}

	return nil
}

func (r *Reconciler) logJobs(jobs []job) {
	r.log.WithField("scheduled_jobs", jobs).Info("reconciliation jobs scheduled")
}
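
// For reference, a single scheduled job serializes into the "scheduled_jobs"
// log field according to the JSON tags on the job type above. The values in
// this sketch are hypothetical:
//
//	{
//		"repository_id": 42,
//		"change": "update",
//		"correlation_id": "aGVsbG8=",
//		"virtual_storage": "default",
//		"relative_path": "@hashed/aa/bb/example.git",
//		"source_storage": "gitaly-1",
//		"target_storage": "gitaly-2"
//	}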