Welcome to mirror list, hosted at ThFree Co, Russian Federation.

repository_store.go « datastore « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 9c8dcf30e17a6bb8dd18f92904d1de8947467b6d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
package datastore

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"github.com/lib/pq"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
)

// storages maps a virtual storage name to the names of the storages configured for it.
type storages map[string][]string

// GenerationUnknown is used to indicate lack of generation number in
// a replication job. Older instances can produce replication jobs
// without a generation number. The repository store also reports it when a
// storage has no record of the repository (see GetGeneration and
// GetReplicatedGeneration).
const GenerationUnknown = -1

// errWriteToOutdatedNodes is returned by IncrementGeneration when none of the
// given storages held a replica on the repository's latest generation, so no
// replica generation was incremented.
var errWriteToOutdatedNodes = errors.New("write to outdated nodes")

// DowngradeAttemptedError reports that replicating from a source storage would not move the target
// storage's replica to a newer generation, because the target is already at the source's generation
// or beyond it.
type DowngradeAttemptedError struct {
	// VirtualStorage is the virtual storage the repository belongs to.
	VirtualStorage string
	// RelativePath is the repository's relative path within the virtual storage.
	RelativePath string
	// Storage is the target storage whose replica would have been downgraded.
	Storage string
	// CurrentGeneration is the generation the target replica currently has.
	CurrentGeneration int
	// AttemptedGeneration is the generation the replication would have applied.
	AttemptedGeneration int
}

// Error formats the repository coordinates together with both generations.
func (e DowngradeAttemptedError) Error() string {
	const format = "attempted downgrading %q -> %q -> %q from generation %d to %d"
	return fmt.Sprintf(format,
		e.VirtualStorage,
		e.RelativePath,
		e.Storage,
		e.CurrentGeneration,
		e.AttemptedGeneration,
	)
}

// RepositoryNotExistsError is returned when trying to perform an operation on a non-existent repository.
type RepositoryNotExistsError struct {
	virtualStorage string
	relativePath   string
	storage        string
}

// Is reports whether other is a RepositoryNotExistsError. Field values are not compared, so any
// RepositoryNotExistsError matches, including the zero value, which callers can pass to errors.Is.
func (err RepositoryNotExistsError) Is(other error) bool {
	// A plain type assertion is intentional: Is must only inspect the target itself; walking the
	// wrap chain is errors.Is's job.
	//nolint:errorlint
	_, ok := other.(RepositoryNotExistsError)
	return ok
}

// Error returns the error message.
func (err RepositoryNotExistsError) Error() string {
	return fmt.Sprintf("repository %q -> %q -> %q does not exist",
		err.virtualStorage, err.relativePath, err.storage,
	)
}

// RepositoryExistsError is returned when trying to create a repository that already exists.
type RepositoryExistsError struct {
	virtualStorage string
	relativePath   string
	storage        string
}

// Is reports whether other is a RepositoryExistsError. Only the type is compared, never the fields.
func (e RepositoryExistsError) Is(other error) bool {
	//nolint:errorlint
	switch other.(type) {
	case RepositoryExistsError:
		return true
	default:
		return false
	}
}

// Error describes which repository already exists.
func (e RepositoryExistsError) Error() string {
	vs, rp, st := e.virtualStorage, e.relativePath, e.storage
	return fmt.Sprintf("repository %q -> %q -> %q already exists", vs, rp, st)
}

// ErrNoRowsAffected is returned when a query did not perform any changes. DeleteRepository and
// DeleteReplica return it when their DELETE reports zero affected rows.
var ErrNoRowsAffected = errors.New("no rows were affected by the query")

