Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-07-27 17:25:10 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-08-31 15:57:48 +0300
commit1836583ff226a634932a9e2f424acc3e8882d9d9 (patch)
treef31db9b1b4b695def91d5bdf9720db95a3f1f0f2
parentec619962966cc59d889c7792b1613b3ac4c0968f (diff)
Reserve repository ids when routing repository creations
When Praefect is routing a repository creation, it needs to reserve a repository ID in order to generate later a relative path for the repository. This commit changes the PerRepositoryRouter to reserve a repository ID and return in the route. The ID reservation also acts as a fail fast check to see whether there already exists a repository with the given virtual storage and relative path.
-rw-r--r--cmd/praefect/main.go1
-rw-r--r--internal/praefect/coordinator.go5
-rw-r--r--internal/praefect/coordinator_test.go1
-rw-r--r--internal/praefect/router.go2
-rw-r--r--internal/praefect/router_per_repository.go23
-rw-r--r--internal/praefect/router_per_repository_test.go45
6 files changed, 68 insertions, 9 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index e67c17d4c..ed908551c 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -312,6 +312,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))),
csg,
assignmentStore,
+ rs,
conf.DefaultReplicationFactors(),
)
} else {
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b4c007e3a..45346e488 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -627,6 +627,11 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string,
if errors.Is(err, nodes.ErrVirtualStorageNotExist) {
return nil, helper.ErrInvalidArgument(err)
}
+
+ if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) {
+ return nil, helper.ErrAlreadyExists(err)
+ }
+
return nil, err
}
return sp, nil
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 0c24863b2..d2225f1a3 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -873,6 +873,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
},
nil,
nil,
+ rs,
conf.DefaultReplicationFactors(),
)
default:
diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index bc708ffd9..fc99b4769 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -25,6 +25,8 @@ type StorageMutatorRoute struct {
// RepositoryMutatorRoute describes how to route a repository scoped mutator call.
type RepositoryMutatorRoute struct {
+ // RepositoryID is the repository's ID as Praefect identifies it.
+ RepositoryID int64
// Primary is the primary node of the transaction.
Primary RouterNode
// Secondaries are the secondary participating in a transaction.
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index 3a1a98f4e..326d375cb 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -63,11 +63,20 @@ type PerRepositoryRouter struct {
rand Random
hc HealthChecker
csg datastore.ConsistentStoragesGetter
+ rs datastore.RepositoryStore
defaultReplicationFactors map[string]int
}
// NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration.
-func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecker, rand Random, csg datastore.ConsistentStoragesGetter, ag AssignmentGetter, defaultReplicationFactors map[string]int) *PerRepositoryRouter {
+func NewPerRepositoryRouter(
+ conns Connections,
+ pg PrimaryGetter,
+ hc HealthChecker,
+ rand Random,
+ csg datastore.ConsistentStoragesGetter,
+ ag AssignmentGetter,
+ rs datastore.RepositoryStore,
+ defaultReplicationFactors map[string]int) *PerRepositoryRouter {
return &PerRepositoryRouter{
conns: conns,
pg: pg,
@@ -75,6 +84,7 @@ func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecke
hc: hc,
csg: csg,
ag: ag,
+ rs: rs,
defaultReplicationFactors: defaultReplicationFactors,
}
}
@@ -248,9 +258,17 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
return RepositoryMutatorRoute{}, err
}
+ id, err := r.rs.ReserveRepositoryID(ctx, virtualStorage, relativePath)
+ if err != nil {
+ return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err)
+ }
+
replicationFactor := r.defaultReplicationFactors[virtualStorage]
if replicationFactor == 1 {
- return RepositoryMutatorRoute{Primary: primary}, nil
+ return RepositoryMutatorRoute{
+ RepositoryID: id,
+ Primary: primary,
+ }, nil
}
var secondaryNodes []RouterNode
@@ -296,6 +314,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu
}
return RepositoryMutatorRoute{
+ RepositoryID: id,
Primary: primary,
Secondaries: secondaries,
ReplicationTargets: replicationTargets,
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
index 4bb27d0bf..a964b41b1 100644
--- a/internal/praefect/router_per_repository_test.go
+++ b/internal/praefect/router_per_repository_test.go
@@ -2,10 +2,12 @@ package praefect
import (
"context"
+ "fmt"
"sort"
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
@@ -95,6 +97,7 @@ func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) {
},
nil,
nil,
+ datastore.MockRepositoryStore{},
nil,
)
@@ -226,6 +229,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
},
},
nil,
+ datastore.MockRepositoryStore{},
nil,
)
@@ -367,6 +371,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
},
},
tc.assignedNodes,
+ datastore.MockRepositoryStore{},
nil,
)
@@ -423,6 +428,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
primaryCandidates int
primaryPick int
secondaryCandidates int
+ repositoryExists bool
matchRoute matcher
error error
}{
@@ -445,6 +451,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
primaryPick: 0,
matchRoute: requireOneOf(
RepositoryMutatorRoute{
+ RepositoryID: 1,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
ReplicationTargets: []string{"secondary-1", "secondary-2"},
},
@@ -458,7 +465,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
primaryPick: 0,
matchRoute: requireOneOf(
RepositoryMutatorRoute{
- Primary: RouterNode{Storage: "primary", Connection: primaryConn},
+ RepositoryID: 1,
+ Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{
{Storage: "secondary-1", Connection: secondary1Conn},
{Storage: "secondary-2", Connection: secondary2Conn},
@@ -474,7 +482,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
primaryPick: 0,
matchRoute: requireOneOf(
RepositoryMutatorRoute{
- Primary: RouterNode{Storage: "primary", Connection: primaryConn},
+ RepositoryID: 1,
+ Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{
{Storage: "secondary-1", Connection: secondary1Conn},
},
@@ -491,7 +500,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
primaryPick: 0,
matchRoute: requireOneOf(
RepositoryMutatorRoute{
- Primary: RouterNode{Storage: "primary", Connection: primaryConn},
+ RepositoryID: 1,
+ Primary: RouterNode{Storage: "primary", Connection: primaryConn},
},
),
},
@@ -505,12 +515,14 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
secondaryCandidates: 2,
matchRoute: requireOneOf(
RepositoryMutatorRoute{
- Primary: RouterNode{Storage: "primary", Connection: primaryConn},
- Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
+ RepositoryID: 1,
+ Primary: RouterNode{Storage: "primary", Connection: primaryConn},
+ Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
},
RepositoryMutatorRoute{
- Primary: RouterNode{Storage: "primary", Connection: primaryConn},
- Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}},
+ RepositoryID: 1,
+ Primary: RouterNode{Storage: "primary", Connection: primaryConn},
+ Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}},
},
),
},
@@ -524,12 +536,22 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
secondaryCandidates: 2,
matchRoute: requireOneOf(
RepositoryMutatorRoute{
+ RepositoryID: 1,
Primary: RouterNode{Storage: "primary", Connection: primaryConn},
Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
ReplicationTargets: []string{"secondary-2"},
},
),
},
+ {
+ desc: "repository already exists",
+ virtualStorage: "virtual-storage-1",
+ healthyNodes: StaticHealthChecker(configuredNodes),
+ primaryCandidates: 3,
+ primaryPick: 0,
+ repositoryExists: true,
+ error: fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists),
+ },
} {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := testhelper.Context()
@@ -556,6 +578,15 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
},
nil,
nil,
+ datastore.MockRepositoryStore{
+ ReserveRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
+ if tc.repositoryExists {
+ return 0, commonerr.ErrRepositoryAlreadyExists
+ }
+
+ return 1, nil
+ },
+ },
map[string]int{"virtual-storage-1": tc.replicationFactor},
).RouteRepositoryCreation(ctx, tc.virtualStorage, "relative-path")
if tc.error != nil {