Welcome to mirror list, hosted at ThFree Co, Russian Federation.

repository_store.go « datastore « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 77dce6ba064d644fdca5de4be3acea25e2cffd0d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
package datastore

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"gitlab.com/gitlab-org/gitaly/v16/internal/datastructure"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
)

// storages maps a virtual storage name to the storages configured for it. The repository store
// consults it to reject virtual storage names that are not configured.
type storages map[string][]string

// GenerationUnknown marks the absence of a generation number. It is reported for storages that
// have no record of a repository, and older instances can produce replication jobs that carry no
// generation number at all.
const GenerationUnknown = -1

// errWriteToOutdatedNodes is returned by IncrementGeneration when none of the given storages held
// the repository's latest generation, so no replica was updated.
var errWriteToOutdatedNodes = errors.New("write to outdated nodes")

// DowngradeAttemptedError is returned by GetReplicatedGeneration when replicating from the source
// storage would not move the target storage to a newer generation, i.e. the target already holds
// the same or a newer generation than the source.
type DowngradeAttemptedError struct {
	// Storage is the target storage whose generation would not be increased.
	Storage string
	// CurrentGeneration is the generation the target storage currently holds.
	CurrentGeneration int
	// AttemptedGeneration is the source generation that would have been applied to the target.
	AttemptedGeneration int
}

// Error implements the error interface.
func (e DowngradeAttemptedError) Error() string {
	const format = "attempted downgrading storage %q from generation %d to %d"
	return fmt.Sprintf(format, e.Storage, e.CurrentGeneration, e.AttemptedGeneration)
}

var (
	// ErrNoRowsAffected signals that a query completed successfully but changed no rows.
	ErrNoRowsAffected = errors.New("no rows were affected by the query")
	// ErrRepositoryAlreadyExists signals an attempt to insert a repository that the datastore
	// already tracks.
	ErrRepositoryAlreadyExists = errors.New("repository already exists")
	// ErrRepositoryNotFound signals that the requested repository is not tracked by the datastore.
	ErrRepositoryNotFound = errors.New("repository not found")
)

// RepositoryStore provides access to repository state.
type RepositoryStore interface {
	// GetGeneration gets the repository's generation on a given storage.
	GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error)
	// IncrementGeneration increments the generations of up to date nodes.
	IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
	// SetGeneration sets the repository's generation on the given storage. If the generation is higher
	// than the virtual storage's generation, it is set to match as well to guarantee monotonic increments.
	SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error
	// GetReplicaPath gets the replica path of a repository. Returns ErrRepositoryNotFound if a record
	// for the repository ID is not found.
	GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)
	// GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would
	// downgrade, a DowngradeAttemptedError is returned.
	GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)
	// CreateRepository creates a record for a repository in the specified virtual storage and relative path.
	// Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated
	// and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed
	// the transaction. Returns ErrRepositoryAlreadyExists when trying to create a repository which already exists in the store.
	//
	// storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as
	// the repository's primary.
	//
	// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
	// secondaries are stored as the assigned hosts of the repository.
	CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
	// SetAuthoritativeReplica sets the given replica of a repository as the authoritative one by setting its generation as the latest one.
	SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
	// DeleteRepository deletes the database records associated with the repository. It returns the replica path and the storages
	// which are known to have a replica at the time of deletion. ErrRepositoryNotFound is returned when
	// the repository is not tracked by the Praefect datastore.
	DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
	// DeleteAllRepositories deletes the database records associated with
	// repositories in the specified virtual storage.
	DeleteAllRepositories(ctx context.Context, virtualStorage string) error
	// DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage.
	DeleteReplica(ctx context.Context, repositoryID int64, storage string) error
	// GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
	GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error)
	ConsistentStoragesGetter
	// RepositoryExists returns whether the repository exists on a virtual storage.
	RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
	// GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which
	// are not able to serve requests at the moment.
	GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
	// DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's
	// record of the invalid repository. If the storage was the only storage with the repository, the repository's
	// record on the virtual storage is also deleted.
	DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error
	// ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already
	// exists with the given virtual storage and relative path combination, an error is returned.
	ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
	// GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns an
	// ErrRepositoryNotFound error if the repository doesn't exist.
	GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
	// GetRepositoryMetadata retrieves a repository's metadata.
	GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)
	// GetRepositoryMetadataByPath retrieves a repository's metadata by its virtual path.
	GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)
	// MarkUnverified marks replicas of the repository unverified.
	MarkUnverified(ctx context.Context, repositoryID int64) (int64, error)
	// MarkVirtualStorageUnverified marks all replicas on the virtual storage as unverified.
	MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error)
	// MarkStorageUnverified marks all replicas on the storage as unverified.
	MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error)
}