// RepositoryStore provides access to repository state.
type RepositoryStore interface {
	// GetGeneration gets the repository's generation on a given storage. The Postgres implementation
	// returns GenerationUnknown when the storage has no record of the repository.
	GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
	// IncrementGeneration increments the generations of up to date nodes, i.e. the nodes among primary and
	// secondaries whose replica is on the repository's latest generation.
	IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error
	// SetGeneration sets the repository's generation on the given storage. If the generation is higher
	// than the virtual storage's generation, it is set to match as well to guarantee monotonic increments.
	SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error
	// GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would
	// downgrade, a DowngradeAttemptedError is returned.
	GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
	// CreateRepository creates a record for a repository in the specified virtual storage and relative path.
	// Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated
	// and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed
	// the transaction. Returns RepositoryExistsError when trying to create a repository which already exists in the store.
	//
	// storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as
	// the repository's primary.
	//
	// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
	// secondaries are stored as the assigned hosts of the repository.
	CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
	// SetAuthoritativeReplica sets the given replica of a repository as the authoritative one by setting its generation as the latest one.
	SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
	// DeleteRepository deletes the repository's record from the virtual storage and the storages. Returns
	// ErrNoRowsAffected when trying to delete a repository which has no record in the virtual storage
	// or the storages.
	DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error
	// DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage.
	DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
	// RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well
	// as the record of the storage that is calling it. Returns RepositoryNotExistsError when trying to rename a
	// repository which has no record in the virtual storage or the storage.
	RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
	// ConsistentStoragesGetter is declared elsewhere in this package; PostgresRepositoryStore satisfies it
	// via GetConsistentStorages.
	ConsistentStoragesGetter
	// RepositoryExists returns whether the repository exists on a virtual storage.
	RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
	// GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which
	// are not able to serve requests at the moment.
	GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]PartiallyAvailableRepository, error)
	// DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's
	// record of the invalid repository. If the storage was the only storage with the repository, the repository's
	// record on the virtual storage is also deleted.
	DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
	// ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already
	// exists with the given virtual storage and relative path combination, an error is returned.
	ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
	// GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a
	// RepositoryNotFoundError if the repository doesn't exist.
	GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
}

// PostgresRepositoryStore is a Postgres implementation of RepositoryStore.
// Refer to the interface for method documentation.
type PostgresRepositoryStore struct {
	// db runs every query issued by the store.
	db glsql.Querier
	// storages maps each virtual storage to its configured storages. GetPartiallyAvailableRepositories
	// reads it to determine the configured storages of a virtual storage.
	storages
}

// NewPostgresRepositoryStore returns a Postgres implementation of RepositoryStore whose queries run
// through db. configuredStorages maps each virtual storage name to the names of its configured
// storages; the map is kept as-is rather than copied.
func NewPostgresRepositoryStore(db glsql.Querier, configuredStorages map[string][]string) *PostgresRepositoryStore {
	store := &PostgresRepositoryStore{db: db}
	store.storages = storages(configuredStorages)
	return store
}

// GetGeneration returns the generation recorded in storage_repositories for the repository on the given
// storage. GenerationUnknown is returned, without an error, when the storage has no such record.
func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) {
	var generation int
	err := rs.db.QueryRowContext(ctx, `
SELECT generation
FROM storage_repositories
WHERE virtual_storage = $1
AND relative_path = $2
AND storage = $3
`, virtualStorage, relativePath, storage).Scan(&generation)

	switch {
	case err == nil:
		return generation, nil
	case errors.Is(err, sql.ErrNoRows):
		// A missing record is not a failure; it just means the generation is not known.
		return GenerationUnknown, nil
	default:
		return 0, err
	}
}

