diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-07-27 17:25:10 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-08-31 15:57:48 +0300 |
commit | 1836583ff226a634932a9e2f424acc3e8882d9d9 (patch) | |
tree | f31db9b1b4b695def91d5bdf9720db95a3f1f0f2 | |
parent | ec619962966cc59d889c7792b1613b3ac4c0968f (diff) |
Reserve repository ids when routing repository creations
When Praefect is routing a repository creation, it needs to reserve
a repository ID in order to generate later a relative path for the
repository. This commit changes the PerRepositoryRouter to reserve a
repository ID and return in the route. The ID reservation also acts
as a fail fast check to see whether there already exists a repository
with the given virtual storage and relative path.
-rw-r--r-- | cmd/praefect/main.go | 1 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/router.go | 2 | ||||
-rw-r--r-- | internal/praefect/router_per_repository.go | 23 | ||||
-rw-r--r-- | internal/praefect/router_per_repository_test.go | 45 |
6 files changed, 68 insertions, 9 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index e67c17d4c..ed908551c 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -312,6 +312,7 @@ func run(cfgs []starter.Config, conf config.Config) error { praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), csg, assignmentStore, + rs, conf.DefaultReplicationFactors(), ) } else { diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index b4c007e3a..45346e488 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -627,6 +627,11 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, if errors.Is(err, nodes.ErrVirtualStorageNotExist) { return nil, helper.ErrInvalidArgument(err) } + + if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + return nil, helper.ErrAlreadyExists(err) + } + return nil, err } return sp, nil diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 0c24863b2..d2225f1a3 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -873,6 +873,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { }, nil, nil, + rs, conf.DefaultReplicationFactors(), ) default: diff --git a/internal/praefect/router.go b/internal/praefect/router.go index bc708ffd9..fc99b4769 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -25,6 +25,8 @@ type StorageMutatorRoute struct { // RepositoryMutatorRoute describes how to route a repository scoped mutator call. type RepositoryMutatorRoute struct { + // RepositoryID is the repository's ID as Praefect identifies it. + RepositoryID int64 // Primary is the primary node of the transaction. Primary RouterNode // Secondaries are the secondary participating in a transaction. diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 3a1a98f4e..326d375cb 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -63,11 +63,20 @@ type PerRepositoryRouter struct { rand Random hc HealthChecker csg datastore.ConsistentStoragesGetter + rs datastore.RepositoryStore defaultReplicationFactors map[string]int } // NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration. -func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecker, rand Random, csg datastore.ConsistentStoragesGetter, ag AssignmentGetter, defaultReplicationFactors map[string]int) *PerRepositoryRouter { +func NewPerRepositoryRouter( + conns Connections, + pg PrimaryGetter, + hc HealthChecker, + rand Random, + csg datastore.ConsistentStoragesGetter, + ag AssignmentGetter, + rs datastore.RepositoryStore, + defaultReplicationFactors map[string]int) *PerRepositoryRouter { return &PerRepositoryRouter{ conns: conns, pg: pg, @@ -75,6 +84,7 @@ func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecke hc: hc, csg: csg, ag: ag, + rs: rs, defaultReplicationFactors: defaultReplicationFactors, } } @@ -248,9 +258,17 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{}, err } + id, err := r.rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err) + } + replicationFactor := r.defaultReplicationFactors[virtualStorage] if replicationFactor == 1 { - return RepositoryMutatorRoute{Primary: primary}, nil + return RepositoryMutatorRoute{ + RepositoryID: id, + Primary: primary, + }, nil } var secondaryNodes []RouterNode @@ -296,6 +314,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu } return RepositoryMutatorRoute{ + RepositoryID: id, Primary: primary, Secondaries: secondaries, ReplicationTargets: replicationTargets, diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 4bb27d0bf..a964b41b1 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -2,10 +2,12 @@ package praefect import ( "context" + "fmt" "sort" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -95,6 +97,7 @@ func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) { }, nil, nil, + datastore.MockRepositoryStore{}, nil, ) @@ -226,6 +229,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { }, }, nil, + datastore.MockRepositoryStore{}, nil, ) @@ -367,6 +371,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { }, }, tc.assignedNodes, + datastore.MockRepositoryStore{}, nil, ) @@ -423,6 +428,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryCandidates int primaryPick int secondaryCandidates int + repositoryExists bool matchRoute matcher error error }{ @@ -445,6 +451,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ + RepositoryID: 1, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, ReplicationTargets: []string{"secondary-1", "secondary-2"}, }, @@ -458,7 +465,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, {Storage: "secondary-2", Connection: secondary2Conn}, @@ -474,7 +482,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, }, @@ -491,7 +500,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, }, ), }, @@ -505,12 +515,14 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { secondaryCandidates: 2, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, - Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, }, RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, - Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, }, ), }, @@ -524,12 +536,22 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { secondaryCandidates: 2, matchRoute: requireOneOf( RepositoryMutatorRoute{ + RepositoryID: 1, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, ReplicationTargets: []string{"secondary-2"}, }, ), }, + { + desc: "repository already exists", + virtualStorage: "virtual-storage-1", + healthyNodes: StaticHealthChecker(configuredNodes), + primaryCandidates: 3, + primaryPick: 0, + repositoryExists: true, + error: fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists), + }, } { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := testhelper.Context() @@ -556,6 +578,15 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { }, nil, nil, + datastore.MockRepositoryStore{ + ReserveRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + if tc.repositoryExists { + return 0, commonerr.ErrRepositoryAlreadyExists + } + + return 1, nil + }, + }, map[string]int{"virtual-storage-1": tc.replicationFactor}, ).RouteRepositoryCreation(ctx, tc.virtualStorage, "relative-path") if tc.error != nil { |