diff options
Diffstat (limited to 'internal/praefect/datastore/repository_store.go')
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 31 |
1 files changed, 25 insertions, 6 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index d3cf1e514..7efe03708 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -99,7 +99,7 @@ type RepositoryStore interface { GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) // CreateRepository creates the repository for the virtual storage and the storage. Returns // RepositoryExistsError when trying to create a repository which already has a matching record. - CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error + CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error // DeleteRepository deletes the repository from the virtual storage and the storage. Returns // RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage // or the storage. @@ -297,14 +297,33 @@ AND storage = ANY($3) //nolint:stylecheck //nolint:golint -func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string) error { +func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, storage string, healthyVirtualStorages, healthyPhysicalStorages []string) error { const q = ` -WITH repo AS ( +WITH healthy_storages AS ( + SELECT unnest($4::text[]) AS virtual_storage, unnest($5::text[]) AS storage +), primaries AS ( + SELECT storage + FROM healthy_storages + LEFT JOIN storage_repositories USING (virtual_storage, storage) + WHERE virtual_storage = $1 + AND storage_repositories.relative_path = $2 + AND ( + -- If assignments exist for the repository, only the assigned storages elected as primary. + -- If no assignments exist, any healthy node can be elected as the primary + SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = storage_repositories.storage) = 1 + FROM repository_assignments + WHERE repository_assignments.virtual_storage = storage_repositories.virtual_storage + AND repository_assignments.relative_path = storage_repositories.relative_path + ) + ORDER BY generation DESC NULLS LAST, random() + LIMIT 1 +), repo AS ( INSERT INTO repositories ( virtual_storage, relative_path, - generation - ) VALUES ($1, $2, 0) + generation, + "primary" + ) VALUES ($1, $2, 0, (SELECT storage FROM primaries)) ) INSERT INTO storage_repositories ( virtual_storage, @@ -315,7 +334,7 @@ INSERT INTO storage_repositories ( VALUES ($1, $2, $3, 0) ` - _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage) + _, err := rs.db.ExecContext(ctx, q, virtualStorage, relativePath, storage, pq.StringArray(healthyVirtualStorages), pq.StringArray(healthyPhysicalStorages)) var pqerr *pq.Error if errors.As(err, &pqerr) && pqerr.Code.Name() == "unique_violation" { |