// PostgresRepositoryStore implements RepositoryStore on top of a Postgres database. The contract of
// each method is documented on the RepositoryStore interface.
type PostgresRepositoryStore struct {
	db glsql.Querier
	storages
}

// NewPostgresRepositoryStore creates a PostgresRepositoryStore that runs its queries through db.
// configuredStorages maps every configured virtual storage to the storages configured for it.
func NewPostgresRepositoryStore(db glsql.Querier, configuredStorages map[string][]string) *PostgresRepositoryStore {
	store := &PostgresRepositoryStore{db: db}
	store.storages = storages(configuredStorages)
	return store
}

// MarkUnverified clears the verification time of every replica of the given repository. It
// returns the number of replicas changed; replicas that were already unverified are not counted.
func (rs *PostgresRepositoryStore) MarkUnverified(ctx context.Context, repositoryID int64) (int64, error) {
	const query = `
		UPDATE storage_repositories
		SET verified_at = NULL
		WHERE repository_id = $1
		AND verified_at IS NOT NULL
	`

	res, err := rs.db.ExecContext(ctx, query, repositoryID)
	if err != nil {
		return 0, fmt.Errorf("query: %w", err)
	}

	return res.RowsAffected()
}

// MarkVirtualStorageUnverified clears the verification time of every replica of every repository
// in the given virtual storage. It returns the number of replicas that were verified before the call.
func (rs *PostgresRepositoryStore) MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) {
	const query = `
		UPDATE storage_repositories
		SET verified_at = NULL
		FROM repositories
		WHERE repositories.virtual_storage = $1
		AND   repositories.repository_id   = storage_repositories.repository_id
		AND   verified_at IS NOT NULL
	`

	res, err := rs.db.ExecContext(ctx, query, virtualStorage)
	if err != nil {
		return 0, fmt.Errorf("query: %w", err)
	}

	return res.RowsAffected()
}

// MarkStorageUnverified marks all replicas on the given storage of the virtual storage as unverified
// by clearing their verification time. It returns the number of replicas that were verified before
// the call; replicas that were already unverified are not counted.
func (rs *PostgresRepositoryStore) MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) {
	result, err := rs.db.ExecContext(ctx, `
		UPDATE storage_repositories
		SET verified_at = NULL
		FROM repositories
		WHERE repositories.repository_id = storage_repositories.repository_id
		AND   repositories.virtual_storage = $1
		AND   storage_repositories.storage = $2
		AND   verified_at IS NOT NULL
	`, virtualStorage, storage)
	if err != nil {
		return 0, fmt.Errorf("query: %w", err)
	}

	return result.RowsAffected()
}

// GetGeneration returns the generation of the repository's replica on the given storage. If the
// storage has no record of the repository, GenerationUnknown is returned with a nil error.
func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) {
	const q = `
SELECT generation
FROM storage_repositories
WHERE repository_id = $1
AND storage = $2
`

	var generation int
	err := rs.db.QueryRowContext(ctx, q, repositoryID, storage).Scan(&generation)
	switch {
	case err == nil:
		return generation, nil
	case errors.Is(err, sql.ErrNoRows):
		return GenerationUnknown, nil
	default:
		return 0, err
	}
}

