gitlab.com/gitlab-org/gitaly.git
author    Sami Hiltunen <shiltunen@gitlab.com>  2021-02-01 17:57:19 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>  2021-02-02 20:11:04 +0300
commit    9380d0ab0d5633feebbcba350d5126a63d6452ff (patch)
tree      2016a6beeb1c077c629a97644999b0d9b5775139 /internal/praefect/coordinator_test.go
parent    b10049671664e578b2d181c6e31be388398b69bd (diff)
handle repository creations separately in coordinator
This commit wires up repository creation routing to Praefect's coordinator. If the RPC creates a repository, the routing is handled in a different manner from other repository-scoped mutators. This fixes repository creation when repository-specific primaries are enabled. Prior to this, repository creation did not work, as the PerRepositoryRouter needs to know where to route the mutator request; since there was no record of the repository yet, the router didn't know where to send it. The router now picks a random healthy node to act as the primary and sets the other nodes as replication targets. It then stores the primary in the database in the request finalizers.
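The routing behaviour described above can be sketched roughly as follows. This is a minimal illustration only, assuming a simplified RepositoryMutatorRoute struct and plain string storage names rather than Praefect's actual types: the primary is picked at random from the healthy storages, and every other configured storage becomes a replication target.

package routing

import (
	"errors"
	"math/rand"
)

// RepositoryMutatorRoute is a simplified stand-in for the route a repository
// creation takes: one primary plus the storages that receive replication jobs.
type RepositoryMutatorRoute struct {
	Primary     string
	Secondaries []string
}

// routeRepositoryCreation picks a random healthy storage as the primary and
// returns the remaining configured storages as replication targets. This is an
// illustrative sketch, not Praefect's PerRepositoryRouter implementation.
func routeRepositoryCreation(configured, healthy []string, random *rand.Rand) (RepositoryMutatorRoute, error) {
	if len(healthy) == 0 {
		return RepositoryMutatorRoute{}, errors.New("no healthy nodes to act as primary")
	}

	primary := healthy[random.Intn(len(healthy))]

	var secondaries []string
	for _, storage := range configured {
		if storage != primary {
			secondaries = append(secondaries, storage)
		}
	}

	return RepositoryMutatorRoute{Primary: primary, Secondaries: secondaries}, nil
}

In the test below, the injected randomFunc always returns 0 and the StaticHealthChecker reports two healthy storages, so praefect-internal-1 deterministically becomes the primary, and the replication events asserted at the end of the test target the remaining two storages.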
Diffstat (limited to 'internal/praefect/coordinator_test.go')
-rw-r--r--  internal/praefect/coordinator_test.go  267
1 file changed, 170 insertions(+), 97 deletions(-)
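The commit message also notes that the chosen primary is persisted in the request finalizer. A rough sketch of that idea follows; the RepositoryStore interface mirrors the CreateRepository signature exercised through datastore.MockRepositoryStore in the test below, while the finalizer wiring itself is an assumption for illustration rather than the coordinator's actual code.

package routing

import "context"

// RepositoryStore mirrors the CreateRepository signature used by the mock in
// the test: the elected primary is recorded, and storePrimary controls whether
// it is persisted as the repository-specific primary.
type RepositoryStore interface {
	CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, storePrimary bool) error
}

// creationFinalizer returns a function the coordinator could run after the
// CreateRepository RPC succeeds: it records the new repository and, when
// per-repository elections are enabled, also stores the elected primary.
func creationFinalizer(ctx context.Context, rs RepositoryStore, virtualStorage, relativePath, primary string, storePrimary bool) func() error {
	return func() error {
		return rs.CreateRepository(ctx, virtualStorage, relativePath, primary, storePrimary)
	}
}

This matches what the test asserts: CreateRepositoryFunc is invoked exactly once, with storePrimary set to true only for the per-repository election strategy.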
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 28cec30a1..43fec982e 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/sha1"
"errors"
+ "fmt"
"io/ioutil"
"sync"
"sync/atomic"
@@ -28,6 +29,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
@@ -593,124 +595,195 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
}
func TestStreamDirector_repo_creation(t *testing.T) {
- gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
- srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
- defer srv1.Stop()
- srv2, _ := testhelper.NewServerWithHealth(t, gitalySocket1)
- defer srv2.Stop()
-
- primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
- primaryNode := &config.Node{Address: primaryAddress, Storage: "praefect-internal-1"}
- secondaryNode := &config.Node{Address: secondaryAddress, Storage: "praefect-internal-2"}
- conf := config.Config{
- VirtualStorages: []*config.VirtualStorage{
- &config.VirtualStorage{
- Name: "praefect",
- Nodes: []*config.Node{primaryNode, secondaryNode},
- },
+ for _, tc := range []struct {
+ electionStrategy config.ElectionStrategy
+ primaryStored bool
+ }{
+ {
+ electionStrategy: config.ElectionStrategySQL,
+ primaryStored: false,
},
- }
+ {
+ electionStrategy: config.ElectionStrategyPerRepository,
+ primaryStored: true,
+ },
+ } {
+ t.Run(string(tc.electionStrategy), func(t *testing.T) {
+ primaryNode := &config.Node{Storage: "praefect-internal-1"}
+ healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"}
+ unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"}
+ conf := config.Config{
+ Failover: config.Failover{ElectionStrategy: tc.electionStrategy},
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*config.Node{primaryNode, healthySecondaryNode, unhealthySecondaryNode},
+ },
+ },
+ }
- var replEventWait sync.WaitGroup
+ var replEventWait sync.WaitGroup
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
+ queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ defer replEventWait.Done()
+ return queue.Enqueue(ctx, event)
+ })
+
+ rewrittenStorage := primaryNode.Storage
+ targetRepo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
- queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
- defer replEventWait.Done()
- return queue.Enqueue(ctx, event)
- })
+ var createRepositoryCalled int64
+ rs := datastore.MockRepositoryStore{
+ CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, primary string, storePrimary bool) error {
+ atomic.AddInt64(&createRepositoryCalled, 1)
+ assert.Equal(t, targetRepo.StorageName, virtualStorage)
+ assert.Equal(t, targetRepo.RelativePath, relativePath)
+ assert.Equal(t, rewrittenStorage, primary)
+ assert.Equal(t, tc.primaryStored, storePrimary)
+ return nil
+ },
+ }
- rewrittenStorage := primaryNode.Storage
- targetRepo := gitalypb.Repository{
- StorageName: "praefect",
- RelativePath: "/path/to/hashed/storage",
- }
+ var router Router
+ var primaryConnPointer string
+ var secondaryConnPointers []string
+ switch tc.electionStrategy {
+ case config.ElectionStrategySQL:
+ gitalySocket0 := testhelper.GetTemporaryGitalySocketFileName(t)
+ gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t)
+ gitalySocket3 := testhelper.GetTemporaryGitalySocketFileName(t)
+ srv1, _ := testhelper.NewServerWithHealth(t, gitalySocket0)
+ defer srv1.Stop()
+ srv2, _ := testhelper.NewServerWithHealth(t, gitalySocket1)
+ defer srv2.Stop()
+ srv3, healthSrv3 := testhelper.NewServerWithHealth(t, gitalySocket3)
+ healthSrv3.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ defer srv3.Stop()
+
+ primaryNode.Address = "unix://" + gitalySocket0
+ healthySecondaryNode.Address = "unix://" + gitalySocket1
+ unhealthySecondaryNode.Address = "unix://" + gitalySocket3
+
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
- ctx, cancel := testhelper.Context()
- defer cancel()
+ router = NewNodeManagerRouter(nodeMgr, rs)
+ for _, node := range nodeMgr.Nodes()["praefect"] {
+ if node.GetStorage() == primaryNode.Storage {
+ primaryConnPointer = fmt.Sprintf("%p", node.GetConnection())
+ continue
+ }
+ }
+ case config.ElectionStrategyPerRepository:
+ conns := Connections{
+ "praefect": {
+ primaryNode.Storage: &grpc.ClientConn{},
+ healthySecondaryNode.Storage: &grpc.ClientConn{},
+ unhealthySecondaryNode.Storage: &grpc.ClientConn{},
+ },
+ }
+ primaryConnPointer = fmt.Sprintf("%p", conns["praefect"][primaryNode.Storage])
+ router = NewPerRepositoryRouter(
+ conns,
+ nil,
+ StaticHealthChecker{"praefect": {primaryNode.Storage, healthySecondaryNode.Storage}},
+ randomFunc(func(n int) int {
+ require.Equal(t, n, 2, "number of primary candidates should match the number of healthy nodes")
+ return 0
+ }),
+ nil,
+ nil,
+ )
+ default:
+ t.Fatalf("unexpected election strategy: %q", tc.electionStrategy)
+ }
- entry := testhelper.DiscardTestEntry(t)
+ txMgr := transactions.NewManager(conf)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
- require.NoError(t, err)
- nodeMgr.Start(0, time.Hour)
+ coordinator := NewCoordinator(
+ queueInterceptor,
+ rs,
+ router,
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
- txMgr := transactions.NewManager(conf)
+ frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{
+ Repository: &targetRepo,
+ })
+ require.NoError(t, err)
- var createRepositoryCalled int64
- rs := datastore.MockRepositoryStore{
- CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, primary string, storePrimary bool) error {
- atomic.AddInt64(&createRepositoryCalled, 1)
- assert.Equal(t, targetRepo.StorageName, virtualStorage)
- assert.Equal(t, targetRepo.RelativePath, relativePath)
- assert.Equal(t, rewrittenStorage, primary)
- assert.False(t, storePrimary)
- return nil
- },
- }
+ fullMethod := "/gitaly.RepositoryService/CreateRepository"
- coordinator := NewCoordinator(
- queueInterceptor,
- rs,
- NewNodeManagerRouter(nodeMgr, rs),
- txMgr,
- conf,
- protoregistry.GitalyProtoPreregistered,
- )
+ ctx, cancel := testhelper.Context()
+ defer cancel()
- frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{
- Repository: &targetRepo,
- })
- require.NoError(t, err)
+ peeker := &mockPeeker{frame}
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+ require.Equal(t, primaryConnPointer, fmt.Sprintf("%p", streamParams.Primary().Conn))
- fullMethod := "/gitaly.RepositoryService/CreateRepository"
+ var secondaries []string
+ for _, dst := range streamParams.Secondaries() {
+ secondaries = append(secondaries, fmt.Sprintf("%p", dst.Conn))
+ }
+ require.Equal(t, secondaryConnPointers, secondaries, "secondary connections did not match expected")
- peeker := &mockPeeker{frame}
- streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
- require.NoError(t, err)
- require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target())
+ md, ok := metadata.FromOutgoingContext(streamParams.Primary().Ctx)
+ require.True(t, ok)
+ require.Contains(t, md, praefect_metadata.PraefectMetadataKey)
- md, ok := metadata.FromOutgoingContext(streamParams.Primary().Ctx)
- require.True(t, ok)
- require.Contains(t, md, praefect_metadata.PraefectMetadataKey)
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
- mi, err := coordinator.registry.LookupMethod(fullMethod)
- require.NoError(t, err)
+ m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
+ require.NoError(t, err)
- m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
- require.NoError(t, err)
+ rewrittenTargetRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, rewrittenStorage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
- rewrittenTargetRepo, err := mi.TargetRepo(m)
- require.NoError(t, err)
- require.Equal(t, rewrittenStorage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+ replEventWait.Add(2) // expect one replication event per secondary
+ // this call creates new events in the queue and simulates usual flow of the update operation
+ err = streamParams.RequestFinalizer()
+ require.NoError(t, err)
- replEventWait.Add(1) // expected only one event to be created
- // this call creates new events in the queue and simulates usual flow of the update operation
- err = streamParams.RequestFinalizer()
- require.NoError(t, err)
+ replEventWait.Wait() // wait until event persisted (async operation)
- replEventWait.Wait() // wait until event persisted (async operation)
- events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
- require.NoError(t, err)
- require.Len(t, events, 1)
+ var expectedEvents, actualEvents []datastore.ReplicationEvent
+ for _, target := range []string{healthySecondaryNode.Storage, unhealthySecondaryNode.Storage} {
+ actual, err := queueInterceptor.Dequeue(ctx, "praefect", target, 10)
+ require.NoError(t, err)
+ require.Len(t, actual, 1)
+
+ actualEvents = append(actualEvents, actual[0])
+ expectedEvents = append(expectedEvents, datastore.ReplicationEvent{
+ ID: actual[0].ID,
+ State: datastore.JobStateInProgress,
+ Attempt: 2,
+ LockID: fmt.Sprintf("praefect|%s|/path/to/hashed/storage", target),
+ CreatedAt: actual[0].CreatedAt,
+ UpdatedAt: actual[0].UpdatedAt,
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ VirtualStorage: conf.VirtualStorages[0].Name,
+ RelativePath: targetRepo.RelativePath,
+ TargetNodeStorage: target,
+ SourceNodeStorage: primaryNode.Storage,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ })
+ }
- expectedEvent := datastore.ReplicationEvent{
- ID: 1,
- State: datastore.JobStateInProgress,
- Attempt: 2,
- LockID: "praefect|praefect-internal-2|/path/to/hashed/storage",
- CreatedAt: events[0].CreatedAt,
- UpdatedAt: events[0].UpdatedAt,
- Job: datastore.ReplicationJob{
- Change: datastore.UpdateRepo,
- VirtualStorage: conf.VirtualStorages[0].Name,
- RelativePath: targetRepo.RelativePath,
- TargetNodeStorage: secondaryNode.Storage,
- SourceNodeStorage: primaryNode.Storage,
- },
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ require.Equal(t, expectedEvents, actualEvents, "ensure replication job created by stream director is correct")
+ require.EqualValues(t, 1, atomic.LoadInt64(&createRepositoryCalled), "ensure CreateRepository is called on datastore")
+ })
}
- require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct")
- require.EqualValues(t, 1, atomic.LoadInt64(&createRepositoryCalled), "ensure CreateRepository is called on datastore")
}
func waitNodeToChangeHealthStatus(ctx context.Context, t *testing.T, node nodes.Node, health bool) {