package datastore import ( "context" "database/sql" "encoding/json" "errors" "fmt" "strings" "time" "" "" ) type storages map[string][]string // GenerationUnknown is used to indicate lack of generation number in // a replication job. Older instances can produce replication jobs // without a generation number. const GenerationUnknown = -1 var errWriteToOutdatedNodes = errors.New("write to outdated nodes") // DowngradeAttemptedError is returned when attempting to get the replicated generation for a source repository // that does not upgrade the target repository. type DowngradeAttemptedError struct { Storage string CurrentGeneration int AttemptedGeneration int } func (err DowngradeAttemptedError) Error() string { return fmt.Sprintf("attempted downgrading storage %q from generation %d to %d", err.Storage, err.CurrentGeneration, err.AttemptedGeneration, ) } var ( // ErrNoRowsAffected is returned when a query did not perform any changes. ErrNoRowsAffected = errors.New("no rows were affected by the query") // ErrRepositoryAlreadyExists is returned when trying to insert a repository into the datastore that already // exists. ErrRepositoryAlreadyExists = errors.New("repository already exists") // ErrRepositoryNotFound is returned when looking up a repository that does not exist in the datastore. ErrRepositoryNotFound = errors.New("repository not found") ) // RepositoryStore provides access to repository state. type RepositoryStore interface { // GetGeneration gets the repository's generation on a given storage. GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) // IncrementGeneration increments the generations of up to date nodes. IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error // SetGeneration sets the repository's generation on the given storage. If the generation is higher // than the virtual storage's generation, it is set to match as well to guarantee monotonic increments. SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error // GetReplicaPath gets the replica path of a repository. Returns a ErrRepositoryNotFound if a record // for the repository ID is not found. GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) // GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would // downgrade, a DowngradeAttemptedError is returned. GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error) // CreateRepository creates a record for a repository in the specified virtual storage and relative path. // Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated // and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed // the transaction. Returns ErrRepositoryAlreadyExists when trying to create a repository which already exists in the store. // // storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as // the repository's primary. // // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error // SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one. SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error // DeleteRepository deletes the database records associated with the repository. It returns the replica path and the storages // which are known to have a replica at the time of deletion. ErrRepositoryNotFound is returned when // the repository is not tracked by the Praefect datastore. DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) // DeleteAllRepositories deletes the database records associated with // repositories in the specified virtual storage. DeleteAllRepositories(ctx context.Context, virtualStorage string) error // DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage. DeleteReplica(ctx context.Context, repositoryID int64, storage string) error // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) ConsistentStoragesGetter // RepositoryExists returns whether the repository exists on a virtual storage. RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) // GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which // are not able to serve requests at the moment. GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error) // DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's // record of the invalid repository. If the storage was the only storage with the repository, the repository's // record on the virtual storage is also deleted. DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error // ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already // exists with the given virtual storage and relative path combination, an error is returned. ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) // GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a // ErrRepositoryNotFound error if the repository doesn't exist. GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) // GetRepositoryMetadata retrieves a repository's metadata. GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error) // GetRepositoryMetadataByPath retrieves a repository's metadata by its virtual path. GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error) // MarkUnverified marks replicas of the repository unverified. MarkUnverified(ctx context.Context, repositoryID int64) (int64, error) // MarkVirtualStorageUnverified marks all replicas on the virtual storage as unverified. MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) // MarkStorageUnverified marsk all replicas on the storage as unverified. MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. // Refer to the interface for method documentation. type PostgresRepositoryStore struct { db glsql.Querier storages } // NewPostgresRepositoryStore returns a Postgres implementation of RepositoryStore. func NewPostgresRepositoryStore(db glsql.Querier, configuredStorages map[string][]string) *PostgresRepositoryStore { return &PostgresRepositoryStore{db: db, storages: storages(configuredStorages)} } // MarkUnverified marks replicas of the repository unverified. func (rs *PostgresRepositoryStore) MarkUnverified(ctx context.Context, repositoryID int64) (int64, error) { result, err := rs.db.ExecContext(ctx, ` UPDATE storage_repositories SET verified_at = NULL WHERE repository_id = $1 AND verified_at IS NOT NULL `, repositoryID) if err != nil { return 0, fmt.Errorf("query: %w", err) } return result.RowsAffected() } // MarkVirtualStorageUnverified marks all replicas on the virtual storage as unverified. func (rs *PostgresRepositoryStore) MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) { result, err := rs.db.ExecContext(ctx, ` UPDATE storage_repositories SET verified_at = NULL FROM repositories WHERE repositories.virtual_storage = $1 AND repositories.repository_id = storage_repositories.repository_id AND verified_at IS NOT NULL `, virtualStorage) if err != nil { return 0, fmt.Errorf("query: %w", err) } return result.RowsAffected() } // MarkStorageUnverified marsk all replicas on the storage as unverified. func (rs *PostgresRepositoryStore) MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) { result, err := rs.db.ExecContext(ctx, ` UPDATE storage_repositories SET verified_at = NULL FROM repositories WHERE repositories.repository_id = storage_repositories.repository_id AND repositories.virtual_storage = $1 AND = $2 AND verified_at IS NOT NULL `, virtualStorage, storage) if err != nil { return 0, fmt.Errorf("query: %w", err) } return result.RowsAffected() } //nolint:revive // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) { const q = ` SELECT generation FROM storage_repositories WHERE repository_id = $1 AND storage = $2 ` var gen int if err := rs.db.QueryRowContext(ctx, q, repositoryID, storage).Scan(&gen); err != nil { if errors.Is(err, sql.ErrNoRows) { return GenerationUnknown, nil } return 0, err } return gen, nil } //nolint:revive // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error { const q = ` WITH updated_replicas AS ( UPDATE storage_repositories SET generation = generation + 1 FROM ( SELECT repository_id, storage FROM repositories JOIN storage_repositories USING (repository_id, generation) WHERE repository_id = $1 AND storage = ANY($2) FOR UPDATE ) AS to_update WHERE storage_repositories.repository_id = to_update.repository_id AND = RETURNING storage_repositories.repository_id ), updated_repository AS ( UPDATE repositories SET generation = generation + 1 FROM ( SELECT DISTINCT repository_id FROM updated_replicas ) AS updated_repositories WHERE repositories.repository_id = updated_repositories.repository_id ) SELECT EXISTS ( SELECT FROM repositories WHERE repository_id = $1 ) AS repository_exists, EXISTS ( SELECT FROM updated_replicas ) AS repository_updated ` var repositoryExists, repositoryUpdated bool if err := rs.db.QueryRowContext( ctx, q, repositoryID, append(secondaries, primary), ).Scan(&repositoryExists, &repositoryUpdated); err != nil { return fmt.Errorf("scan: %w", err) } if !repositoryExists { return ErrRepositoryNotFound } if !repositoryUpdated { return errWriteToOutdatedNodes } return nil } //nolint:revive // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error { const q = ` WITH repository AS ( UPDATE repositories SET generation = $3 WHERE repository_id = $1 AND COALESCE(repositories.generation, -1) < $3 ) INSERT INTO storage_repositories ( repository_id, virtual_storage, relative_path, storage, generation ) SELECT repository_id, virtual_storage, $4, $2, $3 FROM repositories WHERE repository_id = $1 ON CONFLICT (repository_id, storage) DO UPDATE SET relative_path = EXCLUDED.relative_path, generation = EXCLUDED.generation ` _, err := rs.db.ExecContext(ctx, q, repositoryID, storage, generation, relativePath) return err } // SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one. func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storageName string) error { result, err := rs.db.ExecContext(ctx, ` WITH updated_repository AS ( UPDATE repositories SET generation = generation + 1 WHERE virtual_storage = $1 AND relative_path = $2 RETURNING repository_id, virtual_storage, relative_path, generation ) INSERT INTO storage_repositories (repository_id, virtual_storage, relative_path, storage, generation) SELECT repository_id, virtual_storage, relative_path, $3, generation FROM updated_repository ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET repository_id = EXCLUDED.repository_id, generation = EXCLUDED.generation `, virtualStorage, relativePath, storageName) if err != nil { return fmt.Errorf("exec: %w", err) } if rowsAffected, err := result.RowsAffected(); err != nil { return fmt.Errorf("rows affected: %w", err) } else if rowsAffected == 0 { return ErrRepositoryNotFound } return nil } //nolint:revive // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error) { const q = ` SELECT storage, generation FROM storage_repositories WHERE repository_id = $1 AND storage = ANY($2) ` rows, err := rs.db.QueryContext(ctx, q, repositoryID, []string{source, target}) if err != nil { return 0, err } defer rows.Close() sourceGeneration := GenerationUnknown targetGeneration := GenerationUnknown for rows.Next() { var storage string var generation int if err := rows.Scan(&storage, &generation); err != nil { return 0, err } switch storage { case source: sourceGeneration = generation case target: targetGeneration = generation default: return 0, fmt.Errorf("unexpected storage: %s", storage) } } if err := rows.Err(); err != nil { return 0, err } if targetGeneration != GenerationUnknown && targetGeneration >= sourceGeneration { return 0, DowngradeAttemptedError{ Storage: target, CurrentGeneration: targetGeneration, AttemptedGeneration: sourceGeneration, } } return sourceGeneration, nil } // CreateRepository creates a record for a repository in the specified virtual storage and relative path. // Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated // and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed // the transaction. Returns ErrRepositoryAlreadyExists when trying to create a repository which already exists in the // store. // // storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as // the repository's primary. // // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { const q = ` WITH repo AS ( INSERT INTO repositories ( repository_id, virtual_storage, relative_path, replica_path, generation, "primary" ) VALUES ($8, $1, $2, $9, 0, CASE WHEN $4 THEN $3 END) ), assignments AS ( INSERT INTO repository_assignments ( repository_id, virtual_storage, relative_path, storage ) SELECT $8, $1, $2, storage FROM ( SELECT $3 AS storage UNION SELECT unnest($5::text[]) UNION SELECT unnest($6::text[]) ) AS storages WHERE $7 ) INSERT INTO storage_repositories ( repository_id, virtual_storage, relative_path, storage, generation ) SELECT $8, $1, $2, storage, 0 FROM ( SELECT $3 AS storage UNION SELECT unnest($5::text[]) ) AS updated_storages ` _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, primary, storePrimary, updatedSecondaries, outdatedSecondaries, storeAssignments, repositoryID, replicaPath, ) if err != nil { if glsql.IsUniqueViolation(err, "repositories_pkey") { return fmt.Errorf("repository id %d already in use", repositoryID) } if glsql.IsUniqueViolation(err, "storage_repositories_pkey") { return ErrRepositoryAlreadyExists } return err } return nil } //nolint:revive // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) { var ( replicaPath string storages glsql.StringArray ) if err := rs.db.QueryRowContext(ctx, ` WITH repository AS ( DELETE FROM repositories WHERE virtual_storage = $1 AND relative_path = $2 RETURNING repository_id, replica_path ) SELECT replica_path, ARRAY_AGG( FROM repository LEFT JOIN storage_repositories USING (repository_id) GROUP BY replica_path `, virtualStorage, relativePath, ).Scan(&replicaPath, &storages); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", nil, ErrRepositoryNotFound } return "", nil, fmt.Errorf("scan: %w", err) } return replicaPath, storages.Slice(), nil } //nolint:revive // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) DeleteAllRepositories(ctx context.Context, virtualStorage string) error { _, err := rs.db.ExecContext(ctx, ` WITH delete_jobs AS ( DELETE FROM replication_queue WHERE job->>'virtual_storage' = $1 RETURNING id ), delete_job_locks AS ( DELETE FROM replication_queue_job_lock USING delete_jobs WHERE job_id = ), delete_locks AS ( DELETE FROM replication_queue_lock WHERE id LIKE $1 || '|%|%' ) DELETE FROM repositories WHERE virtual_storage = $1; `, virtualStorage) if err != nil { return err } return nil } // DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details. func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error { result, err := rs.db.ExecContext(ctx, ` DELETE FROM storage_repositories WHERE repository_id = $1 AND storage = $2 `, repositoryID, storage) if err != nil { return err } if n, err := result.RowsAffected(); err != nil { return err } else if n == 0 { return ErrNoRowsAffected } return nil } // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) { return rs.getConsistentStorages(ctx, ` SELECT replica_path, ARRAY_AGG(storage) FROM repositories JOIN storage_repositories USING (repository_id, relative_path, generation) WHERE repository_id = $1 GROUP BY replica_path `, repositoryID) } // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { return rs.getConsistentStorages(ctx, ` SELECT replica_path, ARRAY_AGG(storage) FROM repositories JOIN storage_repositories USING (repository_id, relative_path, generation) WHERE repositories.virtual_storage = $1 AND repositories.relative_path = $2 GROUP BY replica_path `, virtualStorage, relativePath) } // getConsistentStorages is a helper for querying the consistent storages by different keys. func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, *datastructure.Set[string], error) { var replicaPath string var storages glsql.StringArray if err := rs.db.QueryRowContext(ctx, query, params...).Scan(&replicaPath, &storages); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", nil, ErrRepositoryNotFound } return "", nil, fmt.Errorf("query: %w", err) } return replicaPath, datastructure.SetFromSlice(storages.Slice()), nil } //nolint:revive // This is unintentionally missing documentation. func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) { const q = ` SELECT true FROM repositories WHERE virtual_storage = $1 AND relative_path = $2 ` var exists bool if err := rs.db.QueryRowContext(ctx, q, virtualStorage, relativePath).Scan(&exists); err != nil { if errors.Is(err, sql.ErrNoRows) { return false, nil } return false, err } return exists, nil } // DeleteInvalidRepository deletes the given replica. If the replica was the only replica of the // repository, then the repository will be deleted, as well. func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error { _, err := rs.db.ExecContext(ctx, ` WITH repository AS ( SELECT repository_id FROM repositories WHERE repository_id = $1 FOR UPDATE ), invalid_repository AS ( DELETE FROM storage_repositories USING repository WHERE storage_repositories.repository_id = repository.repository_id AND storage = $2 ) DELETE FROM repositories USING repository WHERE repositories.repository_id = repository.repository_id AND NOT EXISTS ( SELECT 1 FROM storage_repositories WHERE repository_id = $1 AND storage != $2 ) `, repositoryID, storage) return err } // Replica represents a replica of a repository. type Replica struct { // Storage is the name of the replica's storage. Storage string // Generation is the replica's confirmed generation. If the replica does not yet exists, generation // is -1. Generation int64 // Assigned indicates whether the storage is an assigned host of the repository. Assigned bool // Healthy indicates whether the replica is considered healthy by the consensus of Praefect nodes. Healthy bool // ValidPrimary indicates whether the replica is ready to serve as the primary if necessary. ValidPrimary bool // VerifiedAt is the last successful verification time of the replica. VerifiedAt time.Time } // RepositoryMetadata contains the repository's metadata. type RepositoryMetadata struct { // RepositoryID is the internal id of the repository. RepositoryID int64 // VirtualStorage is the virtual storage where the repository is. VirtualStorage string // RelativePath is the relative path of the repository. RelativePath string // ReplicaPath is the actual disk location where the replicas are stored in the storages. ReplicaPath string // Primary is the current primary of this repository. Primary string // Generation is the current generation of the repository. Generation int64 // Replicas contains information of the repository on each storage that contains the repository // or does not contain the repository but is assigned to host it. Replicas []Replica } // GetRepositoryMetadata retrieves a repository's metadata. func (rs *PostgresRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error) { metadata, err := rs.getRepositoryMetadata( ctx, "WHERE repository_id = $3", "WHERE repository_id = $3", "", repositoryID, ) if err != nil { return RepositoryMetadata{}, err } if len(metadata) == 0 { return RepositoryMetadata{}, ErrRepositoryNotFound } return metadata[0], nil } // GetRepositoryMetadataByPath retrieves a repository's metadata by its virtual path. func (rs *PostgresRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error) { metadata, err := rs.getRepositoryMetadata( ctx, "WHERE virtual_storage = $3 AND relative_path = $4", "WHERE repository_id = (SELECT repository_id FROM repositories)", "", virtualStorage, relativePath, ) if err != nil { return RepositoryMetadata{}, err } if len(metadata) == 0 { return RepositoryMetadata{}, ErrRepositoryNotFound } return metadata[0], nil } // GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which // are not able to serve requests at the moment. func (rs *PostgresRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error) { _, ok := rs.storages[virtualStorage] if !ok { return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage) } return rs.getRepositoryMetadata(ctx, "WHERE virtual_storage = $3", "WHERE virtual_storage = $3", "HAVING bool_or(NOT IS NOT NULL) FILTER(WHERE assigned)", virtualStorage, ) } // GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which // are not able to serve requests at the moment. func (rs *PostgresRepositoryStore) getRepositoryMetadata(ctx context.Context, repositoriesFilter, validPrimariesFilter, groupFilter string, filterArgs ...interface{}) ([]RepositoryMetadata, error) { // The query below gets the status of every repository which has one or more assigned replicas that // are not able to serve requests at the moment. The status includes how many changes a replica is behind, // whether the replica is assigned host or not, whether the replica is healthy and whether the replica is // considered a valid primary candidate. It works as follows: // // 1. First we get all the storages which contain the repository from `storage_repositories`. We // list every copy of the repository as the latest generation could exist on an unassigned // storage. // // 2. We join `repository_assignments` table with fallback behavior in case the repository has no // assignments. A storage is considered assigned if: // // 1. If the repository has no assignments, every configured storage is considered assigned. // 2. If the repository has assignments, the storage needs to be assigned explicitly. // 3. Assignments of unconfigured storages are treated as if they don't exist. // // If none of the assigned storages are outdated, the repository is not considered outdated as // the desired replication factor has been reached. // // 3. We join `repositories` table to filter out any repositories that have been deleted but still // exist on some storages. While the `repository_assignments` has a foreign key on `repositories` // and there can't be any assignments for deleted repositories, this is still needed as long as the // fallback behavior of no assignments is in place. // // 4. We join the `healthy_storages` view to return the storages current health. // // 5. We join the `valid_primaries` view to return whether the storage is ready to act as a primary in case // of a failover. // // 6. Finally we aggregate each repository's information in to a single row with a JSON object containing // the information. This allows us to group the output already in the query and makes scanning easier // We filter out groups which do not have an assigned storage as the replication factor on those // is reached. Status of unassigned storages does not matter as long as they don't contain a later generation // than the assigned ones. // var ( virtualStorages []string storages []string ) for virtualStorage, configuredStorages := range rs.storages { for _, storage := range configuredStorages { virtualStorages = append(virtualStorages, virtualStorage) storages = append(storages, storage) } } args := append([]interface{}{virtualStorages, storages}, filterArgs...) rows, err := rs.db.QueryContext(ctx, fmt.Sprintf(` WITH configured_storages AS ( SELECT unnest($1::text[]) AS virtual_storage, unnest($2::text[]) AS storage ), repositories AS ( SELECT * FROM repositories %s ), storage_repositories AS ( SELECT repository_id, storage, storage_repositories.generation, verified_at FROM repositories JOIN storage_repositories USING (repository_id) ), valid_primaries AS ( SELECT repository_id, storage FROM valid_primaries %s ) SELECT json_build_object ( 'RepositoryID', repository_id, 'VirtualStorage', virtual_storage, 'RelativePath', relative_path, 'ReplicaPath', replica_path, 'Primary', "primary", 'Generation', repositories.generation, 'Replicas', json_agg( json_build_object( 'Storage', storage, 'Generation', COALESCE(replicas.generation, -1), 'Assigned', assigned, 'Healthy', IS NOT NULL, 'ValidPrimary', IS NOT NULL, 'VerifiedAt', verified_at ) ) ) FROM repositories JOIN ( SELECT repository_id, storage, generation, IS NOT NULL AS assigned, verified_at FROM storage_repositories FULL JOIN ( SELECT repository_id, storage FROM repositories JOIN configured_storages USING (virtual_storage) WHERE ( SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = = 1 FROM repository_assignments WHERE repository_id = repositories.repository_id AND (virtual_storage, storage) IN (SELECT * FROM configured_storages) ) ) AS repository_assignments USING (repository_id, storage) ORDER BY repository_id, storage ) AS replicas USING (repository_id) LEFT JOIN healthy_storages USING (virtual_storage, storage) LEFT JOIN valid_primaries USING (repository_id, storage) GROUP BY repository_id, virtual_storage, relative_path, replica_path, "primary", repositories.generation %s ORDER BY repository_id `, repositoriesFilter, validPrimariesFilter, groupFilter), args...) if err != nil { return nil, fmt.Errorf("query: %w", err) } defer rows.Close() var repos []RepositoryMetadata for rows.Next() { var repositoryJSON string if err := rows.Scan(&repositoryJSON); err != nil { return nil, fmt.Errorf("scan: %w", err) } var repo RepositoryMetadata if err := json.NewDecoder(strings.NewReader(repositoryJSON)).Decode(&repo); err != nil { return nil, fmt.Errorf("decode json: %w", err) } repos = append(repos, repo) } return repos, rows.Err() } // ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already // exists with the given virtual storage and relative path combination, an error is returned. func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { var id int64 if err := rs.db.QueryRowContext(ctx, ` SELECT nextval('repositories_repository_id_seq') WHERE NOT EXISTS ( SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2 ) `, virtualStorage, relativePath).Scan(&id); err != nil { if errors.Is(err, sql.ErrNoRows) { return 0, ErrRepositoryAlreadyExists } return 0, fmt.Errorf("scan: %w", err) } return id, nil } // GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a // ErrRepositoryNotFound error if the repository doesn't exist. func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { var id int64 if err := rs.db.QueryRowContext(ctx, ` SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2 `, virtualStorage, relativePath).Scan(&id); err != nil { if errors.Is(err, sql.ErrNoRows) { return 0, ErrRepositoryNotFound } return 0, fmt.Errorf("scan: %w", err) } return id, nil } // GetReplicaPath gets the replica path of a repository. Returns a ErrRepositoryNotFound if a record // for the repository ID is not found. func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) { var replicaPath string if err := rs.db.QueryRowContext( ctx, "SELECT replica_path FROM repositories WHERE repository_id = $1", repositoryID, ).Scan(&replicaPath); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", ErrRepositoryNotFound } return "", fmt.Errorf("scan: %w", err) } return replicaPath, nil }