// IncrementGeneration increments the generation of the repository, and of every replica among the
// primary and secondaries that is on the repository's latest generation. Replicas that are behind
// are left untouched.
//
// It returns ErrRepositoryNotFound if the repository does not exist, and errWriteToOutdatedNodes if
// none of the given storages held the repository's latest generation.
func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error {
	const q = `
WITH updated_replicas AS (
	UPDATE storage_repositories
	SET generation = generation + 1
	FROM (
		SELECT repository_id, storage
		FROM repositories
		JOIN storage_repositories USING (repository_id, generation)
		WHERE repository_id = $1
		AND   storage       = ANY($2)
		FOR UPDATE
	) AS to_update
	WHERE storage_repositories.repository_id = to_update.repository_id
	AND   storage_repositories.storage       = to_update.storage
	RETURNING storage_repositories.repository_id
),

updated_repository AS (
	UPDATE repositories
	SET generation = generation + 1
	FROM (
		SELECT DISTINCT repository_id
		FROM updated_replicas
	) AS updated_repositories
	WHERE repositories.repository_id = updated_repositories.repository_id
)

SELECT
	EXISTS (
		SELECT FROM repositories
		WHERE repository_id = $1
	) AS repository_exists,
	EXISTS ( SELECT FROM updated_replicas ) AS repository_updated
`

	// Copy into a fresh slice instead of append(secondaries, primary). When the caller's slice has
	// spare capacity, that append would write primary into the caller's backing array and clobber
	// whatever another slice sharing that array holds at that index.
	targetStorages := make([]string, 0, len(secondaries)+1)
	targetStorages = append(targetStorages, secondaries...)
	targetStorages = append(targetStorages, primary)

	var repositoryExists, repositoryUpdated bool
	if err := rs.db.QueryRowContext(
		ctx, q, repositoryID, targetStorages,
	).Scan(&repositoryExists, &repositoryUpdated); err != nil {
		return fmt.Errorf("scan: %w", err)
	}

	if !repositoryExists {
		return ErrRepositoryNotFound
	}

	if !repositoryUpdated {
		return errWriteToOutdatedNodes
	}

	return nil
}

// SetGeneration records generation as the generation of the repository's replica on storage. The
// replica record is created if it does not exist; otherwise its relative path and generation are
// overwritten. If generation is greater than the repository's own generation, the repository's
// generation is raised to match, so it never moves backwards. Nothing is written to
// storage_repositories when no repository with repositoryID exists.
func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error {
	const q = `
WITH repository AS (
	UPDATE repositories SET generation = $3
	WHERE repository_id = $1
	AND   COALESCE(repositories.generation, -1) < $3
)

INSERT INTO storage_repositories (
	repository_id,
	virtual_storage,
	relative_path,
	storage,
	generation
)
SELECT
	repository_id,
	virtual_storage,
	$4,
	$2,
	$3
FROM repositories
WHERE repository_id = $1
ON CONFLICT (repository_id, storage) DO UPDATE SET
	relative_path = EXCLUDED.relative_path,
	generation = EXCLUDED.generation
`

	if _, err := rs.db.ExecContext(ctx, q, repositoryID, storage, generation, relativePath); err != nil {
		return err
	}

	return nil
}

// SetAuthoritativeReplica makes the replica on storageName the authoritative copy of the repository.
// It increments the repository's generation and records the new generation for that replica,
// creating the replica record if needed. ErrRepositoryNotFound is returned when no repository
// exists at the given virtual storage and relative path.
func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storageName string) error {
	const q = `
WITH updated_repository AS (
	UPDATE repositories
	SET generation = generation + 1
	WHERE virtual_storage = $1
	AND   relative_path   = $2
	RETURNING repository_id, virtual_storage, relative_path, generation
)

INSERT INTO storage_repositories (repository_id, virtual_storage, relative_path, storage, generation)
SELECT repository_id, virtual_storage, relative_path, $3, generation
FROM updated_repository
ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE
	SET repository_id = EXCLUDED.repository_id,
	    generation = EXCLUDED.generation
	`

	result, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storageName)
	if err != nil {
		return fmt.Errorf("exec: %w", err)
	}

	affected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected: %w", err)
	}

	// The insert only sees rows returned by the repositories update, so zero affected rows means
	// no repository matched the virtual storage and relative path.
	if affected == 0 {
		return ErrRepositoryNotFound
	}

	return nil
}