// IncrementGeneration increments by one the generation of every replica on primary and secondaries that is
// on the repository's latest generation. If any replica was incremented, it also increments the
// repository's generation in `repositories`. Replicas on an older generation are left untouched.
//
// It returns a RepositoryNotFoundError when the repository has no record in `repositories`, and
// errWriteToOutdatedNodes when none of the given storages held a replica on the latest generation.
func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error {
	const q = `
WITH updated_replicas AS (
	UPDATE storage_repositories
	SET generation = generation + 1
	FROM (
		SELECT virtual_storage, relative_path, storage
		FROM repositories
		JOIN storage_repositories USING (virtual_storage, relative_path, generation)
		WHERE virtual_storage = $1
		AND   relative_path   = $2
		AND   storage         = ANY($3)
		FOR UPDATE
	) AS to_update
	WHERE storage_repositories.virtual_storage = to_update.virtual_storage
	AND   storage_repositories.relative_path   = to_update.relative_path
	AND   storage_repositories.storage         = to_update.storage
	RETURNING storage_repositories.virtual_storage, storage_repositories.relative_path
),

updated_repository AS (
	UPDATE repositories
	SET generation = generation + 1
	FROM (
		SELECT DISTINCT virtual_storage, relative_path
		FROM updated_replicas
	) AS updated_repositories
	WHERE repositories.virtual_storage = updated_repositories.virtual_storage
	AND   repositories.relative_path   = updated_repositories.relative_path
)

SELECT
	EXISTS (
		SELECT FROM repositories
		WHERE virtual_storage = $1
		AND   relative_path   = $2
	) AS repository_exists,
	EXISTS ( SELECT FROM updated_replicas ) AS repository_updated
`

	// Build a fresh slice instead of append(secondaries, primary): when secondaries has spare
	// capacity, append would write primary into the caller's backing array.
	candidates := make([]string, 0, len(secondaries)+1)
	candidates = append(candidates, secondaries...)
	candidates = append(candidates, primary)

	var repositoryExists, repositoryUpdated bool
	if err := rs.db.QueryRowContext(
		ctx, q, virtualStorage, relativePath, pq.StringArray(candidates),
	).Scan(&repositoryExists, &repositoryUpdated); err != nil {
		return fmt.Errorf("scan: %w", err)
	}

	if !repositoryExists {
		return commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
	}

	if !repositoryUpdated {
		return errWriteToOutdatedNodes
	}

	return nil
}

// SetGeneration upserts the storage's record of the repository with the given generation. In the same
// statement it raises the repository's generation in `repositories` to generation, but only if the stored
// value (NULL treated as -1) is lower, so the repository's generation never moves backwards.
func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error {
	if _, err := rs.db.ExecContext(ctx, `
WITH repository AS (
	UPDATE repositories SET generation = $4
	WHERE virtual_storage = $1
	AND   relative_path   = $2
	AND   COALESCE(repositories.generation, -1) < $4
)

INSERT INTO storage_repositories (
	repository_id,
	virtual_storage,
	relative_path,
	storage,
	generation
)
SELECT
	(SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2),
	$1, $2, $3, $4
ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET
	repository_id = EXCLUDED.repository_id,
	generation = EXCLUDED.generation
`, virtualStorage, relativePath, storage, generation); err != nil {
		return err
	}

	return nil
}

// SetAuthoritativeReplica makes the replica on storage the authoritative copy of the repository. It increments
// the repository's generation in `repositories` and upserts the storage's record with that new generation,
// so the storage becomes the only replica on the latest generation. A RepositoryNotFoundError is returned
// when nothing was written, which happens when the repository has no record in `repositories`.
func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error {
	const q = `
WITH updated_repository AS (
	UPDATE repositories
	SET generation = generation + 1
	WHERE virtual_storage = $1
	AND   relative_path   = $2
	RETURNING repository_id, virtual_storage, relative_path, generation
)

INSERT INTO storage_repositories (repository_id, virtual_storage, relative_path, storage, generation)
SELECT repository_id, virtual_storage, relative_path, $3, generation
FROM updated_repository
ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE
	SET repository_id = EXCLUDED.repository_id,
	    generation = EXCLUDED.generation
	`

	res, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage)
	if err != nil {
		return fmt.Errorf("exec: %w", err)
	}

	affected, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected: %w", err)
	}

	if affected == 0 {
		return commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
	}

	return nil
}

