Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-10-11 14:35:20 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-10-14 11:14:32 +0300
commit311fbb9789c5310314c8a2c6916bfb331aa4661b (patch)
treed09cf1dae2d774767da16e4b0a7cd405340da682 /internal
parentb47cb9f7818c124b88b0230ec0c7dc5183ca25df (diff)
Get host assignment by repository ID
Praefect's routing should use repository ID consistently in all of the queries in order to avoid races where the external key of the repository changes between the various queries. This commit updates GetHostAssignments to get assignments by the repository's ID rather than its virtual storage and relative path.
Diffstat (limited to 'internal')
-rw-r--r--internal/praefect/assignment.go8
-rw-r--r--internal/praefect/datastore/assignment.go52
-rw-r--r--internal/praefect/datastore/assignment_test.go67
-rw-r--r--internal/praefect/helper_test.go3
-rw-r--r--internal/praefect/info_service_test.go9
-rw-r--r--internal/praefect/router_per_repository.go4
-rw-r--r--internal/praefect/service/info/repositories.go2
-rw-r--r--internal/praefect/service/info/server.go2
8 files changed, 77 insertions, 70 deletions
diff --git a/internal/praefect/assignment.go b/internal/praefect/assignment.go
index f0f38e5af..69a689507 100644
--- a/internal/praefect/assignment.go
+++ b/internal/praefect/assignment.go
@@ -13,18 +13,18 @@ type AssignmentStore interface {
SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
}
-type disabledAssignments map[string][]string
+type disabledAssignments []string
// NewDisabledAssignmentStore returns an assignments store that can be used if no
// database is configured. It returns every configured storage as assigned and
// errors when trying to set assignments.
-func NewDisabledAssignmentStore(storages map[string][]string) AssignmentStore {
+func NewDisabledAssignmentStore(storages []string) AssignmentStore {
return disabledAssignments(storages)
}
// GetHostAssigments simply returns all of the storages configured for the virtual storage.
-func (storages disabledAssignments) GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error) {
- return storages[virtualStorage], nil
+func (storages disabledAssignments) GetHostAssignments(ctx context.Context, repositoryID int64) ([]string, error) {
+ return storages, nil
}
// SetReplicationFactor errors when attempting to set assignments.
diff --git a/internal/praefect/datastore/assignment.go b/internal/praefect/datastore/assignment.go
index 7fc48eedc..29d3e0c87 100644
--- a/internal/praefect/datastore/assignment.go
+++ b/internal/praefect/datastore/assignment.go
@@ -2,9 +2,12 @@ package datastore
import (
"context"
+ "database/sql"
+ "errors"
"fmt"
"github.com/lib/pq"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
)
@@ -38,36 +41,37 @@ func NewAssignmentStore(db glsql.Querier, configuredStorages map[string][]string
return AssignmentStore{db: db, configuredStorages: configuredStorages}
}
-func (s AssignmentStore) GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error) {
- configuredStorages, ok := s.configuredStorages[virtualStorage]
- if !ok {
- return nil, newVirtualStorageNotFoundError(virtualStorage)
- }
+func (s AssignmentStore) GetHostAssignments(ctx context.Context, repositoryID int64) ([]string, error) {
+ var (
+ virtualStorage string
+ storages pq.StringArray
+ )
+ if err := s.db.QueryRowContext(ctx, `
+SELECT repositories.virtual_storage, ARRAY_AGG(storage) FILTER (WHERE storage IS NOT NULL)
+FROM repositories
+LEFT JOIN repository_assignments USING (repository_id)
+WHERE repository_id = $1
+GROUP BY repositories.virtual_storage
+ `, repositoryID).Scan(&virtualStorage, &storages); err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, commonerr.ErrRepositoryNotFound
+ }
- rows, err := s.db.QueryContext(ctx, `
-SELECT storage
-FROM repository_assignments
-WHERE virtual_storage = $1
-AND relative_path = $2
-AND storage = ANY($3)
-`, virtualStorage, relativePath, pq.StringArray(configuredStorages))
- if err != nil {
return nil, fmt.Errorf("query: %w", err)
}
- defer rows.Close()
- var assignedStorages []string
- for rows.Next() {
- var storage string
- if err := rows.Scan(&storage); err != nil {
- return nil, fmt.Errorf("scan: %w", err)
- }
-
- assignedStorages = append(assignedStorages, storage)
+ configuredStorages, ok := s.configuredStorages[virtualStorage]
+ if !ok {
+ return nil, newVirtualStorageNotFoundError(virtualStorage)
}
- if err := rows.Err(); err != nil {
- return nil, fmt.Errorf("iterating rows: %w", err)
+ assignedStorages := make([]string, 0, len(storages))
+ for _, assignedStorage := range storages {
+ for _, configuredStorage := range configuredStorages {
+ if assignedStorage == configuredStorage {
+ assignedStorages = append(assignedStorages, configuredStorage)
+ }
+ }
}
if len(assignedStorages) == 0 {
diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go
index 67f83e7fc..fa8cd7bfa 100644
--- a/internal/praefect/datastore/assignment_test.go
+++ b/internal/praefect/datastore/assignment_test.go
@@ -1,7 +1,7 @@
package datastore
import (
- "errors"
+ "sort"
"testing"
"github.com/lib/pq"
@@ -22,56 +22,56 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) {
db := glsql.NewDB(t)
configuredStorages := []string{"storage-1", "storage-2", "storage-3"}
+ configuredVirtualStorages := map[string][]string{"virtual-storage": configuredStorages}
for _, tc := range []struct {
- desc string
- virtualStorage string
- existingAssignments []assignment
- expectedAssignments []string
- error error
+ desc string
+ configuredVirtualStorages map[string][]string
+ existingAssignments []assignment
+ expectedAssignments []string
+ error error
}{
{
- desc: "virtual storage not found",
- virtualStorage: "invalid-virtual-storage",
- error: newVirtualStorageNotFoundError("invalid-virtual-storage"),
+ desc: "virtual storage not found",
+ error: newVirtualStorageNotFoundError("virtual-storage"),
},
{
- desc: "configured storages fallback when no records",
- virtualStorage: "virtual-storage",
- expectedAssignments: configuredStorages,
+ desc: "configured storages fallback when no records",
+ configuredVirtualStorages: configuredVirtualStorages,
+ expectedAssignments: configuredStorages,
},
{
- desc: "configured storages fallback when a repo exists in different virtual storage",
- virtualStorage: "virtual-storage",
+ desc: "configured storages fallback when a repo exists in different virtual storage",
existingAssignments: []assignment{
{virtualStorage: "other-virtual-storage", relativePath: "relative-path", storage: "storage-1"},
},
- expectedAssignments: configuredStorages,
+ configuredVirtualStorages: configuredVirtualStorages,
+ expectedAssignments: configuredStorages,
},
{
- desc: "configured storages fallback when a different repo exists in the virtual storage ",
- virtualStorage: "virtual-storage",
+ desc: "configured storages fallback when a different repo exists in the virtual storage ",
existingAssignments: []assignment{
{virtualStorage: "virtual-storage", relativePath: "other-relative-path", storage: "storage-1"},
},
- expectedAssignments: configuredStorages,
+ configuredVirtualStorages: configuredVirtualStorages,
+ expectedAssignments: configuredStorages,
},
{
- desc: "unconfigured storages are ignored",
- virtualStorage: "virtual-storage",
+ desc: "unconfigured storages are ignored",
existingAssignments: []assignment{
{virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "unconfigured-storage"},
},
- expectedAssignments: configuredStorages,
+ configuredVirtualStorages: configuredVirtualStorages,
+ expectedAssignments: configuredStorages,
},
{
- desc: "assignments found",
- virtualStorage: "virtual-storage",
+ desc: "assignments found",
existingAssignments: []assignment{
{virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "storage-1"},
{virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "storage-2"},
{virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "unconfigured"},
},
- expectedAssignments: []string{"storage-1", "storage-2"},
+ configuredVirtualStorages: configuredVirtualStorages,
+ expectedAssignments: []string{"storage-1", "storage-2"},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -81,13 +81,14 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) {
db.TruncateAll(t)
rs := NewPostgresRepositoryStore(db, nil)
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage", "other-relative-path", "primary", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 3, "other-virtual-storage", "relative-path", "primary", nil, nil, false, false))
+
for _, assignment := range tc.existingAssignments {
repositoryID, err := rs.GetRepositoryID(ctx, assignment.virtualStorage, assignment.relativePath)
- if errors.Is(err, commonerr.NewRepositoryNotFoundError(assignment.virtualStorage, assignment.relativePath)) {
- repositoryID, err = rs.ReserveRepositoryID(ctx, assignment.virtualStorage, assignment.relativePath)
- require.NoError(t, err)
-
- require.NoError(t, rs.CreateRepository(ctx, repositoryID, assignment.virtualStorage, assignment.relativePath, assignment.storage, nil, nil, false, false))
+ if err != nil {
+ require.Equal(t, commonerr.NewRepositoryNotFoundError(assignment.virtualStorage, assignment.relativePath), err)
}
_, err = db.ExecContext(ctx, `
@@ -99,8 +100,8 @@ func TestAssignmentStore_GetHostAssignments(t *testing.T) {
actualAssignments, err := NewAssignmentStore(
db,
- map[string][]string{"virtual-storage": configuredStorages},
- ).GetHostAssignments(ctx, tc.virtualStorage, "relative-path")
+ tc.configuredVirtualStorages,
+ ).GetHostAssignments(ctx, 1)
require.Equal(t, tc.error, err)
require.ElementsMatch(t, tc.expectedAssignments, actualAssignments)
})
@@ -233,8 +234,10 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) {
tc.requireStorages(t, setStorages)
- assignedStorages, err := store.GetHostAssignments(ctx, "virtual-storage", "relative-path")
+ assignedStorages, err := store.GetHostAssignments(ctx, 1)
require.NoError(t, err)
+
+ sort.Strings(assignedStorages)
tc.requireStorages(t, assignedStorages)
var storagesWithIncorrectRepositoryID pq.StringArray
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 5d5b82c24..11d92dc52 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -162,9 +162,6 @@ func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, op
if opt.withNodeMgr == nil {
opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withRepoStore)
}
- if opt.withAssignmentStore == nil {
- opt.withAssignmentStore = NewDisabledAssignmentStore(conf.StorageNames())
- }
if opt.withRouter == nil {
opt.withRouter = NewNodeManagerRouter(opt.withNodeMgr, opt.withRepoStore)
}
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
index b026359fa..63fab99d8 100644
--- a/internal/praefect/info_service_test.go
+++ b/internal/praefect/info_service_test.go
@@ -82,16 +82,19 @@ func TestInfoService_RepositoryReplicas(t *testing.T) {
rs.CreateRepository(ctx, 1, virtualStorage, testRepo.GetRelativePath(), "g-1", []string{"g-2", "g-3"}, nil, true, false),
)
+ assignmentStore := datastore.NewAssignmentStore(tx, conf.StorageNames())
+
cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withConnections: conns,
- withRepoStore: rs,
+ withConnections: conns,
+ withRepoStore: rs,
+ withAssignmentStore: assignmentStore,
withRouter: NewPerRepositoryRouter(
conns,
elector,
StaticHealthChecker{virtualStorage: storages},
NewLockedRandom(rand.New(rand.NewSource(0))),
rs,
- datastore.NewAssignmentStore(tx, conf.StorageNames()),
+ assignmentStore,
rs,
conf.DefaultReplicationFactors(),
),
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index 4cb4da498..2317029d1 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -25,7 +25,7 @@ var errPrimaryUnassigned = errors.New("primary node is not assigned")
type AssignmentGetter interface {
// GetHostAssignments returns the names of the storages assigned to host the repository.
// The primary node must always be assigned.
- GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error)
+ GetHostAssignments(ctx context.Context, repositoryID int64) ([]string, error)
}
// ErrNoSuitableNode is returned when there is not suitable node to serve a request.
@@ -207,7 +207,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, ErrRepositoryReadOnly
}
- assignedStorages, err := r.ag.GetHostAssignments(ctx, virtualStorage, relativePath)
+ assignedStorages, err := r.ag.GetHostAssignments(ctx, repositoryID)
if err != nil {
return RepositoryMutatorRoute{}, fmt.Errorf("get host assignments: %w", err)
}
diff --git a/internal/praefect/service/info/repositories.go b/internal/praefect/service/info/repositories.go
index e701329d7..f80a5715e 100644
--- a/internal/praefect/service/info/repositories.go
+++ b/internal/praefect/service/info/repositories.go
@@ -25,7 +25,7 @@ func (s *Server) RepositoryReplicas(ctx context.Context, in *gitalypb.Repository
return nil, fmt.Errorf("get primary: %w", err)
}
- assignments, err := s.assignmentStore.GetHostAssignments(ctx, virtualStorage, relativePath)
+ assignments, err := s.assignmentStore.GetHostAssignments(ctx, repositoryID)
if err != nil {
return nil, fmt.Errorf("get host assignments: %w", err)
}
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 26703a411..d80e2cd7a 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -19,7 +19,7 @@ import (
type AssignmentStore interface {
// GetHostAssignments returns the names of the storages assigned to host the repository.
// The primary node must always be assigned.
- GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error)
+ GetHostAssignments(ctx context.Context, repositoryID int64) ([]string, error)
// SetReplicationFactor sets a repository's replication factor and returns the current assignments.
SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
}