// GetReplicatedGeneration returns the generation the target storage would reach by replicating the
// repository from the source storage, which is the source's generation. A storage without a record
// of the repository counts as GenerationUnknown. If the target already has a known generation that
// is equal to or newer than the source's, a DowngradeAttemptedError is returned instead.
func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error) {
	const q = `
SELECT storage, generation
FROM storage_repositories
WHERE repository_id = $1
AND storage = ANY($2)
`

	rows, err := rs.db.QueryContext(ctx, q, repositoryID, []string{source, target})
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	sourceGen, targetGen := GenerationUnknown, GenerationUnknown
	for rows.Next() {
		var (
			storageName string
			gen         int
		)
		if err := rows.Scan(&storageName, &gen); err != nil {
			return 0, err
		}

		// The source is checked first so that, when source and target are the same storage, the
		// row is attributed to the source only.
		if storageName == source {
			sourceGen = gen
		} else if storageName == target {
			targetGen = gen
		} else {
			return 0, fmt.Errorf("unexpected storage: %s", storageName)
		}
	}

	if err := rows.Err(); err != nil {
		return 0, err
	}

	// Replication is only allowed to move an unknown or older target forward.
	if targetGen == GenerationUnknown || targetGen < sourceGen {
		return sourceGen, nil
	}

	return 0, DowngradeAttemptedError{
		Storage:             target,
		CurrentGeneration:   targetGen,
		AttemptedGeneration: sourceGen,
	}
}

// CreateRepository creates a record for a repository in the specified virtual storage and relative path.
// Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated
// and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed
// the transaction. Returns ErrRepositoryAlreadyExists when trying to create a repository which already exists in the
// store.
//
// The primary and the updated secondaries get replica records at generation 0. Outdated secondaries get no
// replica record and are only recorded as assignments.
//
// storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as
// the repository's primary.
//
// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
// secondaries are stored as the assigned hosts of the repository.
func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
	const q = `
WITH repo AS (
	INSERT INTO repositories (
		repository_id,
		virtual_storage,
		relative_path,
		replica_path,
		generation,
		"primary"
	) VALUES ($8, $1, $2, $9, 0, CASE WHEN $4 THEN $3 END)
),

assignments AS (
	INSERT INTO repository_assignments (
		repository_id,
		virtual_storage,
		relative_path,
		storage
	)
	SELECT $8, $1, $2, storage
	FROM (
		SELECT $3 AS storage
		UNION
		SELECT unnest($5::text[])
		UNION
		SELECT unnest($6::text[])
	) AS storages
	WHERE $7
)

INSERT INTO storage_repositories (
	repository_id,
	virtual_storage,
	relative_path,
	storage,
	generation
)
SELECT $8, $1, $2, storage, 0
FROM (
	SELECT $3 AS storage
	UNION
	SELECT unnest($5::text[])
) AS updated_storages
`

	_, err := rs.db.ExecContext(ctx, q,
		virtualStorage,
		relativePath,
		primary,
		storePrimary,
		updatedSecondaries,
		outdatedSecondaries,
		storeAssignments,
		repositoryID,
		replicaPath,
	)
	if err != nil {
		if glsql.IsUniqueViolation(err, "repositories_pkey") {
			return fmt.Errorf("repository id %d already in use", repositoryID)
		}

		// A repository already existing at (virtual_storage, relative_path) is reported by the unique
		// lookup index on `repositories`. storage_repositories_pkey is keyed by the freshly reserved
		// repository ID, so checking it alone does not detect an existing repository; it is kept for
		// backward compatibility.
		// NOTE(review): assumes the unique (virtual_storage, relative_path) index on repositories is
		// named repository_lookup_index — confirm against the migrations.
		if glsql.IsUniqueViolation(err, "repository_lookup_index") ||
			glsql.IsUniqueViolation(err, "storage_repositories_pkey") {
			return ErrRepositoryAlreadyExists
		}

		return err
	}

	return nil
}

// DeleteRepository deletes the repository record at the given virtual storage and relative path.
// It returns the repository's replica path and the storages that had a replica record of it.
// ErrRepositoryNotFound is returned when no repository exists at that path.
func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) {
	const q = `
WITH repository AS (
	DELETE FROM repositories
	WHERE virtual_storage = $1
	AND relative_path = $2
	RETURNING repository_id, replica_path
)

SELECT replica_path, ARRAY_AGG(storage_repositories.storage)
FROM repository
LEFT JOIN storage_repositories USING (repository_id)
GROUP BY replica_path
		`

	var (
		replicaPath     string
		replicaStorages glsql.StringArray
	)

	err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath).Scan(&replicaPath, &replicaStorages)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// The DELETE returned nothing, so there was no repository to delete.
		return "", nil, ErrRepositoryNotFound
	case err != nil:
		return "", nil, fmt.Errorf("scan: %w", err)
	}

	return replicaPath, replicaStorages.Slice(), nil
}