// GetReplicatedGeneration reads the generations of the source and target replicas and returns the source's
// generation, which the target would reach after replicating from source. A storage without a record
// counts as GenerationUnknown. When the target's generation is known and already greater than or equal
// to the source's, a DowngradeAttemptedError is returned instead.
func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) {
	rows, err := rs.db.QueryContext(ctx, `
SELECT storage, generation
FROM storage_repositories
WHERE virtual_storage = $1
AND relative_path = $2
AND storage = ANY($3)
`, virtualStorage, relativePath, pq.StringArray([]string{source, target}))
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	srcGen, dstGen := GenerationUnknown, GenerationUnknown
	for rows.Next() {
		var (
			name string
			gen  int
		)
		if err := rows.Scan(&name, &gen); err != nil {
			return 0, err
		}

		// The source is checked first, so if source == target only srcGen is populated.
		switch {
		case name == source:
			srcGen = gen
		case name == target:
			dstGen = gen
		default:
			return 0, fmt.Errorf("unexpected storage: %s", name)
		}
	}

	if err := rows.Err(); err != nil {
		return 0, err
	}

	if dstGen == GenerationUnknown || dstGen < srcGen {
		return srcGen, nil
	}

	return 0, DowngradeAttemptedError{
		VirtualStorage:      virtualStorage,
		RelativePath:        relativePath,
		Storage:             target,
		CurrentGeneration:   dstGen,
		AttemptedGeneration: srcGen,
	}
}

// CreateRepository inserts a new repository with the given repositoryID at generation 0. The primary and
// updatedSecondaries receive storage records at generation 0; outdatedSecondaries receive none.
//
// When storePrimary is true, primary is saved as the repository's primary; otherwise the column is left NULL.
// When storeAssignments is true, primary, updatedSecondaries and outdatedSecondaries are all recorded as the
// repository's assigned storages.
//
// On a unique-constraint violation it reports "repository id %d already in use" if the repository ID
// collided (constraint repositories_pkey). For any other unique violation it returns RepositoryExistsError.
// All other errors are returned unchanged.
func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
	const q = `
WITH repo AS (
	INSERT INTO repositories (
		repository_id,
		virtual_storage,
		relative_path,
		replica_path,
		generation,
		"primary"
	) VALUES ($8, $1, $2, $2, 0, CASE WHEN $4 THEN $3 END)
),

assignments AS (
	INSERT INTO repository_assignments (
		repository_id,
		virtual_storage,
		relative_path,
		storage
	)
	SELECT $8, $1, $2, storage
	FROM (
		SELECT $3 AS storage
		UNION
		SELECT unnest($5::text[])
		UNION
		SELECT unnest($6::text[])
	) AS storages
	WHERE $7
)

INSERT INTO storage_repositories (
	repository_id,
	virtual_storage,
	relative_path,
	storage,
	generation
)
SELECT $8, $1, $2, storage, 0
FROM (
	SELECT $3 AS storage
	UNION
	SELECT unnest($5::text[])
) AS updated_storages
`

	_, err := rs.db.ExecContext(ctx, q,
		virtualStorage,                      // $1
		relativePath,                        // $2
		primary,                             // $3
		storePrimary,                        // $4
		pq.StringArray(updatedSecondaries),  // $5
		pq.StringArray(outdatedSecondaries), // $6
		storeAssignments,                    // $7
		repositoryID,                        // $8
	)

	var pgErr *pq.Error
	if !errors.As(err, &pgErr) || pgErr.Code.Name() != "unique_violation" {
		return err
	}

	if pgErr.Constraint == "repositories_pkey" {
		return fmt.Errorf("repository id %d already in use", repositoryID)
	}

	return RepositoryExistsError{
		virtualStorage: virtualStorage,
		relativePath:   relativePath,
		storage:        primary,
	}
}

