Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-02-01 17:57:19 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-02-02 20:11:04 +0300
commit9380d0ab0d5633feebbcba350d5126a63d6452ff (patch)
tree2016a6beeb1c077c629a97644999b0d9b5775139
parentb10049671664e578b2d181c6e31be388398b69bd (diff)
handle repository creations separately in coordinator
This commit wires up repository creation routing to Praefect's coordinator. If the RPC creates a repository, the routing is handled different manner from other repository scoped mutators. This fixes repository creation when repository specific primaries are enabled. Prior to this, repository creation did not work as PerRepositoryRouter needs to know where to route the mutator request. As there were no record of the repository, the router didn't know where to route the request. The router now picks a random healthy node to act as the primary and sets the other nodes as replication targets. It then stores the primary in the database in the request finalizers.
-rw-r--r--internal/praefect/coordinator.go31
-rw-r--r--internal/praefect/coordinator_test.go267
2 files changed, 191 insertions, 107 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 5bd99ada7..1bc5cfe7f 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -355,13 +355,27 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
targetRepo := call.targetRepo
virtualStorage := call.targetRepo.StorageName
- route, err := c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath)
+ change, params, err := getReplicationDetails(call.fullMethodName, call.msg)
if err != nil {
- if errors.Is(err, ErrRepositoryReadOnly) {
- return nil, err
+ return nil, fmt.Errorf("mutator call: replication details: %w", err)
+ }
+
+ var route RepositoryMutatorRoute
+ switch change {
+ case datastore.CreateRepo:
+ route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage)
+ if err != nil {
+ return nil, fmt.Errorf("route repository creation: %w", err)
}
+ default:
+ route, err = c.router.RouteRepositoryMutator(ctx, virtualStorage, targetRepo.RelativePath)
+ if err != nil {
+ if errors.Is(err, ErrRepositoryReadOnly) {
+ return nil, err
+ }
- return nil, fmt.Errorf("mutator call: route repository mutator: %w", err)
+ return nil, fmt.Errorf("mutator call: route repository mutator: %w", err)
+ }
}
primaryMessage, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, route.Primary.Storage)
@@ -369,11 +383,6 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
return nil, fmt.Errorf("mutator call: rewrite storage: %w", err)
}
- change, params, err := getReplicationDetails(call.fullMethodName, call.msg)
- if err != nil {
- return nil, fmt.Errorf("mutator call: replication details: %w", err)
- }
-
var finalizers []func() error
primaryDest := proxy.Destination{
@@ -754,7 +763,9 @@ func (c *Coordinator) newRequestFinalizer(
ctxlogrus.Extract(ctx).WithError(err).Info("deleted repository does not have a store entry")
}
case datastore.CreateRepo:
- if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary, false); err != nil {
+ if err := c.rs.CreateRepository(ctx, virtualStorage, targetRepo.GetRelativePath(), primary,
+ c.conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository,
+ ); err != nil {
if !errors.Is(err, datastore.RepositoryExistsError{}) {
return fmt.Errorf("create repository: %w", err)
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 28cec30a1..43fec982e 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/sha1"
"errors"
+ "fmt"
"io/ioutil"
"sync"
"sync/atomic"
@@ -28,6 +29,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
@@ -593,124 +595,195 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
}
func TestStreamDirector_repo_creation(t *testing.T) {
- gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
- srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
- defer srv1.Stop()
- srv2, _ := testhelper.NewServerWithHealth(t, gitalySocket1)
- defer srv2.Stop()
-
- primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
- primaryNode := &config.Node{Address: primaryAddress, Storage: "praefect-internal-1"}
- secondaryNode := &config.Node{Address: secondaryAddress, Storage: "praefect-internal-2"}
- conf := config.Config{
- VirtualStorages: []*config.VirtualStorage{
- &config.VirtualStorage{
- Name: "praefect",
- Nodes: []*config.Node{primaryNode, secondaryNode},
- },
+ for _, tc := range []struct {
+ electionStrategy config.ElectionStrategy
+ primaryStored bool
+ }{
+ {
+ electionStrategy: config.ElectionStrategySQL,
+ primaryStored: false,
},
- }
+ {
+ electionStrategy: config.ElectionStrategyPerRepository,
+ primaryStored: true,
+ },
+ } {
+ t.Run(string(tc.electionStrategy), func(t *testing.T) {
+ primaryNode := &config.Node{Storage: "praefect-internal-1"}
+ healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"}
+ unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"}
+ conf := config.Config{
+ Failover: config.Failover{ElectionStrategy: tc.electionStrategy},
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*config.Node{primaryNode, healthySecondaryNode, unhealthySecondaryNode},
+ },
+ },
+ }
- var replEventWait sync.WaitGroup
+ var replEventWait sync.WaitGroup
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
+ queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ defer replEventWait.Done()
+ return queue.Enqueue(ctx, event)
+ })
+
+ rewrittenStorage := primaryNode.Storage
+ targetRepo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
- queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
- defer replEventWait.Done()
- return queue.Enqueue(ctx, event)
- })
+ var createRepositoryCalled int64
+ rs := datastore.MockRepositoryStore{
+ CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, primary string, storePrimary bool) error {
+ atomic.AddInt64(&createRepositoryCalled, 1)
+ assert.Equal(t, targetRepo.StorageName, virtualStorage)
+ assert.Equal(t, targetRepo.RelativePath, relativePath)
+ assert.Equal(t, rewrittenStorage, primary)
+ assert.Equal(t, tc.primaryStored, storePrimary)
+ return nil
+ },
+ }
- rewrittenStorage := primaryNode.Storage
- targetRepo := gitalypb.Repository{
- StorageName: "praefect",
- RelativePath: "/path/to/hashed/storage",
- }
+ var router Router
+ var primaryConnPointer string
+ var secondaryConnPointers []string
+ switch tc.electionStrategy {
+ case config.ElectionStrategySQL:
+ gitalySocket0 := testhelper.GetTemporaryGitalySocketFileName(t)
+ gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t)
+ gitalySocket3 := testhelper.GetTemporaryGitalySocketFileName(t)
+ srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
+ defer srv1.Stop()
+ srv2, _ := testhelper.NewServerWithHealth(t, gitalySocket1)
+ defer srv2.Stop()
+ srv3, healthSrv3 := testhelper.NewServerWithHealth(t, gitalySocket3)
+ healthSrv3.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ defer srv3.Stop()
+
+ primaryNode.Address = "unix://" + gitalySocket0
+ healthySecondaryNode.Address = "unix://" + gitalySocket1
+ unhealthySecondaryNode.Address = "unix://" + gitalySocket1
+
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
- ctx, cancel := testhelper.Context()
- defer cancel()
+ router = NewNodeManagerRouter(nodeMgr, rs)
+ for _, node := range nodeMgr.Nodes()["praefect"] {
+ if node.GetStorage() == primaryNode.Storage {
+ primaryConnPointer = fmt.Sprintf("%p", node.GetConnection())
+ continue
+ }
+ }
+ case config.ElectionStrategyPerRepository:
+ conns := Connections{
+ "praefect": {
+ primaryNode.Storage: &grpc.ClientConn{},
+ healthySecondaryNode.Storage: &grpc.ClientConn{},
+ unhealthySecondaryNode.Storage: &grpc.ClientConn{},
+ },
+ }
+ primaryConnPointer = fmt.Sprintf("%p", conns["praefect"][primaryNode.Storage])
+ router = NewPerRepositoryRouter(
+ conns,
+ nil,
+ StaticHealthChecker{"praefect": {primaryNode.Storage, healthySecondaryNode.Storage}},
+ randomFunc(func(n int) int {
+ require.Equal(t, n, 2, "number of primary candidates should match the number of healthy nodes")
+ return 0
+ }),
+ nil,
+ nil,
+ )
+ default:
+ t.Fatalf("unexpected election strategy: %q", tc.electionStrategy)
+ }
- entry := testhelper.DiscardTestEntry(t)
+ txMgr := transactions.NewManager(conf)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
- require.NoError(t, err)
- nodeMgr.Start(0, time.Hour)
+ coordinator := NewCoordinator(
+ queueInterceptor,
+ rs,
+ router,
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
- txMgr := transactions.NewManager(conf)
+ frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{
+ Repository: &targetRepo,
+ })
+ require.NoError(t, err)
- var createRepositoryCalled int64
- rs := datastore.MockRepositoryStore{
- CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, primary string, storePrimary bool) error {
- atomic.AddInt64(&createRepositoryCalled, 1)
- assert.Equal(t, targetRepo.StorageName, virtualStorage)
- assert.Equal(t, targetRepo.RelativePath, relativePath)
- assert.Equal(t, rewrittenStorage, primary)
- assert.False(t, storePrimary)
- return nil
- },
- }
+ fullMethod := "/gitaly.RepositoryService/CreateRepository"
- coordinator := NewCoordinator(
- queueInterceptor,
- rs,
- NewNodeManagerRouter(nodeMgr, rs),
- txMgr,
- conf,
- protoregistry.GitalyProtoPreregistered,
- )
+ ctx, cancel := testhelper.Context()
+ defer cancel()
- frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{
- Repository: &targetRepo,
- })
- require.NoError(t, err)
+ peeker := &mockPeeker{frame}
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+ require.Equal(t, primaryConnPointer, fmt.Sprintf("%p", streamParams.Primary().Conn))
- fullMethod := "/gitaly.RepositoryService/CreateRepository"
+ var secondaries []string
+ for _, dst := range streamParams.Secondaries() {
+ secondaries = append(secondaries, fmt.Sprintf("%p", dst.Conn))
+ }
+ require.Equal(t, secondaryConnPointers, secondaries, "secondary connections did not match expected")
- peeker := &mockPeeker{frame}
- streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
- require.NoError(t, err)
- require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target())
+ md, ok := metadata.FromOutgoingContext(streamParams.Primary().Ctx)
+ require.True(t, ok)
+ require.Contains(t, md, praefect_metadata.PraefectMetadataKey)
- md, ok := metadata.FromOutgoingContext(streamParams.Primary().Ctx)
- require.True(t, ok)
- require.Contains(t, md, praefect_metadata.PraefectMetadataKey)
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
- mi, err := coordinator.registry.LookupMethod(fullMethod)
- require.NoError(t, err)
+ m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
+ require.NoError(t, err)
- m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
- require.NoError(t, err)
+ rewrittenTargetRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, rewrittenStorage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
- rewrittenTargetRepo, err := mi.TargetRepo(m)
- require.NoError(t, err)
- require.Equal(t, rewrittenStorage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+ replEventWait.Add(2) // expected only one event to be created
+ // this call creates new events in the queue and simulates usual flow of the update operation
+ err = streamParams.RequestFinalizer()
+ require.NoError(t, err)
- replEventWait.Add(1) // expected only one event to be created
- // this call creates new events in the queue and simulates usual flow of the update operation
- err = streamParams.RequestFinalizer()
- require.NoError(t, err)
+ replEventWait.Wait() // wait until event persisted (async operation)
- replEventWait.Wait() // wait until event persisted (async operation)
- events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
- require.NoError(t, err)
- require.Len(t, events, 1)
+ var expectedEvents, actualEvents []datastore.ReplicationEvent
+ for _, target := range []string{healthySecondaryNode.Storage, unhealthySecondaryNode.Storage} {
+ actual, err := queueInterceptor.Dequeue(ctx, "praefect", target, 10)
+ require.NoError(t, err)
+ require.Len(t, actual, 1)
+
+ actualEvents = append(actualEvents, actual[0])
+ expectedEvents = append(expectedEvents, datastore.ReplicationEvent{
+ ID: actual[0].ID,
+ State: datastore.JobStateInProgress,
+ Attempt: 2,
+ LockID: fmt.Sprintf("praefect|%s|/path/to/hashed/storage", target),
+ CreatedAt: actual[0].CreatedAt,
+ UpdatedAt: actual[0].UpdatedAt,
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ VirtualStorage: conf.VirtualStorages[0].Name,
+ RelativePath: targetRepo.RelativePath,
+ TargetNodeStorage: target,
+ SourceNodeStorage: primaryNode.Storage,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ })
+ }
- expectedEvent := datastore.ReplicationEvent{
- ID: 1,
- State: datastore.JobStateInProgress,
- Attempt: 2,
- LockID: "praefect|praefect-internal-2|/path/to/hashed/storage",
- CreatedAt: events[0].CreatedAt,
- UpdatedAt: events[0].UpdatedAt,
- Job: datastore.ReplicationJob{
- Change: datastore.UpdateRepo,
- VirtualStorage: conf.VirtualStorages[0].Name,
- RelativePath: targetRepo.RelativePath,
- TargetNodeStorage: secondaryNode.Storage,
- SourceNodeStorage: primaryNode.Storage,
- },
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ require.Equal(t, expectedEvents, actualEvents, "ensure replication job created by stream director is correct")
+ require.EqualValues(t, 1, atomic.LoadInt64(&createRepositoryCalled), "ensure CreateRepository is called on datastore")
+ })
}
- require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct")
- require.EqualValues(t, 1, atomic.LoadInt64(&createRepositoryCalled), "ensure CreateRepository is called on datastore")
}
func waitNodeToChangeHealthStatus(ctx context.Context, t *testing.T, node nodes.Node, health bool) {