// DeleteAllRepositories deletes the records of every repository in the given virtual storage. The
// same statement also deletes the virtual storage's replication jobs, the job locks held on those
// jobs, and the replication queue locks whose ID matches `<virtualStorage>|%|%`.
//
// NOTE(review): the lock match uses LIKE, so `%` or `_` inside virtualStorage act as wildcards.
func (rs *PostgresRepositoryStore) DeleteAllRepositories(ctx context.Context, virtualStorage string) error {
	const q = `
WITH delete_jobs AS (
  DELETE FROM replication_queue
  WHERE job->>'virtual_storage' = $1
  RETURNING id
),

delete_job_locks AS (
  DELETE FROM replication_queue_job_lock
  USING delete_jobs
  WHERE job_id = delete_jobs.id
),

delete_locks AS (
  DELETE FROM replication_queue_lock
  WHERE id LIKE $1 || '|%|%'
)

DELETE FROM repositories
WHERE virtual_storage = $1;
	`

	_, err := rs.db.ExecContext(ctx, q, virtualStorage)
	return err
}

// DeleteReplica deletes the record of the repository's replica on the given storage from
// `storage_repositories`, leaving every other record untouched. ErrNoRowsAffected is returned when
// no such replica record existed.
func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error {
	const q = `
DELETE FROM storage_repositories
WHERE repository_id = $1
AND storage = $2
	`

	result, err := rs.db.ExecContext(ctx, q, repositoryID, storage)
	if err != nil {
		return err
	}

	deleted, err := result.RowsAffected()
	switch {
	case err != nil:
		return err
	case deleted == 0:
		return ErrNoRowsAffected
	default:
		return nil
	}
}

// GetConsistentStoragesByRepositoryID looks the repository up by its ID. It returns the replica path
// and the set of storages whose replica matches the repository's relative path and latest
// generation. ErrRepositoryNotFound is returned when no such row is found.
func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) {
	const q = `
SELECT replica_path, ARRAY_AGG(storage)
FROM repositories
JOIN storage_repositories USING (repository_id, relative_path, generation)
WHERE repository_id = $1
GROUP BY replica_path
	`

	return rs.getConsistentStorages(ctx, q, repositoryID)
}

// GetConsistentStorages looks the repository up by virtual storage and relative path. It returns the
// replica path and the set of storages whose replica matches the repository's relative path and
// latest generation. ErrRepositoryNotFound is returned when no such row is found.
func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
	const q = `
SELECT replica_path, ARRAY_AGG(storage)
FROM repositories
JOIN storage_repositories USING (repository_id, relative_path, generation)
WHERE repositories.virtual_storage = $1
AND repositories.relative_path = $2
GROUP BY replica_path
	`

	return rs.getConsistentStorages(ctx, q, virtualStorage, relativePath)
}

// getConsistentStorages runs query with params. The query must produce a single row holding a
// replica path and an array of storage names. It returns the replica path and the storages as a
// set, or ErrRepositoryNotFound when the query yields no row.
func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, *datastructure.Set[string], error) {
	var (
		replicaPath string
		upToDate    glsql.StringArray
	)

	row := rs.db.QueryRowContext(ctx, query, params...)
	if err := row.Scan(&replicaPath, &upToDate); err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			return "", nil, fmt.Errorf("query: %w", err)
		}

		return "", nil, ErrRepositoryNotFound
	}

	return replicaPath, datastructure.SetFromSlice(upToDate.Slice()), nil
}

// RepositoryExists reports whether a repository record exists at the given virtual storage and
// relative path. A missing record is not an error.
func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) {
	const q = `
SELECT true
FROM repositories
WHERE virtual_storage = $1
AND relative_path = $2
`

	var found bool
	err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath).Scan(&found)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil
	}
	if err != nil {
		return false, err
	}

	return found, nil
}