// DeleteRepository removes the repository's record from `repositories` and the records of the listed
// storages from `storage_repositories`, in one statement. The affected-row check is done by the delete
// helper on the top-level DELETE of `storage_repositories`. ErrNoRowsAffected therefore means no storage
// record was removed, although a `repositories` record may still have been deleted by the CTE.
func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string, storages []string) error {
	const q = `
WITH repo AS (
	DELETE FROM repositories
	WHERE virtual_storage = $1
	AND relative_path = $2
)

DELETE FROM storage_repositories
WHERE virtual_storage = $1
AND relative_path = $2
AND storage = ANY($3::text[])
		`

	return rs.delete(ctx, q, virtualStorage, relativePath, storages)
}

// DeleteReplica removes only the given storage's record from `storage_repositories`; the repository's
// record in `repositories` is left untouched. ErrNoRowsAffected is returned when no record existed.
func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath string, storage string) error {
	const q = `
DELETE FROM storage_repositories
WHERE virtual_storage = $1
AND relative_path = $2
AND storage = ANY($3::text[])
		`

	replicas := []string{storage}
	return rs.delete(ctx, q, virtualStorage, relativePath, replicas)
}

// delete executes query with the parameters ($1 virtualStorage, $2 relativePath, $3 storages as a text array)
// and returns ErrNoRowsAffected when the statement reports that no rows were affected.
func (rs *PostgresRepositoryStore) delete(ctx context.Context, query, virtualStorage, relativePath string, storages []string) error {
	res, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, pq.StringArray(storages))
	if err != nil {
		return err
	}

	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return err
	case affected == 0:
		return ErrNoRowsAffected
	default:
		return nil
	}
}

// RenameRepository changes the repository's relative path to newRelativePath in a single statement. The
// CTE updates the `repositories` record (relative_path and replica_path). The top-level UPDATE changes the
// given storage's record in `storage_repositories`.
//
// RepositoryNotExistsError is returned when the top-level UPDATE affected no rows, i.e. when the storage
// has no record of the repository at relativePath.
//
// NOTE(review): PostgreSQL runs data-modifying statements in WITH to completion regardless of the outer
// statement's result. The `repositories` record is therefore renamed even when RepositoryNotExistsError is
// returned. Confirm whether callers wrap this in a transaction or tolerate that.
func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error {
	const q = `
WITH repo AS (
	UPDATE repositories
	SET relative_path = $4,
	    replica_path  = $4
	WHERE virtual_storage = $1
	AND relative_path = $2
)

UPDATE storage_repositories
SET relative_path = $4
WHERE virtual_storage = $1
AND relative_path = $2
AND storage = $3
`

	result, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, newRelativePath)
	if err != nil {
		return err
	}

	n, err := result.RowsAffected()
	if err != nil {
		return err
	}

	if n == 0 {
		return RepositoryNotExistsError{
			virtualStorage: virtualStorage,
			relativePath:   relativePath,
			storage:        storage,
		}
	}

	return nil
}

// GetConsistentStorages returns, as a set, the storages whose replica of the repository is on the highest
// generation recorded in `storage_repositories` for it. Only repositories that also have a `repositories`
// record are considered. A RepositoryNotFoundError is returned when no storage qualifies.
func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
	const q = `
SELECT storage
FROM repositories
JOIN storage_repositories USING (virtual_storage, relative_path)
WHERE virtual_storage = $1
AND relative_path = $2
AND storage_repositories.generation = (
	SELECT MAX(generation)
	FROM storage_repositories
	WHERE virtual_storage = $1
	AND relative_path = $2
)`

	rows, err := rs.db.QueryContext(ctx, q, virtualStorage, relativePath)
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()

	latest := make(map[string]struct{})
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, fmt.Errorf("scan: %w", err)
		}
		latest[name] = struct{}{}
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows: %w", err)
	}

	if len(latest) > 0 {
		return latest, nil
	}

	return nil, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
}

// RepositoryExists reports whether `repositories` holds a record for the virtual storage and relative
// path. A missing record yields (false, nil); any other query failure is returned as-is.
func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) {
	const q = `
SELECT true
FROM repositories
WHERE virtual_storage = $1
AND relative_path = $2
`

	var found bool
	err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath).Scan(&found)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return false, nil
	case err != nil:
		return false, err
	}

	return found, nil
}

