Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-11-11 21:57:22 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-11-19 18:39:49 +0300
commit720fe6dd7dd7429f5ee91f4eef37fffc872a3018 (patch)
tree8518601a7fa6937d512fa696867689e7ccadef5e
parent381df30b1b49b9fcfbc1e4107a106a70f1403c7d (diff)
fetch host node assignments from the database
In preparation for variable replication factor, Praefect gained support for assigning host nodes for repositories in 243dc385. The PerRepositoryRouter uses host assignments to determine which nodes should participate in transactions or get replicated to a given write. This commit adds a Postgres implementation of AssignmentGetter interface. This allows the PerRepositoryRouter to get host node assignments from the database to only include assigned nodes in transactions and replications.
-rw-r--r--cmd/praefect/main.go2
-rw-r--r--internal/praefect/datastore/assignment.go69
-rw-r--r--internal/praefect/datastore/assignment_test.go135
3 files changed, 205 insertions, 1 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 78f597443..df4505bb9 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -300,7 +300,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
hm,
praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
rs,
- praefect.StaticStorageAssignments(conf.StorageNames()),
+ datastore.NewAssignmentStore(db, conf.StorageNames()),
)
} else {
healthChecker = praefect.HealthChecker(nodeManager)
diff --git a/internal/praefect/datastore/assignment.go b/internal/praefect/datastore/assignment.go
new file mode 100644
index 000000000..012fc8838
--- /dev/null
+++ b/internal/praefect/datastore/assignment.go
@@ -0,0 +1,69 @@
+package datastore
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/lib/pq"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
+)
+
+func newVirtualStorageNotFoundError(virtualStorage string) error {
+ return fmt.Errorf("virtual storage %q not found", virtualStorage)
+}
+
+func newAssignmentsNotFoundError(virtualStorage, relativePath string) error {
+ return fmt.Errorf("host assignments for repository %q/%q not found", virtualStorage, relativePath)
+}
+
+// AssignmentStore manages host assignments in Postgres.
+type AssignmentStore struct {
+ db glsql.Querier
+ configuredStorages map[string][]string
+}
+
+// NewAssignmentsStore returns a new AssignmentStore using the passed in database.
+func NewAssignmentStore(db glsql.Querier, configuredStorages map[string][]string) AssignmentStore {
+ return AssignmentStore{db: db, configuredStorages: configuredStorages}
+}
+
+func (s AssignmentStore) GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error) {
+ configuredStorages, ok := s.configuredStorages[virtualStorage]
+ if !ok {
+ return nil, newVirtualStorageNotFoundError(virtualStorage)
+ }
+
+ rows, err := s.db.QueryContext(ctx, `
+SELECT storage
+FROM repositories
+JOIN storage_repositories USING (virtual_storage, relative_path)
+WHERE virtual_storage = $1
+AND relative_path = $2
+AND assigned
+AND storage = ANY($3::text[])
+ `, virtualStorage, relativePath, pq.StringArray(configuredStorages))
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer rows.Close()
+
+ var assignedStorages []string
+ for rows.Next() {
+ var storage string
+ if err := rows.Scan(&storage); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+
+ assignedStorages = append(assignedStorages, storage)
+ }
+
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("iterating rows: %w", err)
+ }
+
+ if len(assignedStorages) == 0 {
+ return nil, newAssignmentsNotFoundError(virtualStorage, relativePath)
+ }
+
+ return assignedStorages, nil
+}
diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go
new file mode 100644
index 000000000..8ac8592c7
--- /dev/null
+++ b/internal/praefect/datastore/assignment_test.go
@@ -0,0 +1,135 @@
+// +build postgres
+
+package datastore
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestAssignmentStore_GetHostAssignments(t *testing.T) {
+ type repository struct {
+ virtualStorage string
+ relativePath string
+ }
+
+ type storage struct {
+ virtualStorage string
+ relativePath string
+ storage string
+ assigned bool
+ }
+
+ for _, tc := range []struct {
+ desc string
+ virtualStorage string
+ repositories []repository
+ storages []storage
+ assignments []string
+ error error
+ }{
+ {
+ desc: "virtual storage not found",
+ virtualStorage: "invalid-virtual-storage",
+ error: newVirtualStorageNotFoundError("invalid-virtual-storage"),
+ },
+ {
+ desc: "not found when no records",
+ virtualStorage: "virtual-storage",
+ error: newAssignmentsNotFoundError("virtual-storage", "relative-path"),
+ },
+ {
+ desc: "not found when only repository record",
+ virtualStorage: "virtual-storage",
+ repositories: []repository{
+ {virtualStorage: "virtual-storage", relativePath: "relative-path"},
+ },
+ error: newAssignmentsNotFoundError("virtual-storage", "relative-path"),
+ },
+ {
+ desc: "not found with incorrect virtual storage",
+ virtualStorage: "virtual-storage",
+ repositories: []repository{
+ {virtualStorage: "other-virtual-storage", relativePath: "relative-path"},
+ },
+ storages: []storage{
+ {virtualStorage: "other-virtual-storage", relativePath: "relative-path", storage: "storage-1", assigned: true},
+ },
+ error: newAssignmentsNotFoundError("virtual-storage", "relative-path"),
+ },
+ {
+ desc: "not found with incorrect relative path",
+ virtualStorage: "virtual-storage",
+ repositories: []repository{
+ {virtualStorage: "virtual-storage", relativePath: "other-relative-path"},
+ },
+ storages: []storage{
+ {virtualStorage: "virtual-storage", relativePath: "other-relative-path", storage: "storage-1", assigned: true},
+ },
+ error: newAssignmentsNotFoundError("virtual-storage", "relative-path"),
+ },
+
+ {
+ desc: "not found when only storage record",
+ virtualStorage: "virtual-storage",
+ storages: []storage{
+ {virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "storage-1", assigned: true},
+ },
+ error: newAssignmentsNotFoundError("virtual-storage", "relative-path"),
+ },
+ {
+ desc: "unconfigured storages are ignored",
+ virtualStorage: "virtual-storage",
+ storages: []storage{
+ {virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "unconfigured-storage", assigned: true},
+ },
+ error: newAssignmentsNotFoundError("virtual-storage", "relative-path"),
+ },
+ {
+ desc: "assignments found",
+ virtualStorage: "virtual-storage",
+ repositories: []repository{
+ {virtualStorage: "virtual-storage", relativePath: "relative-path"},
+ },
+ storages: []storage{
+ {virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "storage-1", assigned: true},
+ {virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "storage-2", assigned: true},
+ {virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "storage-3", assigned: false},
+ {virtualStorage: "virtual-storage", relativePath: "relative-path", storage: "unconfigured", assigned: true},
+ },
+ assignments: []string{"storage-1", "storage-2"},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ db := getDB(t)
+
+ for _, repository := range tc.repositories {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO repositories (virtual_storage, relative_path)
+ VALUES ($1, $2)
+ `, repository.virtualStorage, repository.relativePath)
+ require.NoError(t, err)
+ }
+
+ for _, storage := range tc.storages {
+ _, err := db.ExecContext(ctx, `
+ INSERT INTO storage_repositories (virtual_storage, relative_path, storage, assigned, generation)
+ VALUES ($1, $2, $3, $4, 0)
+ `, storage.virtualStorage, storage.relativePath, storage.storage, storage.assigned)
+ require.NoError(t, err)
+ }
+
+ assignments, err := NewAssignmentStore(
+ db,
+ map[string][]string{"virtual-storage": {"storage-1", "storage-2", "storage-3"}},
+ ).GetHostAssignments(ctx, tc.virtualStorage, "relative-path")
+ require.Equal(t, tc.error, err)
+ require.ElementsMatch(t, tc.assignments, assignments)
+ })
+ }
+}