// DeleteInvalidRepository deletes the repository's replica record on the given storage. If no other
// storage has a replica record of the repository, the repository record itself is deleted too. The
// repository row is locked with FOR UPDATE for the duration of the statement.
func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error {
	// All parts of the statement share one snapshot, so the final NOT EXISTS does not see the replica
	// deleted by invalid_repository. That is why it explicitly ignores the storage being removed.
	const q = `
WITH repository AS (
	SELECT repository_id
	FROM repositories
	WHERE repository_id = $1
	FOR UPDATE
),

invalid_repository AS (
	DELETE FROM storage_repositories
	USING repository
	WHERE storage_repositories.repository_id = repository.repository_id
	AND storage = $2
)

DELETE FROM repositories
USING repository
WHERE repositories.repository_id = repository.repository_id
AND NOT EXISTS (
	SELECT 1
	FROM storage_repositories
	WHERE repository_id = $1
	AND storage != $2
)
	`

	if _, err := rs.db.ExecContext(ctx, q, repositoryID, storage); err != nil {
		return err
	}

	return nil
}

// Replica represents a replica of a repository.
type Replica struct {
	// Storage is the name of the replica's storage.
	Storage string
	// Generation is the replica's confirmed generation. If the replica does not exist yet, the
	// generation is -1.
	Generation int64
	// Assigned indicates whether the storage is an assigned host of the repository.
	Assigned bool
	// Healthy indicates whether the replica is considered healthy by the consensus of Praefect nodes.
	Healthy bool
	// ValidPrimary indicates whether the replica is ready to serve as the primary if necessary.
	ValidPrimary bool
	// VerifiedAt is the last successful verification time of the replica.
	VerifiedAt time.Time
}

// RepositoryMetadata contains the repository's metadata.
type RepositoryMetadata struct {
	// RepositoryID is the internal ID of the repository.
	RepositoryID int64
	// VirtualStorage is the virtual storage the repository belongs to.
	VirtualStorage string
	// RelativePath is the relative path of the repository.
	RelativePath string
	// ReplicaPath is the actual disk location where the replicas are stored in the storages.
	ReplicaPath string
	// Primary is the current primary of this repository.
	Primary string
	// Generation is the current generation of the repository.
	Generation int64
	// Replicas contains information about the repository on each storage that either holds the
	// repository or is assigned to host it without holding it yet.
	Replicas []Replica
}

// GetRepositoryMetadata retrieves the metadata of the repository with the given ID. It returns
// ErrRepositoryNotFound when no metadata is found for that ID.
func (rs *PostgresRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error) {
	// The filter arguments are bound starting at $3.
	// NOTE(review): presumably $1 and $2 are used internally by getRepositoryMetadata — confirm.
	const byRepositoryID = "WHERE repository_id = $3"

	metadata, err := rs.getRepositoryMetadata(ctx, byRepositoryID, byRepositoryID, "", repositoryID)
	switch {
	case err != nil:
		return RepositoryMetadata{}, err
	case len(metadata) == 0:
		return RepositoryMetadata{}, ErrRepositoryNotFound
	default:
		return metadata[0], nil
	}
}

// GetRepositoryMetadataByPath retrieves the metadata of the repository at the given virtual storage
// and relative path. It returns ErrRepositoryNotFound when no metadata is found for that path.
func (rs *PostgresRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error) {
	// The filter arguments are bound starting at $3.
	// NOTE(review): the valid-primaries filter refers to `repositories`; presumably that resolves to
	// the already filtered repositories inside getRepositoryMetadata's query — confirm.
	const (
		byPath           = "WHERE virtual_storage = $3 AND relative_path = $4"
		byFilteredRepoID = "WHERE repository_id = (SELECT repository_id FROM repositories)"
	)

	metadata, err := rs.getRepositoryMetadata(ctx, byPath, byFilteredRepoID, "", virtualStorage, relativePath)
	switch {
	case err != nil:
		return RepositoryMetadata{}, err
	case len(metadata) == 0:
		return RepositoryMetadata{}, ErrRepositoryNotFound
	default:
		return metadata[0], nil
	}
}

