diff options
author | Justin Tobler <jtobler@gitlab.com> | 2023-05-02 18:33:48 +0300 |
---|---|---|
committer | Justin Tobler <jtobler@gitlab.com> | 2023-05-02 18:33:48 +0300 |
commit | 1ff9e1dabd34e2bea91f1f6cf1253f5952b9c90a (patch) | |
tree | 720a0f105e52005950b0789f2bc4504d720a76ac | |
parent | 118c4a042dfee4310733f71368da6e948a22c92e (diff) | |
parent | 7c8bd8efa52b22ed411db0542f239dedcda7bf6d (diff) |
Merge branch 'pks-coordinator-fix-resolving-additional-repository-via-wrong-storage' into 'master'
coordinator: Fix rewriting additional repos to the wrong storage node
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5688
Merged-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r-- | internal/praefect/coordinator.go | 35 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 338 |
2 files changed, 294 insertions, 79 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 10773f5e0..7214718ed 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -382,6 +382,21 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall } else if err != nil { return nil, structerr.NewInvalidArgument("%w", err) } else { + // We do not support resolving multiple different repositories that reside on + // different virtual storages. This kind of makes sense from a technical point of + // view as Praefect cannot guarantee to resolve both virtual storages. So for the + // time being we accept this restriction and handle it explicitly. + // + // Note that this is the same condition as in `rewrittenRepositoryMessage()`. This + // is done so that we detect such erroneous requests before we try to connect to the + // target node, which allows us to return a proper error to the user that indicates + // the underlying issue instead of an unrelated symptom. + // + // This limitation may be lifted in the future. + if virtualStorage != additionalRepo.GetStorageName() { + return nil, structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported") + } + additionalRepoRelativePath = additionalRepo.GetRelativePath() } @@ -796,20 +811,32 @@ func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, st return nil, structerr.NewInvalidArgument("%w", err) } - // rewrite the target repository - targetRepo.StorageName = storage - targetRepo.RelativePath = relativePath - if additionalRepo, err := mi.AdditionalRepo(m); errors.Is(err, protoregistry.ErrTargetRepoMissing) { // Nothing to rewrite in case the additional repository either doesn't exist in the // message or wasn't set by the caller. } else if err != nil { return nil, structerr.NewInvalidArgument("%w", err) } else { + // We do not support resolving multiple different repositories that reside on + // different virtual storages. This kind of makes sense from a technical point of + // view as Praefect cannot guarantee to resolve both virtual storages. So for the + // time being we accept this restriction and handle it explicitly. + // + // This limitation may be lifted in the future. + if targetRepo.GetStorageName() != additionalRepo.GetStorageName() { + return nil, structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported") + } + additionalRepo.StorageName = storage additionalRepo.RelativePath = additionalRelativePath } + // Rewrite the target repository. Note that we only do this after having written the + // additional repository so that we can check whether the original storage name of both + // repositories match. + targetRepo.StorageName = storage + targetRepo.RelativePath = relativePath + return proxy.NewCodec().Marshal(m) } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 82f698f51..71a96ba53 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -141,47 +142,232 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { func TestStreamDirectorMutator(t *testing.T) { t.Parallel() - gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) - testhelper.NewServerWithHealth(t, gitalySocket0) - testhelper.NewServerWithHealth(t, gitalySocket1) - primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1 - primaryNode := &config.Node{Address: primaryAddress, Storage: "praefect-internal-1"} - secondaryNode := &config.Node{Address: secondaryAddress, Storage: "praefect-internal-2"} + ctx := testhelper.Context(t) + ctx = correlation.ContextWithCorrelation(ctx, "my-correlation-id") + + primarySocket := testhelper.GetTemporaryGitalySocketFileName(t) + testhelper.NewServerWithHealth(t, primarySocket) + primaryNode := &config.Node{Address: "unix://" + primarySocket, Storage: "praefect-internal-1"} + + secondarySocket := testhelper.GetTemporaryGitalySocketFileName(t) + testhelper.NewServerWithHealth(t, secondarySocket) + secondaryNode := &config.Node{Address: "unix://" + secondarySocket, Storage: "praefect-internal-2"} + + unrelatedSocket := testhelper.GetTemporaryGitalySocketFileName(t) + testhelper.NewServerWithHealth(t, unrelatedSocket) + unrelatedNode := &config.Node{Address: "unix://" + unrelatedSocket, Storage: "praefect-unrelated"} + conf := config.Config{ VirtualStorages: []*config.VirtualStorage{ { Name: "praefect", Nodes: []*config.Node{primaryNode, secondaryNode}, }, + { + Name: "unrelated", + Nodes: []*config.Node{unrelatedNode}, + }, }, } - db := testdb.New(t) - - targetRepo := gitalypb.Repository{ - StorageName: "praefect", - RelativePath: "/path/to/hashed/storage", - } - ctx := testhelper.Context(t) + db := testdb.New(t) txMgr := transactions.NewManager(conf) nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil) require.NoError(t, err) defer nodeSet.Close() + type setupData struct { + method string + request proto.Message + expectedErr error + expectedRewrittenRequest proto.Message + expectedEvent datastore.ReplicationEvent + } + + var currentRepoID int64 + createRepo := func(t *testing.T, rs datastore.RepositoryStore, storage, relativePath, rewrittenPath string) int64 { + t.Helper() + repoID := atomic.AddInt64(¤tRepoID, 1) + require.NoError(t, rs.CreateRepository(ctx, repoID, storage, relativePath, rewrittenPath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true)) + return repoID + } + for _, tc := range []struct { - desc string - repositoryExists bool - error error + desc string + setup func(t *testing.T, rs datastore.RepositoryStore) setupData }{ { - desc: "succcessful", - repositoryExists: true, + desc: "target repository", + setup: func(t *testing.T, rs datastore.RepositoryStore) setupData { + relativePath := gittest.NewRepositoryName(t) + targetRepo := &gitalypb.Repository{ + StorageName: "praefect", + RelativePath: relativePath, + } + + repoID := createRepo(t, rs, "praefect", relativePath, relativePath) + + return setupData{ + method: "/gitaly.OperationService/UserCreateTag", + request: &gitalypb.UserCreateTagRequest{ + Repository: targetRepo, + }, + expectedRewrittenRequest: &gitalypb.UserCreateTagRequest{ + Repository: &gitalypb.Repository{ + StorageName: "praefect-internal-1", + RelativePath: relativePath, + }, + }, + expectedEvent: datastore.ReplicationEvent{ + State: datastore.JobStateInProgress, + Attempt: 2, + LockID: fmt.Sprintf("praefect|praefect-internal-2|%s", relativePath), + Job: datastore.ReplicationJob{ + RepositoryID: repoID, + Change: datastore.UpdateRepo, + VirtualStorage: conf.VirtualStorages[0].Name, + RelativePath: targetRepo.RelativePath, + TargetNodeStorage: secondaryNode.Storage, + SourceNodeStorage: primaryNode.Storage, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + }, + } + }, }, { - desc: "repository not found", - error: structerr.NewNotFound("%w", fmt.Errorf("mutator call: route repository mutator: %w", fmt.Errorf("get repository id: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)))), + desc: "target and additional repository", + setup: func(t *testing.T, rs datastore.RepositoryStore) setupData { + targetRepo := &gitalypb.Repository{ + StorageName: "praefect", + RelativePath: gittest.NewRepositoryName(t), + } + additionalRepo := &gitalypb.Repository{ + StorageName: "praefect", + RelativePath: gittest.NewRepositoryName(t), + } + + targetRepoID := createRepo(t, rs, "praefect", targetRepo.RelativePath, "rewritten-target") + createRepo(t, rs, "praefect", additionalRepo.RelativePath, "rewritten-additional") + + return setupData{ + method: "/gitaly.ObjectPoolService/FetchIntoObjectPool", + request: &gitalypb.FetchIntoObjectPoolRequest{ + Origin: additionalRepo, + ObjectPool: &gitalypb.ObjectPool{ + Repository: targetRepo, + }, + }, + expectedRewrittenRequest: &gitalypb.FetchIntoObjectPoolRequest{ + Origin: &gitalypb.Repository{ + StorageName: "praefect-internal-1", + RelativePath: "rewritten-additional", + }, + ObjectPool: &gitalypb.ObjectPool{ + Repository: &gitalypb.Repository{ + StorageName: "praefect-internal-1", + RelativePath: "rewritten-target", + }, + }, + }, + expectedEvent: datastore.ReplicationEvent{ + State: datastore.JobStateInProgress, + Attempt: 2, + LockID: fmt.Sprintf("praefect|praefect-internal-2|%s", targetRepo.RelativePath), + Job: datastore.ReplicationJob{ + RepositoryID: targetRepoID, + Change: datastore.UpdateRepo, + VirtualStorage: conf.VirtualStorages[0].Name, + RelativePath: targetRepo.RelativePath, + TargetNodeStorage: secondaryNode.Storage, + SourceNodeStorage: primaryNode.Storage, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + }, + } + }, + }, + { + desc: "target repository not found", + setup: func(t *testing.T, rs datastore.RepositoryStore) setupData { + // We do not create the repository, so we expect this request to + // fail. + targetRepo := &gitalypb.Repository{ + StorageName: "praefect", + RelativePath: gittest.NewRepositoryName(t), + } + + return setupData{ + method: "/gitaly.OperationService/UserCreateTag", + request: &gitalypb.UserCreateTagRequest{ + Repository: targetRepo, + }, + expectedErr: structerr.NewNotFound("%w", fmt.Errorf("mutator call: route repository mutator: %w", fmt.Errorf("get repository id: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)))), + } + }, + }, + { + desc: "additional repository not found", + setup: func(t *testing.T, rs datastore.RepositoryStore) setupData { + targetRepo := &gitalypb.Repository{ + StorageName: "praefect", + RelativePath: gittest.NewRepositoryName(t), + } + additionalRepo := &gitalypb.Repository{ + StorageName: "praefect", + RelativePath: gittest.NewRepositoryName(t), + } + + // We create the target repository, but don't create the additional + // one. + createRepo(t, rs, "praefect", targetRepo.RelativePath, "rewritten-target") + + return setupData{ + method: "/gitaly.ObjectPoolService/FetchIntoObjectPool", + request: &gitalypb.FetchIntoObjectPoolRequest{ + Origin: additionalRepo, + ObjectPool: &gitalypb.ObjectPool{ + Repository: targetRepo, + }, + }, + expectedErr: structerr.NewNotFound("%w", fmt.Errorf("mutator call: route repository mutator: %w", + fmt.Errorf("resolve additional replica path: %w", + fmt.Errorf("get additional repository id: %w", + commonerr.NewRepositoryNotFoundError(additionalRepo.StorageName, additionalRepo.RelativePath), + ), + ), + )), + } + }, + }, + { + desc: "target and additional repository on different storages", + setup: func(t *testing.T, rs datastore.RepositoryStore) setupData { + targetRepo := &gitalypb.Repository{ + StorageName: "praefect", + RelativePath: gittest.NewRepositoryName(t), + } + additionalRepo := &gitalypb.Repository{ + StorageName: "unrelated", + RelativePath: gittest.NewRepositoryName(t), + } + + createRepo(t, rs, "praefect", targetRepo.RelativePath, "rewritten-target") + createRepo(t, rs, "unrelated", additionalRepo.RelativePath, "rewritten-additional") + + return setupData{ + method: "/gitaly.ObjectPoolService/FetchIntoObjectPool", + request: &gitalypb.FetchIntoObjectPoolRequest{ + Origin: additionalRepo, + ObjectPool: &gitalypb.ObjectPool{ + Repository: targetRepo, + }, + }, + expectedErr: structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported"), + } + }, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -190,12 +376,8 @@ func TestStreamDirectorMutator(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) - if tc.repositoryExists { - require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true)) - } - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") return queue.Enqueue(ctx, event) @@ -219,66 +401,46 @@ func TestStreamDirectorMutator(t *testing.T) { protoregistry.GitalyProtoPreregistered, ) - frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ - Origin: &targetRepo, - ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo}, - }) - require.NoError(t, err) + setup := tc.setup(t, rs) + marshalledRequest, err := proto.Marshal(setup.request) require.NoError(t, err) - fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool" + streamParams, err := coordinator.StreamDirector(ctx, setup.method, &mockPeeker{marshalledRequest}) + require.Equal(t, setup.expectedErr, err) - peeker := &mockPeeker{frame} - streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) - if tc.error != nil { - require.Equal(t, tc.error, err) + if setup.expectedErr != nil { return } - require.NoError(t, err) - require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target()) - - mi, err := coordinator.registry.LookupMethod(fullMethod) - require.NoError(t, err) + // Assert that the stream parameters look as expecected. First, we verify + // that the request gets routed to the correct primary node. + require.Equal(t, primaryNode.Address, streamParams.Primary().Conn.Target()) - m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg) + // Second, we verify that the request was rewritten so that its storage + // matches the primary node's storage. + mi, err := coordinator.registry.LookupMethod(setup.method) require.NoError(t, err) - - rewrittenTargetRepo, err := mi.TargetRepo(m) + rewrittenRequest, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg) require.NoError(t, err) - require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") + testhelper.ProtoEqual(t, setup.expectedRewrittenRequest, rewrittenRequest) - // this call creates new events in the queue and simulates usual flow of the update operation + // Finalize the request and then wait for the resulting replication event to + // be queued. require.NoError(t, streamParams.RequestFinalizer()) - - // wait until event persisted (async operation) require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { return len(i.GetEnqueuedResult()) == 1 })) + // Assert that the replication event queue contains the expected event now. events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10) require.NoError(t, err) require.Len(t, events, 1) - - expectedEvent := datastore.ReplicationEvent{ - ID: 1, - State: datastore.JobStateInProgress, - Attempt: 2, - LockID: "praefect|praefect-internal-2|/path/to/hashed/storage", - CreatedAt: events[0].CreatedAt, - UpdatedAt: events[0].UpdatedAt, - Job: datastore.ReplicationJob{ - RepositoryID: 1, - Change: datastore.UpdateRepo, - VirtualStorage: conf.VirtualStorages[0].Name, - RelativePath: targetRepo.RelativePath, - TargetNodeStorage: secondaryNode.Storage, - SourceNodeStorage: primaryNode.Storage, - }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, - } - require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct") + // Unset a bunch of data that is indeterministic. + events[0].ID = 0 + events[0].CreatedAt = time.Time{} + events[0].UpdatedAt = nil + require.Equal(t, setup.expectedEvent, events[0]) }) } } @@ -1249,6 +1411,8 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { } func TestRewrittenRepositoryMessage(t *testing.T) { + t.Parallel() + buildRequest := func(storageName, relativePath, additionalRelativePath string) *gitalypb.CreateObjectPoolRequest { return &gitalypb.CreateObjectPoolRequest{ ObjectPool: &gitalypb.ObjectPool{ @@ -1264,19 +1428,43 @@ func TestRewrittenRepositoryMessage(t *testing.T) { } } - originalRequest := buildRequest("original-storage", "original-relative-path", "original-additional-relative-path") + t.Run("successfully rewritten request", func(t *testing.T) { + originalRequest := buildRequest("original-storage", "original-relative-path", "original-additional-relative-path") - methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.ObjectPoolService/CreateObjectPool") - require.NoError(t, err) + methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.ObjectPoolService/CreateObjectPool") + require.NoError(t, err) - rewrittenMessageBytes, err := rewrittenRepositoryMessage(methodInfo, originalRequest, "rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path") - require.NoError(t, err) + rewrittenMessageBytes, err := rewrittenRepositoryMessage(methodInfo, originalRequest, "rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path") + require.NoError(t, err) + + var rewrittenMessage gitalypb.CreateObjectPoolRequest + require.NoError(t, proto.Unmarshal(rewrittenMessageBytes, &rewrittenMessage)) + + testhelper.ProtoEqual(t, buildRequest("original-storage", "original-relative-path", "original-additional-relative-path"), originalRequest) + testhelper.ProtoEqual(t, buildRequest("rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path"), &rewrittenMessage) + }) + + t.Run("rewriting differing storages fails", func(t *testing.T) { + request := &gitalypb.CreateObjectPoolRequest{ + ObjectPool: &gitalypb.ObjectPool{ + Repository: &gitalypb.Repository{ + StorageName: "a", + RelativePath: "a", + }, + }, + Origin: &gitalypb.Repository{ + StorageName: "b", + RelativePath: "b", + }, + } - var rewrittenMessage gitalypb.CreateObjectPoolRequest - require.NoError(t, proto.Unmarshal(rewrittenMessageBytes, &rewrittenMessage)) + methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.ObjectPoolService/CreateObjectPool") + require.NoError(t, err) - testhelper.ProtoEqual(t, buildRequest("original-storage", "original-relative-path", "original-additional-relative-path"), originalRequest) - testhelper.ProtoEqual(t, buildRequest("rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path"), &rewrittenMessage) + rewrittenRequest, err := rewrittenRepositoryMessage(methodInfo, request, "rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path") + require.Equal(t, structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported"), err) + require.Nil(t, rewrittenRequest) + }) } func TestStreamDirector_repo_creation(t *testing.T) { |