// DeleteInvalidRepository removes the given storage's record of the repository. If no other storage has a
// record of it, the repository's `repositories` record is removed as well.
//
// The NOT EXISTS check explicitly excludes the storage being deleted because the CTE's DELETE and the outer
// DELETE run against the same snapshot, so the outer statement still sees the row the CTE removes.
func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error {
	const q = `
WITH invalid_repository AS (
	DELETE FROM storage_repositories
	WHERE virtual_storage = $1
	AND   relative_path = $2
	AND   storage = $3
)

DELETE FROM repositories
WHERE virtual_storage = $1
AND relative_path = $2
AND NOT EXISTS (
	SELECT 1
	FROM storage_repositories
	WHERE virtual_storage = $1
	AND relative_path = $2
	AND storage != $3
)
	`

	if _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage); err != nil {
		return err
	}

	return nil
}

// StorageDetails represents a storage that contains or should contain a
// copy of the repository.
//
// GetPartiallyAvailableRepositories decodes these from JSON whose keys are the
// field names below (built with json_build_object), and there are no json tags.
// Renaming a field therefore requires updating that query too.
type StorageDetails struct {
	// Name of the storage as configured.
	Name string
	// BehindBy indicates how many generations the storage's copy of the repository is missing at maximum.
	BehindBy int
	// Assigned indicates whether the storage is an assigned host of the repository.
	Assigned bool
	// Healthy indicates whether the replica is considered healthy by the consensus of Praefect nodes.
	Healthy bool
	// ValidPrimary indicates whether the replica is ready to serve as the primary if necessary.
	ValidPrimary bool
}

// PartiallyAvailableRepository is a repository with one or more assigned replicas which are not
// able to serve requests at the moment.
//
// Like StorageDetails, it is decoded from JSON whose keys match the field names (no json tags),
// as produced by the query in GetPartiallyAvailableRepositories.
type PartiallyAvailableRepository struct {
	// RelativePath is the relative path of the repository.
	RelativePath string
	// Primary is the current primary of this repository.
	Primary string
	// Storages contains information of the repository on each storage that contains the repository
	// or does not contain the repository but is assigned to host it.
	Storages []StorageDetails
}