// GetPartiallyAvailableRepositories returns the metadata of every repository in the virtual storage
// that has at least one assigned replica which is not a valid primary, i.e. which cannot serve
// requests at the moment. An error is returned for virtual storages that are not configured.
func (rs *PostgresRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error) {
	if _, configured := rs.storages[virtualStorage]; !configured {
		return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage)
	}

	const byVirtualStorage = "WHERE virtual_storage = $3"
	// Keep only repositories where some assigned replica has no matching valid_primaries entry.
	const withUnavailableAssignedReplica = "HAVING bool_or(NOT valid_primaries.storage IS NOT NULL) FILTER(WHERE assigned)"

	return rs.getRepositoryMetadata(ctx, byVirtualStorage, byVirtualStorage, withUnavailableAssignedReplica, virtualStorage)
}

// getRepositoryMetadata returns the metadata of every repository selected by the given SQL filter fragments,
// ordered by repository ID. Each result carries one Replica entry per storage that either holds a copy of the
// repository or is considered assigned to host it.
//
// The filters are raw SQL fragments spliced into the query with fmt.Sprintf. They must be trusted constants
// and never be built from user input. Placeholders $1 and $2 are reserved for the configured storages, so
// values referenced by the filters are passed via filterArgs and bound starting at $3.
//
//   - repositoriesFilter restricts the rows read from the `repositories` table (typically a WHERE clause).
//   - validPrimariesFilter restricts the rows read from the `valid_primaries` view (typically a WHERE clause).
//   - groupFilter is appended after the GROUP BY (typically a HAVING clause) and may be empty.
func (rs *PostgresRepositoryStore) getRepositoryMetadata(ctx context.Context, repositoriesFilter, validPrimariesFilter, groupFilter string, filterArgs ...interface{}) ([]RepositoryMetadata, error) {
	// The query below gathers, for each selected repository, the state of every relevant replica: its
	// generation, whether it is assigned, whether its storage is healthy, whether it is a valid primary
	// candidate and when it was last verified. It works as follows:
	//
	// 1. `configured_storages` turns the configured (virtual storage, storage) pairs passed as $1 and $2
	//    into rows.
	//
	// 2. The `repositories` CTE applies repositoriesFilter to the `repositories` table. Inside its own
	//    definition `repositories` still names the table; every later reference in the query names the
	//    filtered CTE instead.
	//
	// 3. The `storage_repositories` CTE lists every stored copy of the selected repositories. Unassigned
	//    copies are included too, as the latest generation could exist on an unassigned storage.
	//
	// 4. That list is FULL JOINed with the set of assigned storages, so assigned storages lacking a copy
	//    still appear (with generation -1 in the output). A storage is considered assigned if:
	//
	//    1. the repository has no assignments on configured storages and the storage is configured, or
	//    2. the repository has assignments and the storage is explicitly assigned.
	//
	//    Assignments of unconfigured storages are treated as if they don't exist.
	//
	// 5. The `healthy_storages` view is joined to report each storage's current health.
	//
	// 6. The `valid_primaries` view, narrowed by validPrimariesFilter, is joined to report whether the
	//    storage is ready to act as a primary in case of a failover.
	//
	// 7. Finally each repository's information is aggregated into a single JSON object per row, which groups
	//    the output in the query itself and makes scanning easier. groupFilter, if any, is applied to these
	//    groups.

	var (
		virtualStorages []string
		storages        []string
	)

	// The two slices are built in lockstep so that unnest() in the query zips them back into pairs.
	for virtualStorage, configuredStorages := range rs.storages {
		for _, storage := range configuredStorages {
			virtualStorages = append(virtualStorages, virtualStorage)
			storages = append(storages, storage)
		}
	}

	args := append([]interface{}{virtualStorages, storages}, filterArgs...)

	rows, err := rs.db.QueryContext(ctx, fmt.Sprintf(`
WITH configured_storages AS (
	SELECT unnest($1::text[]) AS virtual_storage,
	       unnest($2::text[]) AS storage
),

repositories AS (
	SELECT *
	FROM repositories
	%s
),

storage_repositories AS (
	SELECT repository_id, storage, storage_repositories.generation, verified_at
	FROM repositories
	JOIN storage_repositories USING (repository_id)
),

valid_primaries AS (
	SELECT repository_id, storage
	FROM valid_primaries
	%s
)

SELECT
	json_build_object (
		'RepositoryID', repository_id,
		'VirtualStorage', virtual_storage,
		'RelativePath', relative_path,
		'ReplicaPath', replica_path,
		'Primary', "primary",
		'Generation', repositories.generation,
		'Replicas', json_agg(
			json_build_object(
				'Storage', storage,
				'Generation', COALESCE(replicas.generation, -1),
				'Assigned', assigned,
				'Healthy', healthy_storages.storage IS NOT NULL,
				'ValidPrimary', valid_primaries.storage IS NOT NULL,
				'VerifiedAt', verified_at
			)
		)
	)
FROM repositories
JOIN (
	SELECT
		repository_id,
		storage,
		generation,
		repository_assignments.storage IS NOT NULL AS assigned,
		verified_at
	FROM storage_repositories
	FULL JOIN (
		SELECT repository_id, storage
		FROM repositories
		JOIN configured_storages USING (virtual_storage)
		WHERE (
			SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = configured_storages.storage) = 1
			FROM repository_assignments
			WHERE repository_id = repositories.repository_id
			AND   (virtual_storage, storage) IN (SELECT * FROM configured_storages)
		)
	) AS repository_assignments USING (repository_id, storage)
	ORDER BY repository_id, storage
) AS replicas USING (repository_id)
LEFT JOIN healthy_storages USING (virtual_storage, storage)
LEFT JOIN valid_primaries USING (repository_id, storage)
GROUP BY repository_id, virtual_storage, relative_path, replica_path, "primary", repositories.generation
%s
ORDER BY repository_id
	`, repositoriesFilter, validPrimariesFilter, groupFilter), args...)
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()

	var repos []RepositoryMetadata
	for rows.Next() {
		var repositoryJSON string
		if err := rows.Scan(&repositoryJSON); err != nil {
			return nil, fmt.Errorf("scan: %w", err)
		}

		var repo RepositoryMetadata
		if err := json.NewDecoder(strings.NewReader(repositoryJSON)).Decode(&repo); err != nil {
			return nil, fmt.Errorf("decode json: %w", err)
		}

		repos = append(repos, repo)
	}

	// Don't hand back a partially read result set alongside an iteration error.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate rows: %w", err)
	}

	return repos, nil
}

