diff options
author | James Fargher <proglottis@gmail.com> | 2021-09-15 02:04:33 +0300 |
---|---|---|
committer | James Fargher <proglottis@gmail.com> | 2021-09-15 02:04:33 +0300 |
commit | 3903ef7556e5705d0b62bbcbdd24649e4c4835a3 (patch) | |
tree | 7b8c2dfdcbfad4971a776375f7d6862dfb12a5e6 | |
parent | 6ee4f0a09c4fc4a5df334a5033f19986272373a1 (diff) | |
parent | b04aa750d115162ca8f468244aeb2810b6be8c10 (diff) |
Merge branch 'smh-not-found-mutator' into 'master'
Return NotFound code for mutators targeting non-existent repositories
See merge request gitlab-org/gitaly!3869
-rw-r--r-- | internal/praefect/coordinator.go | 8 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 153 |
2 files changed, 101 insertions, 60 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 66a828b59..b2819380e 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -357,6 +357,10 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr } if err != nil { + if errors.As(err, new(commonerr.RepositoryNotFoundError)) { + return nil, helper.ErrNotFound(err) + } + return nil, err } @@ -390,10 +394,6 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal ctx, virtualStorage, repoPath, shouldRouteRepositoryAccessorToPrimary(ctx, call), ) if err != nil { - if errors.As(err, new(commonerr.RepositoryNotFoundError)) { - return nil, helper.ErrNotFound(err) - } - return nil, fmt.Errorf("accessor call: route repository accessor: %w", err) } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 91fd9f42d..13fd4263d 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "math/rand" "strings" "sync" "sync/atomic" @@ -170,75 +171,115 @@ func TestStreamDirectorMutator(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - entry := testhelper.DiscardTestEntry(t) + txMgr := transactions.NewManager(conf) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil) + nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil) require.NoError(t, err) - nodeMgr.Start(0, time.Hour) + defer nodeSet.Close() - txMgr := transactions.NewManager(conf) - rs := datastore.MockRepositoryStore{} + for _, tc := range []struct { + desc string + repositoryExists bool + error error + }{ + { + desc: "succcessful", + repositoryExists: true, + }, + { + desc: "repository not found", + error: helper.ErrNotFound(fmt.Errorf("mutator call: route repository mutator: %w", fmt.Errorf("get primary: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)))), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + tx := glsql.NewDB(t).Begin(t) + defer tx.Rollback(t) - coordinator := NewCoordinator( - queueInterceptor, - rs, - NewNodeManagerRouter(nodeMgr, rs), - txMgr, - conf, - protoregistry.GitalyProtoPreregistered, - ) + rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) - frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{ - Origin: &targetRepo, - ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo}, - }) - require.NoError(t, err) + if tc.repositoryExists { + require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true)) + } - require.NoError(t, err) + testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) - fullMethod := "/gitaly.ObjectPoolService/CreateObjectPool" + coordinator := NewCoordinator( + queueInterceptor, + rs, + NewPerRepositoryRouter( + nodeSet.Connections(), + nodes.NewPerRepositoryElector(tx), + StaticHealthChecker(conf.StorageNames()), + NewLockedRandom(rand.New(rand.NewSource(0))), + rs, + datastore.NewAssignmentStore(tx, conf.StorageNames()), + rs, + nil, + ), + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) - peeker := &mockPeeker{frame} - streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) - require.NoError(t, err) - require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target()) + frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{ + Origin: &targetRepo, + ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo}, + }) + require.NoError(t, err) - mi, err := coordinator.registry.LookupMethod(fullMethod) - require.NoError(t, err) + require.NoError(t, err) - m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg) - require.NoError(t, err) + fullMethod := "/gitaly.ObjectPoolService/CreateObjectPool" - rewrittenTargetRepo, err := mi.TargetRepo(m) - require.NoError(t, err) - require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") + peeker := &mockPeeker{frame} + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + if tc.error != nil { + require.Equal(t, tc.error, err) + return + } - replEventWait.Add(1) // expected only one event to be created - // this call creates new events in the queue and simulates usual flow of the update operation - require.NoError(t, streamParams.RequestFinalizer()) + require.NoError(t, err) + require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target()) - replEventWait.Wait() // wait until event persisted (async operation) - events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10) - require.NoError(t, err) - require.Len(t, events, 1) - - expectedEvent := datastore.ReplicationEvent{ - ID: 1, - State: datastore.JobStateInProgress, - Attempt: 2, - LockID: "praefect|praefect-internal-2|/path/to/hashed/storage", - CreatedAt: events[0].CreatedAt, - UpdatedAt: events[0].UpdatedAt, - Job: datastore.ReplicationJob{ - Change: datastore.UpdateRepo, - VirtualStorage: conf.VirtualStorages[0].Name, - RelativePath: targetRepo.RelativePath, - TargetNodeStorage: secondaryNode.Storage, - SourceNodeStorage: primaryNode.Storage, - }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + mi, err := coordinator.registry.LookupMethod(fullMethod) + require.NoError(t, err) + + m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg) + require.NoError(t, err) + + rewrittenTargetRepo, err := mi.TargetRepo(m) + require.NoError(t, err) + require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") + + replEventWait.Add(1) // expected only one event to be created + // this call creates new events in the queue and simulates usual flow of the update operation + require.NoError(t, streamParams.RequestFinalizer()) + + replEventWait.Wait() // wait until event persisted (async operation) + events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10) + require.NoError(t, err) + require.Len(t, events, 1) + + expectedEvent := datastore.ReplicationEvent{ + ID: 1, + State: datastore.JobStateInProgress, + Attempt: 2, + LockID: "praefect|praefect-internal-2|/path/to/hashed/storage", + CreatedAt: events[0].CreatedAt, + UpdatedAt: events[0].UpdatedAt, + Job: datastore.ReplicationJob{ + RepositoryID: 1, + Change: datastore.UpdateRepo, + VirtualStorage: conf.VirtualStorages[0].Name, + RelativePath: targetRepo.RelativePath, + TargetNodeStorage: secondaryNode.Storage, + SourceNodeStorage: primaryNode.Storage, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + } + require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct") + }) } - require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct") } func TestStreamDirectorMutator_StopTransaction(t *testing.T) { @@ -416,7 +457,7 @@ func TestStreamDirectorAccessor(t *testing.T) { return RouterNode{}, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) }, }, - error: helper.ErrNotFound(commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)), + error: helper.ErrNotFound(fmt.Errorf("accessor call: route repository accessor: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath))), }, } { t.Run(tc.desc, func(t *testing.T) { |