// GetPartiallyAvailableRepositories lists the repositories of virtualStorage that have at least one assigned
// replica which is not a valid primary at the moment, ordered by relative path and primary. Each result
// describes every storage that either holds a copy or is assigned to hold one. An error is returned when
// virtualStorage is not configured.
func (rs *PostgresRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]PartiallyAvailableRepository, error) {
	configuredStorages, ok := rs.storages[virtualStorage]
	if !ok {
		return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage)
	}

	// How the query works:
	//
	// 1. Start from every copy in `storage_repositories`. Copies on unassigned storages are kept because the
	//    latest generation may live on one of them.
	//
	// 2. FULL JOIN the assignments, derived with a fallback:
	//    - a repository with no assignments treats every configured storage as assigned;
	//    - a repository with assignments only treats explicitly assigned storages as assigned;
	//    - assignments pointing at unconfigured storages are ignored.
	//    A repository whose assigned storages are all available has reached its replication factor and is
	//    not reported.
	//
	// 3. JOIN `repositories` to drop records of deleted repositories that linger on some storages. The
	//    foreign key on `repository_assignments` cannot guarantee this on its own while the no-assignments
	//    fallback exists.
	//
	// 4. LEFT JOIN `healthy_storages` to report each storage's health.
	//
	// 5. LEFT JOIN `valid_primaries` to report whether the storage could take over as primary in a failover.
	//
	// 6. Aggregate each repository into one JSON object per row, which groups the output in the database
	//    and keeps scanning simple. Only groups in which some assigned storage is not a valid primary are
	//    kept. Unassigned storages do not affect the filter.
	const query = `
SELECT
	json_build_object (
		'RelativePath', relative_path,
		'Primary', "primary",
		'Storages', json_agg(
			json_build_object(
				'Name', storage,
				'BehindBy', behind_by,
				'Assigned', assigned,
				'Healthy', healthy,
				'ValidPrimary', valid_primary
			)
		)
	)
FROM (
	SELECT
		relative_path,
		repositories.primary,
		storage,
		repository_generations.generation - COALESCE(storage_repositories.generation, -1) AS behind_by,
		repository_assignments.storage IS NOT NULL AS assigned,
		healthy_storages.storage IS NOT NULL AS healthy,
		valid_primaries.storage IS NOT NULL AS valid_primary
	FROM storage_repositories
	FULL JOIN (
		SELECT virtual_storage, relative_path, storage
		FROM repositories
		CROSS JOIN (SELECT unnest($2::text[]) AS storage) AS configured_storages
		WHERE (
			SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = configured_storages.storage) = 1
			FROM repository_assignments
			WHERE virtual_storage = repositories.virtual_storage
			AND   relative_path   = repositories.relative_path
			AND   storage         = ANY($2::text[])
		)
	) AS repository_assignments USING (virtual_storage, relative_path, storage)
	JOIN repositories USING (virtual_storage, relative_path)
	JOIN repository_generations USING (virtual_storage, relative_path)
	LEFT JOIN healthy_storages USING (virtual_storage, storage)
	LEFT JOIN valid_primaries USING (virtual_storage, relative_path, storage)
	WHERE virtual_storage = $1
	ORDER BY relative_path, "primary", storage
) AS outdated_repositories
GROUP BY relative_path, "primary"
HAVING bool_or(NOT valid_primary) FILTER(WHERE assigned)
ORDER BY relative_path, "primary"
	`

	rows, err := rs.db.QueryContext(ctx, query, virtualStorage, pq.StringArray(configuredStorages))
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()

	var result []PartiallyAvailableRepository
	for rows.Next() {
		var encoded string
		if err := rows.Scan(&encoded); err != nil {
			return nil, fmt.Errorf("scan: %w", err)
		}

		var entry PartiallyAvailableRepository
		decoder := json.NewDecoder(strings.NewReader(encoded))
		if err := decoder.Decode(&entry); err != nil {
			return nil, fmt.Errorf("decode json: %w", err)
		}

		result = append(result, entry)
	}

	return result, rows.Err()
}

// ReserveRepositoryID draws a fresh ID from the repositories_repository_id_seq sequence for a repository
// about to be created. If `repositories` already holds a record for the virtual storage and relative path,
// no ID is drawn and commonerr.ErrRepositoryAlreadyExists is returned. Only the ID is reserved; nothing is
// written for the path itself.
func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
	const q = `
SELECT nextval('repositories_repository_id_seq')
WHERE NOT EXISTS (
	SELECT FROM repositories
	WHERE virtual_storage = $1
	AND   relative_path   = $2
)
	`

	var reserved int64
	err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath).Scan(&reserved)
	switch {
	case err == nil:
		return reserved, nil
	case errors.Is(err, sql.ErrNoRows):
		// The NOT EXISTS guard filtered the row out: the repository is already present.
		return 0, commonerr.ErrRepositoryAlreadyExists
	default:
		return 0, fmt.Errorf("scan: %w", err)
	}
}

// GetRepositoryID looks up the repository_id stored in `repositories` for the virtual storage and relative
// path. A commonerr RepositoryNotFoundError is returned when no such record exists.
func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
	const q = `
SELECT repository_id
FROM repositories
WHERE virtual_storage = $1
AND   relative_path   = $2
	`

	var repositoryID int64
	err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath).Scan(&repositoryID)
	switch {
	case err == nil:
		return repositoryID, nil
	case errors.Is(err, sql.ErrNoRows):
		return 0, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
	default:
		return 0, fmt.Errorf("scan: %w", err)
	}
}