// ReserveRepositoryID draws a fresh repository ID from the repository ID sequence for a repository that is
// about to be created. If a repository with the given virtual storage and relative path is already recorded,
// no ID is drawn and ErrRepositoryAlreadyExists is returned.
func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
	// The WHERE NOT EXISTS yields no row when the repository is already present, which Scan reports
	// as sql.ErrNoRows.
	row := rs.db.QueryRowContext(ctx, `
SELECT nextval('repositories_repository_id_seq')
WHERE NOT EXISTS (
	SELECT FROM repositories
	WHERE virtual_storage = $1
	AND   relative_path   = $2
)
	`, virtualStorage, relativePath)

	var reserved int64
	err := row.Scan(&reserved)
	switch {
	case err == nil:
		return reserved, nil
	case errors.Is(err, sql.ErrNoRows):
		return 0, ErrRepositoryAlreadyExists
	default:
		return 0, fmt.Errorf("scan: %w", err)
	}
}

// GetRepositoryID looks up the ID of the repository stored under the given virtual storage and relative path.
// ErrRepositoryNotFound is returned when no such repository is recorded.
func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
	row := rs.db.QueryRowContext(ctx, `
SELECT repository_id
FROM repositories
WHERE virtual_storage = $1
AND   relative_path   = $2
	`, virtualStorage, relativePath)

	var repositoryID int64
	err := row.Scan(&repositoryID)
	switch {
	case err == nil:
		return repositoryID, nil
	case errors.Is(err, sql.ErrNoRows):
		return 0, ErrRepositoryNotFound
	default:
		return 0, fmt.Errorf("scan: %w", err)
	}
}

// GetReplicaPath returns the replica path recorded for the repository with the given ID.
// ErrRepositoryNotFound is returned when there is no record for that ID.
func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) {
	const query = "SELECT replica_path FROM repositories WHERE repository_id = $1"

	var path string
	err := rs.db.QueryRowContext(ctx, query, repositoryID).Scan(&path)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return "", ErrRepositoryNotFound
	case err != nil:
		return "", fmt.Errorf("scan: %w", err)
	}

	return path, nil
}